Overview

The stream() function streams responses from a chat model in real time, allowing you to process tokens as they’re generated. This is ideal for interactive applications where you want to display responses progressively.

Function Signature

export async function stream(
    params: StreamParams
): Promise<StreamResult>

export type StreamParams = GenerateOptions & {
    model: ChatModel;
};

Parameters

model (ChatModel, required)
  The chat model instance to use for streaming.
messages (Message[], required)
  Array of messages in the conversation. Must not be empty.
reasoning (ReasoningConfig)
  Configuration for extended thinking/reasoning capabilities.
tools (ToolSet)
  Object mapping tool names to tool definitions.
toolChoice (ToolChoice)
  Controls how the model uses tools: 'auto', 'none', 'required', or { type: 'tool', toolName: string }.
config (ModelConfig)
  Model configuration parameters (temperature, maxTokens, etc.).
providerOptions (Record<string, unknown>)
  Provider-specific options.
signal (AbortSignal)
  An AbortSignal for cancelling the stream mid-generation.

Return Value

Returns a Promise that resolves to a StreamResult: an async iterable that yields StreamEvent objects as they arrive.

StreamResult

type StreamResult = AsyncIterable<StreamEvent> & {
    toResponse(): Promise<GenerateResult>;
};
[Symbol.asyncIterator] (() => AsyncIterator<StreamEvent>)
  Async iterator over the streaming events. The stream can only be iterated once.
toResponse (() => Promise<GenerateResult>)
  Consumes the entire stream and returns the final result. Can be called without iterating.

StreamEvent Types

The stream yields the following event types:

reasoning-start: { type: 'reasoning-start' }
  Emitted when reasoning/thinking begins.
reasoning-delta: { type: 'reasoning-delta'; text: string }
  Emitted for each chunk of reasoning text.
reasoning-end: { type: 'reasoning-end' }
  Emitted when reasoning completes.
text-delta: { type: 'text-delta'; text: string }
  Emitted for each chunk of response text.
tool-call-start: { type: 'tool-call-start'; toolCallId: string; toolName: string }
  Emitted when a tool call begins.
tool-call-delta: { type: 'tool-call-delta'; toolCallId: string; argumentsDelta: string }
  Emitted for each chunk of tool call arguments.
tool-call-end: { type: 'tool-call-end'; toolCall: ToolCall }
  Emitted when a tool call completes, carrying the full tool call object.
finish: { type: 'finish'; finishReason: FinishReason; usage: ChatUsage }
  Emitted when streaming completes, carrying final metadata.
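Tool-call arguments arrive as string fragments in tool-call-delta events and only form valid JSON once concatenated per toolCallId. A minimal self-contained sketch of that accumulation (the event shapes below are local stand-ins mirroring the list above, not imports from the library; the tool-call-end stand-in carries just the id for brevity):

```typescript
// Local stand-ins mirroring the documented tool-call event shapes.
type ToolCallEvent =
  | { type: 'tool-call-start'; toolCallId: string; toolName: string }
  | { type: 'tool-call-delta'; toolCallId: string; argumentsDelta: string }
  | { type: 'tool-call-end'; toolCallId: string };

// Concatenate argument fragments per toolCallId, parsing once complete.
function accumulateToolArgs(events: ToolCallEvent[]): Map<string, unknown> {
  const buffers = new Map<string, string>();
  const parsed = new Map<string, unknown>();
  for (const event of events) {
    if (event.type === 'tool-call-delta') {
      buffers.set(
        event.toolCallId,
        (buffers.get(event.toolCallId) ?? '') + event.argumentsDelta
      );
    } else if (event.type === 'tool-call-end') {
      // All fragments have arrived; the buffer is now complete JSON.
      parsed.set(event.toolCallId, JSON.parse(buffers.get(event.toolCallId) ?? '{}'));
    }
  }
  return parsed;
}

const parsedArgs = accumulateToolArgs([
  { type: 'tool-call-start', toolCallId: 'call_1', toolName: 'get_weather' },
  { type: 'tool-call-delta', toolCallId: 'call_1', argumentsDelta: '{"location":' },
  { type: 'tool-call-delta', toolCallId: 'call_1', argumentsDelta: '"Tokyo"}' },
  { type: 'tool-call-end', toolCallId: 'call_1' },
]);
console.log(parsedArgs.get('call_1')); // { location: 'Tokyo' }
```

In a real stream you would run the same delta/end logic inside the for await loop, one event at a time.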

Examples

Basic Streaming

import { stream } from '@coreai/core';
import { openai } from '@coreai/openai';

const result = await stream({
  model: openai('gpt-4'),
  messages: [
    { role: 'user', content: 'Write a short story' }
  ]
});

for await (const event of result) {
  if (event.type === 'text-delta') {
    process.stdout.write(event.text);
  }
}

Handling All Event Types

const result = await stream({
  model: openai('gpt-4'),
  messages: [
    { role: 'user', content: 'Explain quantum physics' }
  ],
  reasoning: { effort: 'high' }
});

for await (const event of result) {
  switch (event.type) {
    case 'reasoning-start':
      console.log('\n[Thinking...]');
      break;
    case 'reasoning-delta':
      process.stdout.write(event.text);
      break;
    case 'reasoning-end':
      console.log('\n[Done thinking]\n');
      break;
    case 'text-delta':
      process.stdout.write(event.text);
      break;
    case 'finish':
      console.log('\n\nTokens used:', event.usage.outputTokens);
      break;
  }
}

Using toResponse()

// Don't iterate, just get the final result
const result = await stream({
  model: openai('gpt-4'),
  messages: [
    { role: 'user', content: 'Hello' }
  ]
});

const finalResult = await result.toResponse();
console.log(finalResult.content);
console.log('Usage:', finalResult.usage);

Streaming with Tools

import { defineTool } from '@coreai/core';
import { z } from 'zod';

const result = await stream({
  model: openai('gpt-4'),
  messages: [
    { role: 'user', content: 'What\'s the weather in Tokyo?' }
  ],
  tools: {
    get_weather: defineTool({
      name: 'get_weather',
      description: 'Get weather for a location',
      parameters: z.object({ location: z.string() })
    })
  }
});

for await (const event of result) {
  if (event.type === 'tool-call-start') {
    console.log('Calling tool:', event.toolName);
  } else if (event.type === 'tool-call-end') {
    console.log('Tool arguments:', event.toolCall.arguments);
  } else if (event.type === 'text-delta') {
    process.stdout.write(event.text);
  }
}

Cancellation

const controller = new AbortController();

const result = await stream({
  model: openai('gpt-4'),
  messages: [
    { role: 'user', content: 'Write a very long essay' }
  ],
  signal: controller.signal
});

// Cancel after 5 seconds
setTimeout(() => controller.abort(), 5000);

try {
  for await (const event of result) {
    if (event.type === 'text-delta') {
      process.stdout.write(event.text);
    }
  }
} catch (error) {
  console.log('\nStream cancelled');
}

Important Notes

  • A StreamResult can only be iterated once. Attempting to iterate it a second time throws an error: "Stream can only be iterated once".
  • Calling toResponse() without iterating automatically consumes the stream in the background and returns a promise for the final result.
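Because of the one-iteration limit, code that needs the events in two places (say, live display plus later inspection) has to record them during the single pass. A self-contained sketch of that pattern, using a mocked event source in place of a real StreamResult (which would be consumed the same way):

```typescript
type StreamEvent =
  | { type: 'text-delta'; text: string }
  | { type: 'finish' };

// Mocked event source standing in for a StreamResult's single-use iterator.
async function* mockStream(): AsyncGenerator<StreamEvent> {
  yield { type: 'text-delta', text: 'Hello, ' };
  yield { type: 'text-delta', text: 'world' };
  yield { type: 'finish' };
}

// Handle events live while also recording them, so nothing is lost
// to the one-iteration limit.
async function collectWhileStreaming(
  stream: AsyncIterable<StreamEvent>
): Promise<StreamEvent[]> {
  const recorded: StreamEvent[] = [];
  for await (const event of stream) {
    recorded.push(event);
    if (event.type === 'text-delta') process.stdout.write(event.text);
  }
  return recorded;
}

const done = collectWhileStreaming(mockStream());
```

The recorded array can then be replayed, logged, or inspected freely after the stream itself is exhausted.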

Error Handling

Throws an LLMError if:
  • The messages array is empty
  • The model encounters an error during streaming

try {
  const result = await stream({
    model: openai('gpt-4'),
    messages: []
  });
} catch (error) {
  if (error instanceof LLMError) {
    console.error('Stream failed:', error.message);
  }
}
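Since an empty messages array is rejected at call time, validating before invoking stream() can catch the mistake earlier in your own code. A trivial guard sketch (the Message shape is a local stand-in, not the library's type):

```typescript
// Local stand-in for the library's Message type.
type Message = { role: 'user' | 'assistant' | 'system'; content: string };

// Throw early, before stream() is ever called, if messages is empty.
function assertNonEmptyMessages(messages: Message[]): Message[] {
  if (messages.length === 0) {
    throw new Error('messages must not be empty');
  }
  return messages;
}

const checked = assertNonEmptyMessages([{ role: 'user', content: 'Hello' }]);
```

Passing the checked array straight into stream() keeps the validation adjacent to the call site.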

Source Location

~/workspace/source/packages/core-ai/src/stream-chat.ts:12