Overview

The stream() function streams responses from a chat model in real time, allowing you to process tokens as they are generated. This is ideal for interactive applications that display responses progressively.

Function signature

export async function stream(
    params: StreamParams
): Promise<ChatStream>

export type StreamParams = GenerateOptions & {
    model: ChatModel;
};

Parameters

model (ChatModel, required)
The chat model instance to use for streaming.
messages (Message[], required)
Array of messages in the conversation. Must not be empty.
temperature (number)
Sampling temperature (0-2). Higher values make output more random.
maxTokens (number)
Maximum number of tokens to generate.
topP (number)
Nucleus sampling parameter (0-1).
reasoning (ReasoningConfig)
Configuration for extended thinking/reasoning capabilities.
tools (ToolSet)
Object mapping tool names to tool definitions.
toolChoice (ToolChoice)
Controls how the model uses tools: 'auto', 'none', 'required', or { type: 'tool', toolName: string }.
providerOptions (GenerateProviderOptions)
Provider-specific options, namespaced by provider name.
signal (AbortSignal)
AbortSignal for cancelling the stream.
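The toolChoice values form a discriminated union. The sketch below is a local mirror of the shape described above, for illustration only; in application code use the ToolChoice type the library provides.

```typescript
// Local mirror of the ToolChoice union described above (illustrative only).
type ToolChoice =
  | 'auto'      // the model decides whether to call a tool
  | 'none'      // the model must answer without calling tools
  | 'required'  // the model must call some tool
  | { type: 'tool'; toolName: string }; // the model must call the named tool

// Example values:
const letModelDecide: ToolChoice = 'auto';
const forceWeatherTool: ToolChoice = { type: 'tool', toolName: 'get_weather' };
```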

Return value

Returns a Promise<ChatStream>. ChatStream is an async iterable of StreamEvent objects with two additional properties:
type ChatStream = AsyncIterable<StreamEvent> & {
    readonly result: Promise<GenerateResult>;
    readonly events: Promise<readonly StreamEvent[]>;
};
result (Promise<GenerateResult>)
Resolves with the aggregated final response when the stream completes. Rejects on abort or upstream failure.
events (Promise<readonly StreamEvent[]>)
Resolves with all observed events, including abort and failure cases.
The HTTP request starts as soon as you create the stream. You do not need to iterate before the model begins responding.

StreamEvent types

reasoning-start
{ type: 'reasoning-start' }
Emitted when reasoning/thinking begins.
reasoning-delta
{ type: 'reasoning-delta'; text: string }
Emitted for each chunk of reasoning text.
reasoning-end
{ type: 'reasoning-end'; providerMetadata?: Record<string, Record<string, unknown>> }
Emitted when reasoning completes. May include provider-namespaced metadata.
text-delta
{ type: 'text-delta'; text: string }
Emitted for each chunk of response text.
tool-call-start
{ type: 'tool-call-start'; toolCallId: string; toolName: string }
Emitted when a tool call begins.
tool-call-delta
{ type: 'tool-call-delta'; toolCallId: string; argumentsDelta: string }
Emitted for each chunk of tool call arguments.
tool-call-end
{ type: 'tool-call-end'; toolCall: ToolCall }
Emitted when a tool call completes with the full tool call object.
finish
{ type: 'finish'; finishReason: FinishReason; usage: ChatUsage }
Emitted when streaming completes with final metadata.
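Tool-call arguments arrive as string fragments via tool-call-delta events, so consumers typically accumulate them per toolCallId. Here is a minimal sketch of such an accumulator, using local copies of a subset of the event shapes above (the real types come from the library):

```typescript
// Subset of the StreamEvent union above, mirrored locally for illustration.
type StreamEvent =
  | { type: 'text-delta'; text: string }
  | { type: 'tool-call-start'; toolCallId: string; toolName: string }
  | { type: 'tool-call-delta'; toolCallId: string; argumentsDelta: string };

interface Accumulated {
  text: string;
  toolArgs: Record<string, string>; // raw argument fragments per toolCallId
}

// Fold a sequence of events into accumulated text and tool arguments.
function reduceEvents(events: StreamEvent[]): Accumulated {
  const acc: Accumulated = { text: '', toolArgs: {} };
  for (const event of events) {
    switch (event.type) {
      case 'text-delta':
        acc.text += event.text;
        break;
      case 'tool-call-start':
        acc.toolArgs[event.toolCallId] = '';
        break;
      case 'tool-call-delta':
        acc.toolArgs[event.toolCallId] =
          (acc.toolArgs[event.toolCallId] ?? '') + event.argumentsDelta;
        break;
    }
  }
  return acc;
}

// Providers typically stream arguments as JSON fragments:
const sample: StreamEvent[] = [
  { type: 'tool-call-start', toolCallId: 'call_1', toolName: 'get_weather' },
  { type: 'tool-call-delta', toolCallId: 'call_1', argumentsDelta: '{"location":' },
  { type: 'tool-call-delta', toolCallId: 'call_1', argumentsDelta: '"Tokyo"}' },
  { type: 'text-delta', text: 'Checking the weather.' },
];

const acc = reduceEvents(sample);
```

In practice you would call the same per-event logic inside the for await loop; the tool-call-end event also delivers the fully parsed arguments, so manual accumulation is only needed if you want to render arguments as they stream.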

Examples

Basic streaming

import { stream } from '@core-ai/core-ai';
import { createOpenAI } from '@core-ai/openai';

const openai = createOpenAI();
const model = openai.chatModel('gpt-5-mini');

const chatStream = await stream({
  model,
  messages: [
    { role: 'user', content: 'Write a short story' }
  ]
});

for await (const event of chatStream) {
  if (event.type === 'text-delta') {
    process.stdout.write(event.text);
  }
}

Handling all event types

const chatStream = await stream({
  model,
  messages: [
    { role: 'user', content: 'Explain quantum physics' }
  ],
  reasoning: { effort: 'high' }
});

for await (const event of chatStream) {
  switch (event.type) {
    case 'reasoning-start':
      console.log('\n[Thinking...]');
      break;
    case 'reasoning-delta':
      process.stdout.write(event.text);
      break;
    case 'reasoning-end':
      console.log('\n[Done thinking]\n');
      break;
    case 'text-delta':
      process.stdout.write(event.text);
      break;
    case 'finish':
      console.log('\n\nTokens used:', event.usage.outputTokens);
      break;
  }
}

Using .result

const chatStream = await stream({
  model,
  messages: [
    { role: 'user', content: 'Hello' }
  ]
});

const finalResult = await chatStream.result;
console.log(finalResult.content);
console.log('Usage:', finalResult.usage);

Using .events

const chatStream = await stream({
  model,
  messages: [{ role: 'user', content: 'Explain streaming' }],
});

const events = await chatStream.events;
console.log(events.map((event) => event.type));
.result resolves when the stream completes, regardless of event consumption. .events resolves with the observed history even on abort or upstream failure. Streams are replayable, so late iteration replays buffered events before continuing live.
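The replay guarantee can be pictured with a small buffering iterable. This synchronous sketch is illustrative only; the real ChatStream is an AsyncIterable that, after replaying the buffer, continues with live events as they arrive:

```typescript
// Simplified synchronous model of replayability: every event is buffered,
// so a consumer that starts iterating late still sees the full history.
class ReplayBuffer<T> implements Iterable<T> {
  private readonly buffer: T[] = [];

  push(event: T): void {
    this.buffer.push(event);
  }

  *[Symbol.iterator](): Iterator<T> {
    // Replays everything buffered so far; a real stream would then
    // keep yielding live events.
    yield* this.buffer;
  }
}

const buf = new ReplayBuffer<string>();
buf.push('reasoning-start');
buf.push('text-delta');
buf.push('finish');

const firstPass = [...buf];
const secondPass = [...buf]; // iterating again replays the same history
```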

Streaming with tools

import { stream, defineTool } from '@core-ai/core-ai';
import { z } from 'zod';

const chatStream = await stream({
  model,
  messages: [
    { role: 'user', content: 'What\'s the weather in Tokyo?' }
  ],
  tools: {
    get_weather: defineTool({
      name: 'get_weather',
      description: 'Get weather for a location',
      parameters: z.object({ location: z.string() })
    })
  }
});

for await (const event of chatStream) {
  if (event.type === 'tool-call-start') {
    console.log('Calling tool:', event.toolName);
  } else if (event.type === 'tool-call-end') {
    console.log('Tool arguments:', event.toolCall.arguments);
  } else if (event.type === 'text-delta') {
    process.stdout.write(event.text);
  }
}

Cancellation

const controller = new AbortController();

const chatStream = await stream({
  model,
  messages: [
    { role: 'user', content: 'Write a very long essay' }
  ],
  signal: controller.signal
});

setTimeout(() => controller.abort(), 5000);

try {
  for await (const event of chatStream) {
    if (event.type === 'text-delta') {
      process.stdout.write(event.text);
    }
  }
} catch (error) {
  console.log('\nStream cancelled');
}
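The signal-based cancellation above follows the standard AbortSignal cooperation pattern: the producer checks signal.aborted and stops with an error once it flips. This minimal synchronous sketch uses a local token source in place of a live model stream (it is not the library's implementation):

```typescript
// Consume tokens until done or until the signal is aborted.
function consumeTokens(tokens: string[], signal: AbortSignal): string {
  let output = '';
  for (const token of tokens) {
    if (signal.aborted) {
      // A real ChatStream surfaces this condition as a StreamAbortedError.
      throw new Error('aborted');
    }
    output += token;
  }
  return output;
}

const controller = new AbortController();
const completed = consumeTokens(['Hello', ', ', 'world'], controller.signal);

controller.abort();
let aborted = false;
try {
  consumeTokens(['more', 'tokens'], controller.signal);
} catch {
  aborted = true;
}
```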

Important notes

ChatStream is replayable: iterating after events have already arrived replays the buffered event history before waiting for later events.
.result resolves with the aggregated final response when the stream completes, regardless of whether you consumed the events via iteration.

Error handling

Throws ValidationError if:
  • the messages array is empty
May also throw:
  • ProviderError if the provider returns an error during streaming
  • StreamAbortedError if the stream is aborted via the signal
import { StreamAbortedError, ValidationError } from '@core-ai/core-ai';

try {
  const chatStream = await stream({
    model,
    messages: [] // empty — throws ValidationError
  });

  for await (const event of chatStream) {
    // StreamAbortedError surfaces here if the signal fires mid-stream
  }
} catch (error) {
  if (error instanceof ValidationError) {
    console.error('Invalid request:', error.message);
  } else if (error instanceof StreamAbortedError) {
    console.error('Stream was aborted');
  }
}