The stream() function enables real-time streaming of model responses, providing a better user experience for long-form content.
Basic Usage
Stream a chat completion with async iteration:
import { stream } from '@core-ai/core-ai';
import { createOpenAI } from '@core-ai/openai';

const openai = createOpenAI({ apiKey: process.env.OPENAI_API_KEY });
const model = openai.chatModel('gpt-5-mini');

const result = await stream({
  model,
  messages: [
    {
      role: 'user',
      content: 'Write a short haiku about strongly typed APIs.',
    },
  ],
});

console.log('Streaming output:\n');

for await (const event of result) {
  if (event.type === 'text-delta') {
    process.stdout.write(event.text);
  }
}

const response = await result.result;
console.log('\n\nFinish reason:', response.finishReason);
console.log('Usage:', response.usage);
Stream Event Types
The stream() function returns a ChatStream, an async iterable that also exposes .result and .events:
type ChatStream = AsyncIterable<StreamEvent> & {
  readonly result: Promise<GenerateResult>;
  readonly events: Promise<readonly StreamEvent[]>;
};
The stream emits different event types:
type StreamEvent =
  | { type: 'reasoning-start' }
  | { type: 'reasoning-delta'; text: string }
  | { type: 'reasoning-end'; providerMetadata?: Record<string, Record<string, unknown>> }
  | { type: 'text-delta'; text: string }
  | { type: 'tool-call-start'; toolCallId: string; toolName: string }
  | { type: 'tool-call-delta'; toolCallId: string; argumentsDelta: string }
  | { type: 'tool-call-end'; toolCall: ToolCall }
  | { type: 'finish'; finishReason: FinishReason; usage: ChatUsage };
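To make the union concrete, here is a self-contained sketch of consuming such events: a collectText helper that drains any async iterable of events and concatenates the text deltas. The event shape is reduced to two variants and the mock generator is illustrative, not part of the library:

```typescript
// Reduced event shape for illustration; the real StreamEvent union has more variants.
type DemoEvent =
  | { type: 'text-delta'; text: string }
  | { type: 'finish'; finishReason: string };

// Drain any async iterable of events and concatenate the text deltas.
async function collectText(events: AsyncIterable<DemoEvent>): Promise<string> {
  let text = '';
  for await (const event of events) {
    if (event.type === 'text-delta') {
      text += event.text;
    }
  }
  return text;
}

// A mock stream standing in for a real ChatStream.
async function* mockStream(): AsyncGenerator<DemoEvent> {
  yield { type: 'text-delta', text: 'Hello, ' };
  yield { type: 'text-delta', text: 'world' };
  yield { type: 'finish', finishReason: 'stop' };
}

(async () => {
  const collected = await collectText(mockStream());
  console.log(collected); // "Hello, world"
})();
```

Because ChatStream is an AsyncIterable<StreamEvent>, a helper written against the iterable interface works with the real stream handle as well as with mocks in tests.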
Handling Different Event Types
Process different event types for rich streaming experiences:
const result = await stream({ model, messages });

let textBuffer = '';
let reasoningBuffer = '';

for await (const event of result) {
  switch (event.type) {
    case 'reasoning-start':
      console.log('\n[Model is thinking...]\n');
      break;
    case 'reasoning-delta':
      reasoningBuffer += event.text;
      process.stdout.write(event.text);
      break;
    case 'reasoning-end':
      console.log('\n[Reasoning complete]\n');
      break;
    case 'text-delta':
      textBuffer += event.text;
      process.stdout.write(event.text);
      break;
    case 'tool-call-start':
      console.log(`\n[Calling tool: ${event.toolName}]`);
      break;
    case 'tool-call-delta':
      // Tool arguments are still being streamed
      break;
    case 'tool-call-end':
      console.log(`Tool call: ${event.toolCall.name}`);
      console.log('Arguments:', event.toolCall.arguments);
      break;
    case 'finish':
      console.log('\n\nFinished:', event.finishReason);
      console.log('Usage:', event.usage);
      break;
  }
}
Getting the Complete Response
Access the aggregated result via .result, and the full event list via .events:
const chatStream = await stream({ model, messages });

for await (const event of chatStream) {
  if (event.type === 'text-delta') {
    process.stdout.write(event.text);
  }
}

const response = await chatStream.result;
console.log('\nComplete content:', response.content);
console.log('Reasoning:', response.reasoning);
console.log('Finish reason:', response.finishReason);
console.log('Token usage:', response.usage);
Streaming starts immediately when you create the handle. .result resolves when the stream completes, .events resolves with the observed history even on abort or failure, and the handle is replayable for late iterators.
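The buffering and replay semantics can be sketched with a small self-contained wrapper. This is illustrative only, not the library's implementation: every event from the source is recorded, and any iterator, including one attached late, first replays the recorded prefix before following the live stream.

```typescript
// Sketch of replayable-stream semantics (illustrative, not the library code).
function makeReplayable<T>(source: AsyncIterable<T>) {
  const buffer: T[] = [];
  let done = false;
  let draining: Promise<void> | null = null;

  // Consume the source exactly once, recording every event.
  const drain = () => {
    if (!draining) {
      draining = (async () => {
        for await (const item of source) buffer.push(item);
        done = true;
      })();
    }
    return draining;
  };

  return {
    // Each iterator replays the buffer, then waits for new events.
    async *[Symbol.asyncIterator]() {
      const pending = drain();
      let i = 0;
      while (true) {
        if (i < buffer.length) {
          yield buffer[i++];
        } else if (done) {
          return;
        } else {
          // Crude wake-up: let the drain loop make progress, then re-check.
          await Promise.race([pending, new Promise((r) => setTimeout(r, 0))]);
        }
      }
    },
    // Resolves with the full recorded history once the source is exhausted.
    events: async () => {
      await drain();
      return buffer as readonly T[];
    },
  };
}

// Usage: both passes over the same handle see the full sequence.
(async () => {
  async function* demo() { yield 'a'; yield 'b'; }
  const h = makeReplayable(demo());
  const pass1: string[] = [];
  for await (const v of h) pass1.push(v);
  const pass2: string[] = [];
  for await (const v of h) pass2.push(v);
  console.log(pass1, pass2); // [ 'a', 'b' ] [ 'a', 'b' ]
})();
```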
Streaming with Configuration
Apply the same configuration options as generate():
const result = await stream({
  model,
  messages: [{ role: 'user', content: 'Write a story.' }],
  temperature: 0.8,
  maxTokens: 1000,
  providerOptions: {
    openai: { store: true, user: 'user-123' },
  },
});

for await (const event of result) {
  if (event.type === 'text-delta') {
    process.stdout.write(event.text);
  }
}
UI Integration Examples
React
import { useState } from 'react';
import { stream } from '@core-ai/core-ai';

function StreamingChat() {
  const [content, setContent] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  const handleStream = async () => {
    setIsStreaming(true);
    setContent('');

    const result = await stream({
      model,
      messages: [{ role: 'user', content: 'Hello!' }],
    });

    for await (const event of result) {
      if (event.type === 'text-delta') {
        setContent((prev) => prev + event.text);
      }
    }

    setIsStreaming(false);
  };

  return (
    <div>
      <button onClick={handleStream} disabled={isStreaming}>
        {isStreaming ? 'Streaming...' : 'Start Stream'}
      </button>
      <div>{content}</div>
    </div>
  );
}
Node.js HTTP
import { stream } from '@core-ai/core-ai';
import { createServer } from 'http';

createServer(async (req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
  });

  const result = await stream({
    model,
    messages: [{ role: 'user', content: 'Hello!' }],
  });

  for await (const event of result) {
    if (event.type === 'text-delta') {
      res.write(`data: ${JSON.stringify({ text: event.text })}\n\n`);
    }
  }

  res.end();
}).listen(3000);
Express
import express from 'express';
import { stream } from '@core-ai/core-ai';

const app = express();

app.get('/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  const result = await stream({
    model,
    messages: [{ role: 'user', content: 'Hello!' }],
  });

  for await (const event of result) {
    if (event.type === 'text-delta') {
      res.write(`data: ${JSON.stringify(event)}\n\n`);
    }
  }

  res.end();
});

app.listen(3000);
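On the client side, the text/event-stream responses produced above can be consumed with fetch and a small data: line parser. The sketch below is generic and self-contained; it is not part of the @core-ai API, and a production client should buffer partial lines across chunks (or use EventSource) rather than rely on this simplified per-chunk split.

```typescript
// Minimal SSE parser: extracts the payload of each `data:` line in a chunk.
// Simplified sketch; it assumes each chunk contains only whole lines.
function parseSSEChunk(chunk: string): string[] {
  const payloads: string[] = [];
  for (const line of chunk.split('\n')) {
    if (line.startsWith('data: ')) {
      payloads.push(line.slice('data: '.length));
    }
  }
  return payloads;
}

// Example client for the servers above (browser or Node 18+ with global fetch).
async function consumeStream(url: string, onText: (t: string) => void) {
  const res = await fetch(url);
  const reader = res.body!.getReader();
  const decoder = new TextDecoder();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    for (const payload of parseSSEChunk(decoder.decode(value))) {
      onText(JSON.parse(payload).text);
    }
  }
}
```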
Streaming with Tools
Handle tool calls during streaming:
import { stream, defineTool, type ToolCall } from '@core-ai/core-ai';
import { z } from 'zod';

const weatherTool = defineTool({
  name: 'get_weather',
  description: 'Get weather information',
  parameters: z.object({
    location: z.string(),
  }),
});

const result = await stream({
  model,
  messages: [{ role: 'user', content: 'What is the weather in Berlin?' }],
  tools: { get_weather: weatherTool },
});

const toolCalls: ToolCall[] = [];

for await (const event of result) {
  switch (event.type) {
    case 'text-delta':
      process.stdout.write(event.text);
      break;
    case 'tool-call-start':
      console.log(`\nCalling: ${event.toolName}`);
      break;
    case 'tool-call-end':
      toolCalls.push(event.toolCall);
      break;
  }
}

const response = await result.result;
if (response.finishReason === 'tool-calls') {
  console.log('Tool calls:', response.toolCalls);
}
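To render tool arguments while they stream, one option is to accumulate the argumentsDelta fragments per toolCallId and parse the JSON once the call completes. The accumulator below is a self-contained sketch with reduced event shapes; note that the real 'tool-call-end' event already carries the complete ToolCall, so this is only needed for incremental display.

```typescript
// Reduced event shapes for illustration; the real events differ slightly
// (e.g. the real 'tool-call-end' carries the full ToolCall).
type ToolEvent =
  | { type: 'tool-call-start'; toolCallId: string; toolName: string }
  | { type: 'tool-call-delta'; toolCallId: string; argumentsDelta: string }
  | { type: 'tool-call-end'; toolCallId: string };

// Accumulate streamed argument fragments per tool call, parsing on end.
function makeToolCallAccumulator() {
  const partial = new Map<string, { toolName: string; args: string }>();
  const completed: { toolName: string; arguments: unknown }[] = [];

  return {
    handle(event: ToolEvent) {
      switch (event.type) {
        case 'tool-call-start':
          partial.set(event.toolCallId, { toolName: event.toolName, args: '' });
          break;
        case 'tool-call-delta': {
          const entry = partial.get(event.toolCallId);
          if (entry) entry.args += event.argumentsDelta;
          break;
        }
        case 'tool-call-end': {
          const entry = partial.get(event.toolCallId);
          if (entry) {
            completed.push({
              toolName: entry.toolName,
              arguments: JSON.parse(entry.args),
            });
            partial.delete(event.toolCallId);
          }
          break;
        }
      }
    },
    completed,
  };
}

// Usage with an argument payload split across deltas:
const acc = makeToolCallAccumulator();
acc.handle({ type: 'tool-call-start', toolCallId: 'c1', toolName: 'get_weather' });
acc.handle({ type: 'tool-call-delta', toolCallId: 'c1', argumentsDelta: '{"location":' });
acc.handle({ type: 'tool-call-delta', toolCallId: 'c1', argumentsDelta: '"Berlin"}' });
acc.handle({ type: 'tool-call-end', toolCallId: 'c1' });
console.log(acc.completed); // [ { toolName: 'get_weather', arguments: { location: 'Berlin' } } ]
```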
Abort Streaming
Cancel streaming with AbortController:
import { StreamAbortedError } from '@core-ai/core-ai';

const controller = new AbortController();

const result = await stream({
  model,
  messages: [{ role: 'user', content: 'Write a long story.' }],
  signal: controller.signal,
});

// Cancel after 5 seconds
setTimeout(() => controller.abort(), 5000);

try {
  for await (const event of result) {
    if (event.type === 'text-delta') {
      process.stdout.write(event.text);
    }
  }
} catch (error) {
  if (error instanceof StreamAbortedError) {
    console.log('\nStream cancelled');
  }
}
Error Handling
Handle errors during streaming:
import { CoreAIError, ProviderError } from '@core-ai/core-ai';

try {
  const result = await stream({ model, messages });

  for await (const event of result) {
    if (event.type === 'text-delta') {
      process.stdout.write(event.text);
    }
  }

  const response = await result.result;
  console.log('\nComplete:', response.finishReason);
} catch (error) {
  if (error instanceof ProviderError) {
    console.error('Provider error:', error.message);
  } else if (error instanceof CoreAIError) {
    console.error('CoreAIError:', error.message);
  } else {
    console.error('Unknown error:', error);
  }
}
Best Practices
Reuse the stream handle
ChatStream buffers events internally. You can iterate multiple times, and .result resolves independently of event consumption:
const chatStream = await stream({ model, messages });

// Iterate events
for await (const event of chatStream) {
  // Process events
}

// .result resolves when the stream completes
const response = await chatStream.result;

// You can also get all events after the fact
const allEvents = await chatStream.events;
Accumulate text deltas
Accumulate text deltas for complete content:
let fullText = '';

for await (const event of result) {
  if (event.type === 'text-delta') {
    fullText += event.text;
    updateUI(fullText); // Update UI with accumulated text
  }
}
Handle reasoning separately
Distinguish between reasoning and response text:
let reasoning = '';
let response = '';

for await (const event of result) {
  if (event.type === 'reasoning-delta') {
    reasoning += event.text;
    // Show in a separate "thinking" UI
  } else if (event.type === 'text-delta') {
    response += event.text;
    // Show in the main response area
  }
}
Next Steps
Chat Completion: Learn about non-streaming responses
Tool Calling: Stream tool calls and responses
Structured Outputs: Stream structured JSON objects
Multi-Modal: Stream responses with images