The stream() function enables real-time streaming of model responses, providing a better user experience for long-form content.

Basic Usage

Stream a chat completion with async iteration:
import { stream } from '@core-ai/core-ai';
import { createOpenAI } from '@core-ai/openai';

const openai = createOpenAI({ apiKey: process.env.OPENAI_API_KEY });
const model = openai.chatModel('gpt-5-mini');

const result = await stream({
  model,
  messages: [
    {
      role: 'user',
      content: 'Write a short haiku about strongly typed APIs.',
    },
  ],
});

console.log('Streaming output:\n');
for await (const event of result) {
  if (event.type === 'text-delta') {
    process.stdout.write(event.text);
  }
}

const response = await result.toResponse();
console.log('\n\nFinish reason:', response.finishReason);
console.log('Usage:', response.usage);

Stream Event Types

The stream emits different event types:
type StreamEvent =
  | { type: 'reasoning-start' }
  | { type: 'reasoning-delta'; text: string }
  | { type: 'reasoning-end' }
  | { type: 'text-delta'; text: string }
  | { type: 'tool-call-start'; toolCallId: string; toolName: string }
  | { type: 'tool-call-delta'; toolCallId: string; argumentsDelta: string }
  | { type: 'tool-call-end'; toolCall: ToolCall }
  | { type: 'finish'; finishReason: FinishReason; usage: ChatUsage };
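
Because `type` is a discriminant, TypeScript narrows the union in each branch automatically. The following self-contained sketch shows that narrowing in action, using a local copy of two of the variants and a mock async generator in place of a real model stream (no `@core-ai` calls are made; all names here are illustrative):

```typescript
// Local sketch of two variants from the event union above.
type SketchEvent =
  | { type: 'text-delta'; text: string }
  | { type: 'finish'; finishReason: string };

// Mock stream standing in for the result of stream().
async function* mockStream(): AsyncGenerator<SketchEvent> {
  yield { type: 'text-delta', text: 'Hello, ' };
  yield { type: 'text-delta', text: 'world!' };
  yield { type: 'finish', finishReason: 'stop' };
}

// Accumulate only the text deltas; checking `type` narrows the
// union, so `event.text` is type-safe inside the branch.
async function collectText(events: AsyncIterable<SketchEvent>): Promise<string> {
  let text = '';
  for await (const event of events) {
    if (event.type === 'text-delta') {
      text += event.text;
    }
  }
  return text;
}
```

The same narrowing applies to the full union: a `switch` on `event.type` gives you the correct payload fields in every `case`.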

Handling Different Event Types

Handle each event type to build a richer streaming experience:
const result = await stream({ model, messages });

let textBuffer = '';
let reasoningBuffer = '';

for await (const event of result) {
  switch (event.type) {
    case 'reasoning-start':
      console.log('\n[Model is thinking...]\n');
      break;

    case 'reasoning-delta':
      reasoningBuffer += event.text;
      process.stdout.write(event.text);
      break;

    case 'reasoning-end':
      console.log('\n[Reasoning complete]\n');
      break;

    case 'text-delta':
      textBuffer += event.text;
      process.stdout.write(event.text);
      break;

    case 'tool-call-start':
      console.log(`\n[Calling tool: ${event.toolName}]`);
      break;

    case 'tool-call-delta':
      // Tool arguments are being streamed
      break;

    case 'tool-call-end':
      console.log(`Tool call: ${event.toolCall.name}`);
      console.log('Arguments:', event.toolCall.arguments);
      break;

    case 'finish':
      console.log('\n\nFinished:', event.finishReason);
      console.log('Usage:', event.usage);
      break;
  }
}

Getting the Complete Response

Convert the stream to a complete response:
const result = await stream({ model, messages });

// Process events in real-time
for await (const event of result) {
  if (event.type === 'text-delta') {
    process.stdout.write(event.text);
  }
}

// Get the complete response after streaming
const response = await result.toResponse();

console.log('\nComplete content:', response.content);
console.log('Reasoning:', response.reasoning);
console.log('Finish reason:', response.finishReason);
console.log('Token usage:', response.usage);
Call toResponse() only after consuming all stream events. Called earlier, it does not resolve until the stream has been fully consumed, so a stream that is never iterated will leave it pending.

Streaming with Configuration

Apply the same configuration options as generate():
const result = await stream({
  model,
  messages: [{ role: 'user', content: 'Write a story.' }],
  config: {
    temperature: 0.8,
    maxTokens: 1000,
    stopSequences: ['The End'],
  },
});

for await (const event of result) {
  if (event.type === 'text-delta') {
    process.stdout.write(event.text);
  }
}

UI Integration Examples

Stream model output directly into React state. This example assumes model is created at module scope, as in the earlier examples:
import { useState } from 'react';
import { stream } from '@core-ai/core-ai';

function StreamingChat() {
  const [content, setContent] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  const handleStream = async () => {
    setIsStreaming(true);
    setContent('');

    const result = await stream({
      model,
      messages: [{ role: 'user', content: 'Hello!' }],
    });

    for await (const event of result) {
      if (event.type === 'text-delta') {
        setContent((prev) => prev + event.text);
      }
    }

    setIsStreaming(false);
  };

  return (
    <div>
      <button onClick={handleStream} disabled={isStreaming}>
        {isStreaming ? 'Streaming...' : 'Start Stream'}
      </button>
      <div>{content}</div>
    </div>
  );
}

Streaming with Tools

Handle tool calls during streaming:
import { stream, defineTool, type ToolCall } from '@core-ai/core-ai';
import { z } from 'zod';

const weatherTool = defineTool({
  name: 'get_weather',
  description: 'Get weather information',
  parameters: z.object({
    location: z.string(),
  }),
});

const result = await stream({
  model,
  messages: [{ role: 'user', content: 'What is the weather in Berlin?' }],
  tools: { get_weather: weatherTool },
});

const toolCalls: ToolCall[] = [];

for await (const event of result) {
  switch (event.type) {
    case 'text-delta':
      process.stdout.write(event.text);
      break;

    case 'tool-call-start':
      console.log(`\nCalling: ${event.toolName}`);
      break;

    case 'tool-call-end':
      toolCalls.push(event.toolCall);
      break;
  }
}

const response = await result.toResponse();
if (response.finishReason === 'tool-calls') {
  console.log('Tool calls:', response.toolCalls);
  // Handle tool execution here
}
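The tool-call-delta events stream a call's arguments as fragments of a JSON string, which the example above skips over. This self-contained sketch (mock deltas only, no library calls; the helper name is illustrative) shows one way to accumulate the fragments per toolCallId and parse them once complete:

```typescript
// Shape of the data carried by a 'tool-call-delta' event.
type ToolDelta = { toolCallId: string; argumentsDelta: string };

// Concatenate argument fragments per tool call, then parse each
// completed buffer as JSON.
function accumulateToolArguments(deltas: ToolDelta[]): Record<string, unknown> {
  const buffers = new Map<string, string>();
  for (const d of deltas) {
    buffers.set(d.toolCallId, (buffers.get(d.toolCallId) ?? '') + d.argumentsDelta);
  }
  const parsed: Record<string, unknown> = {};
  buffers.forEach((json, id) => {
    parsed[id] = JSON.parse(json); // arguments arrive as a JSON string
  });
  return parsed;
}

// Example: fragments of {"location":"Berlin"} for a single call.
const result = accumulateToolArguments([
  { toolCallId: 'call_1', argumentsDelta: '{"loca' },
  { toolCallId: 'call_1', argumentsDelta: 'tion":"Ber' },
  { toolCallId: 'call_1', argumentsDelta: 'lin"}' },
]);
```

In practice you would feed the deltas in as they arrive and parse only after the matching tool-call-end event, which signals that the arguments are complete.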

Abort Streaming

Cancel streaming with AbortController:
const controller = new AbortController();

const result = await stream({
  model,
  messages: [{ role: 'user', content: 'Write a long story.' }],
  signal: controller.signal,
});

// Cancel after 5 seconds
setTimeout(() => controller.abort(), 5000);

try {
  for await (const event of result) {
    if (event.type === 'text-delta') {
      process.stdout.write(event.text);
    }
  }
} catch (error) {
  if (error instanceof Error && error.name === 'AbortError') {
    console.log('\nStream cancelled');
  }
}
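The cancellation pattern above can be sketched without the library at all. Here a plain async generator stands in for the model stream and raises an AbortError when the signal fires, which is how the abort surfaces to the consuming loop (all names are illustrative, not part of the @core-ai API):

```typescript
// Mock stream that honors an AbortSignal, standing in for stream().
async function* fakeStream(signal: AbortSignal) {
  for (const word of ['one ', 'two ', 'three ', 'four ']) {
    if (signal.aborted) {
      const err = new Error('The operation was aborted');
      err.name = 'AbortError';
      throw err;
    }
    yield { type: 'text-delta' as const, text: word };
  }
}

// Consume the stream and cancel it mid-flight.
async function main(): Promise<string> {
  const controller = new AbortController();
  let received = '';
  try {
    for await (const event of fakeStream(controller.signal)) {
      received += event.text;
      if (received.includes('two')) controller.abort(); // cancel mid-stream
    }
  } catch (error) {
    if (error instanceof Error && error.name === 'AbortError') {
      console.log('Stream cancelled after:', received);
    }
  }
  return received;
}
```

The consuming code is identical either way: iterate, and treat an AbortError in the catch block as an expected cancellation rather than a failure.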

Error Handling

Handle errors during streaming:
import { LLMError, ProviderError } from '@core-ai/core-ai';

try {
  const result = await stream({ model, messages });

  for await (const event of result) {
    if (event.type === 'text-delta') {
      process.stdout.write(event.text);
    }
  }

  const response = await result.toResponse();
  console.log('\nComplete:', response.finishReason);
} catch (error) {
  if (error instanceof ProviderError) {
    console.error('Provider error:', error.message);
  } else if (error instanceof LLMError) {
    console.error('LLM error:', error.message);
  } else {
    console.error('Unknown error:', error);
  }
}

Best Practices

Make sure to iterate through all events before calling toResponse():
const result = await stream({ model, messages });

// Good: consume all events first
for await (const event of result) {
  // Process events
}
const response = await result.toResponse();

// Bad: calling toResponse() without consuming events
// const response = await result.toResponse(); // This will hang
Accumulate text deltas for complete content:
let fullText = '';

for await (const event of result) {
  if (event.type === 'text-delta') {
    fullText += event.text;
    updateUI(fullText); // Update UI with accumulated text
  }
}
Distinguish between reasoning and response text:
let reasoning = '';
let response = '';

for await (const event of result) {
  if (event.type === 'reasoning-delta') {
    reasoning += event.text;
    // Show in a separate "thinking" UI
  } else if (event.type === 'text-delta') {
    response += event.text;
    // Show in the main response area
  }
}

Next Steps