The stream() function enables real-time streaming of model responses, providing a better user experience for long-form content.
Basic Usage
Stream a chat completion with async iteration:
```ts
import { stream } from '@core-ai/core-ai';
import { createOpenAI } from '@core-ai/openai';

const openai = createOpenAI({ apiKey: process.env.OPENAI_API_KEY });
const model = openai.chatModel('gpt-5-mini');

const result = await stream({
  model,
  messages: [
    {
      role: 'user',
      content: 'Write a short haiku about strongly typed APIs.',
    },
  ],
});

console.log('Streaming output:\n');

for await (const event of result) {
  if (event.type === 'text-delta') {
    process.stdout.write(event.text);
  }
}

const response = await result.toResponse();
console.log('\n\nFinish reason:', response.finishReason);
console.log('Usage:', response.usage);
```
Stream Event Types
The stream emits different event types:
```ts
type StreamEvent =
  | { type: 'reasoning-start' }
  | { type: 'reasoning-delta'; text: string }
  | { type: 'reasoning-end' }
  | { type: 'text-delta'; text: string }
  | { type: 'tool-call-start'; toolCallId: string; toolName: string }
  | { type: 'tool-call-delta'; toolCallId: string; argumentsDelta: string }
  | { type: 'tool-call-end'; toolCall: ToolCall }
  | { type: 'finish'; finishReason: FinishReason; usage: ChatUsage };
```
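Because the union is discriminated on the `type` field, checking `event.type` narrows each branch for TypeScript. A self-contained sketch that folds a list of events into accumulated text and reasoning (the `StreamEvent` subset and `accumulate` helper below are stubbed for illustration, not part of the library):

```ts
// Minimal stand-in for the library's StreamEvent union, for illustration only.
type StreamEvent =
  | { type: 'text-delta'; text: string }
  | { type: 'reasoning-delta'; text: string }
  | { type: 'finish'; finishReason: string };

// Checking `event.type` narrows the union, so `event.text` is only
// accessible on the delta branches.
function accumulate(events: StreamEvent[]): { text: string; reasoning: string } {
  let text = '';
  let reasoning = '';
  for (const event of events) {
    if (event.type === 'text-delta') text += event.text;
    else if (event.type === 'reasoning-delta') reasoning += event.text;
  }
  return { text, reasoning };
}
```

The same narrowing applies inside a `switch (event.type)`, as shown in the next section.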
Handling Different Event Types
Process different event types for rich streaming experiences:
```ts
const result = await stream({ model, messages });

let textBuffer = '';
let reasoningBuffer = '';

for await (const event of result) {
  switch (event.type) {
    case 'reasoning-start':
      console.log('\n[Model is thinking...]\n');
      break;
    case 'reasoning-delta':
      reasoningBuffer += event.text;
      process.stdout.write(event.text);
      break;
    case 'reasoning-end':
      console.log('\n[Reasoning complete]\n');
      break;
    case 'text-delta':
      textBuffer += event.text;
      process.stdout.write(event.text);
      break;
    case 'tool-call-start':
      console.log(`\n[Calling tool: ${event.toolName}]`);
      break;
    case 'tool-call-delta':
      // Tool arguments are being streamed
      break;
    case 'tool-call-end':
      console.log(`Tool call: ${event.toolCall.name}`);
      console.log('Arguments:', event.toolCall.arguments);
      break;
    case 'finish':
      console.log('\n\nFinished:', event.finishReason);
      console.log('Usage:', event.usage);
      break;
  }
}
```
Getting the Complete Response
Convert the stream to a complete response:
```ts
const result = await stream({ model, messages });

// Process events in real-time
for await (const event of result) {
  if (event.type === 'text-delta') {
    process.stdout.write(event.text);
  }
}

// Get the complete response after streaming
const response = await result.toResponse();

console.log('\nComplete content:', response.content);
console.log('Reasoning:', response.reasoning);
console.log('Finish reason:', response.finishReason);
console.log('Token usage:', response.usage);
```
Call toResponse() only after consuming all stream events. Calling it before the stream completes will wait for all events to finish.
Streaming with Configuration
Apply the same configuration options as generate():
```ts
const result = await stream({
  model,
  messages: [{ role: 'user', content: 'Write a story.' }],
  config: {
    temperature: 0.8,
    maxTokens: 1000,
    stopSequences: ['The End'],
  },
});

for await (const event of result) {
  if (event.type === 'text-delta') {
    process.stdout.write(event.text);
  }
}
```
UI Integration Examples

React

```tsx
import { useState } from 'react';
import { stream } from '@core-ai/core-ai';

function StreamingChat() {
  const [content, setContent] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  const handleStream = async () => {
    setIsStreaming(true);
    setContent('');

    const result = await stream({
      model,
      messages: [{ role: 'user', content: 'Hello!' }],
    });

    for await (const event of result) {
      if (event.type === 'text-delta') {
        setContent((prev) => prev + event.text);
      }
    }

    setIsStreaming(false);
  };

  return (
    <div>
      <button onClick={handleStream} disabled={isStreaming}>
        {isStreaming ? 'Streaming...' : 'Start Stream'}
      </button>
      <div>{content}</div>
    </div>
  );
}
```

Node.js HTTP

```ts
import { stream } from '@core-ai/core-ai';
import { createServer } from 'http';

createServer(async (req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
  });

  const result = await stream({
    model,
    messages: [{ role: 'user', content: 'Hello!' }],
  });

  for await (const event of result) {
    if (event.type === 'text-delta') {
      res.write(`data: ${JSON.stringify({ text: event.text })}\n\n`);
    }
  }

  res.end();
}).listen(3000);
```

Express

```ts
import express from 'express';
import { stream } from '@core-ai/core-ai';

const app = express();

app.get('/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  const result = await stream({
    model,
    messages: [{ role: 'user', content: 'Hello!' }],
  });

  for await (const event of result) {
    if (event.type === 'text-delta') {
      res.write(`data: ${JSON.stringify(event)}\n\n`);
    }
  }

  res.end();
});

app.listen(3000);
```
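On the client side, the SSE responses produced by these servers can be consumed with `fetch` and a streaming reader. A minimal sketch; the `parseSSELines` and `consumeStream` helpers are illustrative and assume each chunk contains whole `data:` lines (production code should buffer partial lines across chunk boundaries):

```ts
// Extract the payloads of `data: ...` lines from a raw SSE chunk.
// Simplification: assumes events are not split across chunk boundaries.
function parseSSELines(chunk: string): string[] {
  return chunk
    .split('\n')
    .filter((line) => line.startsWith('data: '))
    .map((line) => line.slice('data: '.length));
}

// Consume an SSE endpoint and forward text deltas to a callback.
async function consumeStream(url: string, onText: (text: string) => void): Promise<void> {
  const res = await fetch(url);
  if (!res.body) throw new Error('Response has no body');
  const reader = res.body.pipeThrough(new TextDecoderStream()).getReader();
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    for (const data of parseSSELines(value)) {
      const event = JSON.parse(data);
      if (typeof event.text === 'string') onText(event.text);
    }
  }
}
```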
Streaming with Tool Calls

Handle tool calls during streaming:
```ts
import { stream, defineTool, type ToolCall } from '@core-ai/core-ai';
import { z } from 'zod';

const weatherTool = defineTool({
  name: 'get_weather',
  description: 'Get weather information',
  parameters: z.object({
    location: z.string(),
  }),
});

const result = await stream({
  model,
  messages: [{ role: 'user', content: 'What is the weather in Berlin?' }],
  tools: { get_weather: weatherTool },
});

const toolCalls: ToolCall[] = [];

for await (const event of result) {
  switch (event.type) {
    case 'text-delta':
      process.stdout.write(event.text);
      break;
    case 'tool-call-start':
      console.log(`\nCalling: ${event.toolName}`);
      break;
    case 'tool-call-end':
      toolCalls.push(event.toolCall);
      break;
  }
}

const response = await result.toResponse();

if (response.finishReason === 'tool-calls') {
  console.log('Tool calls:', response.toolCalls);
  // Handle tool execution here
}
```
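The tool-call-delta events carry the tool arguments as incremental JSON, which is useful for showing partially built tool calls in a UI before tool-call-end arrives. A sketch of accumulating deltas per toolCallId (the `ToolCallDelta` type and `accumulateToolArgs` helper are stubbed for illustration):

```ts
// Only the fields relevant to argument accumulation, stubbed for illustration.
type ToolCallDelta = { type: 'tool-call-delta'; toolCallId: string; argumentsDelta: string };

// Concatenate argument deltas per tool call id; each buffer holds the
// complete JSON arguments string once its tool call finishes.
function accumulateToolArgs(events: ToolCallDelta[]): Map<string, string> {
  const buffers = new Map<string, string>();
  for (const event of events) {
    buffers.set(event.toolCallId, (buffers.get(event.toolCallId) ?? '') + event.argumentsDelta);
  }
  return buffers;
}
```

Note that each buffer is only guaranteed to be valid JSON after the matching tool-call-end event.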
Abort Streaming
Cancel streaming with AbortController:
```ts
const controller = new AbortController();

const result = await stream({
  model,
  messages: [{ role: 'user', content: 'Write a long story.' }],
  signal: controller.signal,
});

// Cancel after 5 seconds
setTimeout(() => controller.abort(), 5000);

try {
  for await (const event of result) {
    if (event.type === 'text-delta') {
      process.stdout.write(event.text);
    }
  }
} catch (error) {
  if (error instanceof Error && error.name === 'AbortError') {
    console.log('\nStream cancelled');
  }
}
```
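On Node 17.3+ and modern browsers, `AbortSignal.timeout()` can replace the manual controller-plus-setTimeout pattern, e.g. `signal: AbortSignal.timeout(5000)`. The sketch below demonstrates the behaviour with a self-contained abortable sleep (the `sleep` and `demo` helpers are illustrative, not part of @core-ai):

```ts
// A sleep that rejects when its AbortSignal fires. AbortSignal.timeout()
// aborts automatically with a DOMException named 'TimeoutError'.
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise((resolve, reject) => {
    if (signal?.aborted) return reject(signal.reason);
    const timer = setTimeout(resolve, ms);
    signal?.addEventListener('abort', () => {
      clearTimeout(timer);
      reject(signal.reason);
    });
  });
}

async function demo(): Promise<string> {
  try {
    await sleep(10_000, AbortSignal.timeout(100)); // aborts long before 10 s
    return 'completed';
  } catch (error) {
    return (error as Error)?.name === 'TimeoutError' ? 'aborted' : 'error';
  }
}
```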
Error Handling
Handle errors during streaming:
```ts
import { LLMError, ProviderError } from '@core-ai/core-ai';

try {
  const result = await stream({ model, messages });

  for await (const event of result) {
    if (event.type === 'text-delta') {
      process.stdout.write(event.text);
    }
  }

  const response = await result.toResponse();
  console.log('\nComplete:', response.finishReason);
} catch (error) {
  if (error instanceof ProviderError) {
    console.error('Provider error:', error.message);
  } else if (error instanceof LLMError) {
    console.error('LLM error:', error.message);
  } else {
    console.error('Unknown error:', error);
  }
}
```
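Transient provider errors are often worth retrying with backoff. A generic wrapper sketch; `withRetry` is an illustrative helper, not part of @core-ai (pass e.g. `() => stream({ model, messages })` as the operation and a predicate such as `(err) => err instanceof ProviderError`):

```ts
// Retry an async operation with exponential backoff until it succeeds,
// the error is not retryable, or attempts run out.
async function withRetry<T>(
  op: () => Promise<T>,
  isRetryable: (error: unknown) => boolean,
  maxAttempts = 3,
  baseDelayMs = 200,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await op();
    } catch (error) {
      lastError = error;
      if (!isRetryable(error) || attempt === maxAttempts - 1) throw error;
      await new Promise((r) => setTimeout(r, baseDelayMs * 2 ** attempt));
    }
  }
  throw lastError;
}
```

Note that retrying makes sense for the initial `stream()` call; once deltas have been delivered to a user, a mid-stream failure usually needs application-level handling instead.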
Best Practices
Always consume the entire stream

Make sure to iterate through all events before calling toResponse():

```ts
const result = await stream({ model, messages });

// Good: consume all events first
for await (const event of result) {
  // Process events
}
const response = await result.toResponse();

// Bad: calling toResponse() without consuming events
// const response = await result.toResponse(); // This will hang
```
Accumulate text deltas

Accumulate text deltas for the complete content:

```ts
let fullText = '';

for await (const event of result) {
  if (event.type === 'text-delta') {
    fullText += event.text;
    updateUI(fullText); // Update UI with accumulated text
  }
}
```
Handle reasoning separately
Distinguish between reasoning and response text:

```ts
let reasoning = '';
let response = '';

for await (const event of result) {
  if (event.type === 'reasoning-delta') {
    reasoning += event.text;
    // Show in a separate "thinking" UI
  } else if (event.type === 'text-delta') {
    response += event.text;
    // Show in the main response area
  }
}
```
Next Steps