Streaming in Rig

Rig provides full support for streaming completions, allowing you to process LLM responses incrementally as they are generated rather than waiting for the entire response. This is essential for building responsive user interfaces and handling long-form content.

Core Traits

The streaming system mirrors the non-streaming completion traits:

Non-Streaming   Streaming             Description
Prompt          StreamingPrompt       One-shot streaming prompt
Chat            StreamingChat         Streaming chat with history
Completion      StreamingCompletion   Low-level streaming completion interface

All of these traits live in the rig::streaming module.

StreamingPrompt

The simplest streaming interface — sends a prompt and returns a stream of chunks:

use rig::streaming::{StreamingPrompt, StreamedAssistantContent};
use futures::StreamExt;
 
let agent = openai.agent("gpt-4o")
    .preamble("You are a helpful assistant.")
    .build();
 
let mut stream = agent.stream_prompt("Tell me a story").await?;
 
while let Some(chunk) = stream.next().await {
    match chunk? {
        StreamedAssistantContent::Text(text) => print!("{text}"),
        StreamedAssistantContent::ToolCallDelta(_delta) => { /* handle tool call deltas */ }
        StreamedAssistantContent::FinalUsage(_usage) => { /* handle final usage stats */ }
    }
}

StreamingChat

Streaming with conversation history:

use rig::streaming::StreamingChat;
 
let mut stream = agent.stream_chat("Continue the story", chat_history).await?;

StreamingCompletion

Low-level streaming interface with full request customization:

use rig::streaming::StreamingCompletion;
 
let builder = agent.stream_completion("prompt", chat_history).await?;
let mut stream = builder
    .temperature(0.9)
    .stream()
    .await?;

Response Types

StreamedAssistantContent

Each chunk from the stream is a StreamedAssistantContent:

pub enum StreamedAssistantContent {
    /// A text delta (partial text content)
    Text(String),
    /// A tool call delta (partial tool call data)
    ToolCallDelta(ToolCallDeltaContent),
    /// Final usage statistics sent at the end of the stream
    FinalUsage(Usage),
}

ToolCallDeltaContent

Tool calls are streamed as deltas:

pub enum ToolCallDeltaContent {
    /// The name of the tool being called
    Name(String),
    /// Partial argument data
    Arguments(String),
}
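Because arguments arrive as fragments, a consumer normally accumulates the deltas until the tool call is complete before executing anything. A minimal std-only sketch of that buffering, using a local mirror of the enum (not the rig type itself) for illustration:

```rust
/// Local mirror of rig's ToolCallDeltaContent, for illustration only.
enum ToolCallDeltaContent {
    Name(String),
    Arguments(String),
}

/// Fold a sequence of deltas into a complete (name, arguments) pair.
fn assemble(deltas: Vec<ToolCallDeltaContent>) -> (String, String) {
    let mut name = String::new();
    let mut args = String::new();
    for delta in deltas {
        match delta {
            // The tool name typically arrives first, in a single delta.
            ToolCallDeltaContent::Name(n) => name.push_str(&n),
            // Argument JSON is streamed in fragments; concatenate in order.
            ToolCallDeltaContent::Arguments(a) => args.push_str(&a),
        }
    }
    (name, args)
}

fn main() {
    let deltas = vec![
        ToolCallDeltaContent::Name("get_weather".into()),
        ToolCallDeltaContent::Arguments("{\"city\":".into()),
        ToolCallDeltaContent::Arguments("\"Paris\"}".into()),
    ];
    let (name, args) = assemble(deltas);
    println!("{name} {args}"); // get_weather {"city":"Paris"}
}
```

Only once the concatenated arguments form valid JSON is the call ready to dispatch to the tool.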

StreamingCompletionResponse

The full streaming response wraps the inner stream and provides access to the final message:

pub struct StreamingCompletionResponse {
    /// The inner stream of chunks
    pub inner: Pin<Box<dyn Stream<Item = Result<RawStreamingChoice, CompletionError>> + Send>>,
    /// Populated after the stream completes: the full message
    pub message: Option<String>,
    /// Populated after the stream completes: the raw response
    pub response: Option<serde_json::Value>,
}
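The message and response fields are None until the inner stream has been fully drained. The aggregation behavior can be sketched with a plain iterator standing in for the async stream (a simplified illustration, not rig's actual implementation):

```rust
/// Simplified stand-in for StreamingCompletionResponse: yields chunks and
/// finalizes the aggregated message once the stream is exhausted.
struct AggregatingResponse {
    chunks: std::vec::IntoIter<String>,
    buffer: String,
    /// None until the stream completes, then the full message text.
    message: Option<String>,
}

impl AggregatingResponse {
    fn new(chunks: Vec<String>) -> Self {
        Self { chunks: chunks.into_iter(), buffer: String::new(), message: None }
    }

    /// Pull the next chunk; on exhaustion, populate `message`.
    fn next_chunk(&mut self) -> Option<String> {
        match self.chunks.next() {
            Some(c) => {
                self.buffer.push_str(&c);
                Some(c)
            }
            None => {
                self.message = Some(self.buffer.clone());
                None
            }
        }
    }
}

fn main() {
    let mut resp = AggregatingResponse::new(vec!["Hel".into(), "lo".into()]);
    while let Some(chunk) = resp.next_chunk() {
        print!("{chunk}");
    }
    println!();
    assert_eq!(resp.message.as_deref(), Some("Hello")); // populated after exhaustion
}
```

The practical consequence is the same for the real type: read message only after the stream has ended.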

Multi-Turn Streaming

When using agents with tools, multi-turn streaming produces MultiTurnStreamItem events:

pub enum MultiTurnStreamItem {
    /// Streamed user content (e.g., tool results sent back to the model)
    UserContent(StreamedUserContent),
    /// Streamed assistant content (text, tool call deltas, etc.)
    AssistantContent(StreamedAssistantContent),
}

This allows you to observe the full agent loop in real-time, including tool calls and their results.
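A consumer of a multi-turn stream typically dispatches on the item kind to decide how to surface each event. A std-only sketch, with the rig enums mirrored locally and trimmed to one variant each for brevity:

```rust
// Local mirrors of rig's multi-turn stream types, trimmed for illustration.
enum StreamedUserContent { ToolResult(String) }
enum StreamedAssistantContent { Text(String) }
enum MultiTurnStreamItem {
    UserContent(StreamedUserContent),
    AssistantContent(StreamedAssistantContent),
}

/// Render one event from the agent loop as a log line.
fn render(item: &MultiTurnStreamItem) -> String {
    match item {
        // A tool result the agent feeds back to the model.
        MultiTurnStreamItem::UserContent(StreamedUserContent::ToolResult(r)) => {
            format!("[tool result] {r}")
        }
        // Text the model streams to the user.
        MultiTurnStreamItem::AssistantContent(StreamedAssistantContent::Text(t)) => {
            format!("[assistant] {t}")
        }
    }
}

fn main() {
    let events = vec![
        MultiTurnStreamItem::AssistantContent(StreamedAssistantContent::Text("Checking...".into())),
        MultiTurnStreamItem::UserContent(StreamedUserContent::ToolResult("42".into())),
    ];
    for event in &events {
        println!("{}", render(event));
    }
}
```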

Streaming to stdout

Rig provides a convenience function for the common case of printing a stream to the terminal:

use rig::streaming::stream_to_stdout;
 
let stream = agent.stream_prompt("Hello!").await?;
stream_to_stdout(stream).await?;

The stream_to_stdout function prints text chunks as they arrive and ignores tool call deltas (since those are typically not meaningful to display directly).

Pause Control

The PauseControl struct allows you to pause and resume a streaming response:

use rig::streaming::PauseControl;
 
let pause = PauseControl::new();
let pause_clone = pause.clone();
 
// In another task:
pause_clone.pause();
// ...
pause_clone.resume();

This is useful for implementing user-controlled streaming in interactive applications.
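The gating semantics behind pause/resume can be sketched with a mutex-guarded flag and a condition variable. This is an illustrative std-only stand-in, not rig's actual PauseControl implementation: the consumer calls wait_if_paused() between chunks, while another task flips the flag.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Minimal pause gate: a shared paused flag plus a condvar to wake waiters.
#[derive(Clone)]
struct PauseGate(Arc<(Mutex<bool>, Condvar)>);

impl PauseGate {
    fn new() -> Self {
        PauseGate(Arc::new((Mutex::new(false), Condvar::new())))
    }

    fn pause(&self) {
        let (lock, _cvar) = &*self.0;
        *lock.lock().unwrap() = true;
    }

    fn resume(&self) {
        let (lock, cvar) = &*self.0;
        *lock.lock().unwrap() = false;
        cvar.notify_all();
    }

    /// Block the calling thread while the gate is paused.
    fn wait_if_paused(&self) {
        let (lock, cvar) = &*self.0;
        let mut paused = lock.lock().unwrap();
        // Predicate loop guards against spurious wakeups.
        while *paused {
            paused = cvar.wait(paused).unwrap();
        }
    }
}

fn main() {
    let gate = PauseGate::new();
    gate.pause();
    let controller = gate.clone();
    // Resume from another thread, as a UI event handler might.
    let handle = std::thread::spawn(move || controller.resume());
    gate.wait_if_paused(); // returns once resumed
    handle.join().unwrap();
    println!("stream resumed");
}
```

The clone-and-share pattern mirrors the snippet above: one handle stays with the stream consumer, the other goes to whatever task drives the user's pause/resume controls.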

Example: Streaming Agent

use rig::providers::openai;
use rig::streaming::{StreamingPrompt, StreamedAssistantContent};
use futures::StreamExt;
 
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let openai = openai::Client::from_env();
 
    let agent = openai.agent("gpt-5.2")
        .preamble("You are a storyteller.")
        .temperature(0.9)
        .build();
 
    let mut stream = agent.stream_prompt("Tell me a short story about a robot.").await?;
 
    while let Some(chunk) = stream.next().await {
        match chunk? {
            StreamedAssistantContent::Text(text) => {
                print!("{text}");
            }
            _ => {} // Handle other content types as needed
        }
    }
    println!();
 
    Ok(())
}

Best Practices

  1. Error Handling: Each chunk in the stream can fail independently. Always handle errors per-chunk rather than expecting the entire stream to succeed or fail atomically.

  2. Buffering: For tool calls, buffer the deltas until the complete tool call is received before executing the tool.

  3. Backpressure: Use PauseControl or standard stream backpressure mechanisms when the consumer cannot keep up with the producer.

  4. Token Usage: The final FinalUsage event (when available) provides token counts for the entire completion, not per-chunk.

See Also

  • Completion — Non-streaming completion traits
  • Agents — Agent system with streaming support
  • API Reference (Streaming)