Streaming in Rig
Rig provides full support for streaming completions, allowing you to process LLM responses incrementally as they are generated rather than waiting for the entire response. This is essential for building responsive user interfaces and handling long-form content.
Core Traits
The streaming system mirrors the non-streaming completion traits:
| Non-Streaming | Streaming | Description |
|---|---|---|
| Prompt | StreamingPrompt | One-shot streaming prompt |
| Chat | StreamingChat | Streaming chat with history |
| Completion | StreamingCompletion | Low-level streaming completion interface |
All of these traits live in the `rig::streaming` module.
StreamingPrompt
The simplest streaming interface — sends a prompt and returns a stream of chunks:
```rust
use rig::streaming::{StreamingPrompt, StreamedAssistantContent};
use futures::StreamExt;

let agent = openai.agent("gpt-4o")
    .preamble("You are a helpful assistant.")
    .build();

let mut stream = agent.stream_prompt("Tell me a story").await?;

while let Some(chunk) = stream.next().await {
    match chunk? {
        StreamedAssistantContent::Text(text) => print!("{text}"),
        StreamedAssistantContent::ToolCallDelta(delta) => { /* handle tool call deltas */ }
        StreamedAssistantContent::FinalUsage(usage) => { /* handle final usage stats */ }
    }
}
```

StreamingChat
Streaming with conversation history:
```rust
use rig::streaming::StreamingChat;

let mut stream = agent.stream_chat("Continue the story", chat_history).await?;
```

StreamingCompletion
Low-level streaming interface with full request customization:
```rust
use rig::streaming::StreamingCompletion;

let builder = agent.stream_completion("prompt", chat_history).await?;
let response = builder
    .temperature(0.9)
    .send()
    .await?;
```

Response Types
StreamedAssistantContent
Each chunk from the stream is a StreamedAssistantContent:
```rust
pub enum StreamedAssistantContent {
    /// A text delta (partial text content)
    Text(String),
    /// A tool call delta (partial tool call data)
    ToolCallDelta(ToolCallDeltaContent),
    /// Final usage statistics sent at the end of the stream
    FinalUsage(Usage),
}
```

ToolCallDeltaContent
Tool calls are streamed as deltas:
```rust
pub enum ToolCallDeltaContent {
    /// The name of the tool being called
    Name(String),
    /// Partial argument data
    Arguments(String),
}
```

StreamingCompletionResponse
The full streaming response wraps the inner stream and provides access to the final message:
```rust
pub struct StreamingCompletionResponse {
    /// The inner stream of chunks
    pub inner: Pin<Box<dyn Stream<Item = Result<RawStreamingChoice, CompletionError>> + Send>>,
    /// Populated after the stream completes: the full message
    pub message: Option<String>,
    /// Populated after the stream completes: the raw response
    pub response: Option<serde_json::Value>,
}
```

Multi-Turn Streaming
When using agents with tools, multi-turn streaming produces MultiTurnStreamItem events:
```rust
pub enum MultiTurnStreamItem {
    /// Streamed user content (e.g., tool results sent back to the model)
    UserContent(StreamedUserContent),
    /// Streamed assistant content (text, tool call deltas, etc.)
    AssistantContent(StreamedAssistantContent),
}
```

This allows you to observe the full agent loop in real time, including tool calls and their results.
Streaming to stdout
Rig provides a convenience function for the common case of printing a stream to the terminal:
```rust
use rig::streaming::stream_to_stdout;

let stream = agent.stream_prompt("Hello!").await?;
stream_to_stdout(stream).await?;
```

The `stream_to_stdout` function prints text chunks as they arrive and ignores tool call deltas (since those are typically not meaningful to display directly).
Pause Control
The PauseControl struct allows you to pause and resume a streaming response:
```rust
use rig::streaming::PauseControl;

let pause = PauseControl::new();
let pause_clone = pause.clone();

// In another task:
pause_clone.pause();
// ...
pause_clone.resume();
```

This is useful for implementing user-controlled streaming in interactive applications.
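The essence of a clonable pause handle can be sketched with an atomic flag shared between the stream consumer and a controlling task. This is a simplified, std-only illustration of the idea, not `PauseControl`'s actual implementation:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// A clonable pause flag: all clones share one underlying AtomicBool
// (illustrative only, not rig's implementation).
#[derive(Clone)]
struct Pause(Arc<AtomicBool>);

impl Pause {
    fn new() -> Self {
        Pause(Arc::new(AtomicBool::new(false)))
    }
    fn pause(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    fn resume(&self) {
        self.0.store(false, Ordering::SeqCst);
    }
    fn is_paused(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

fn main() {
    let pause = Pause::new();
    let handle = pause.clone(); // clone shares the same flag

    handle.pause();
    assert!(pause.is_paused()); // the original observes the change

    handle.resume();
    assert!(!pause.is_paused());
    println!("pause flag round-trip ok");
}
```

A stream consumer would check `is_paused()` between chunks and wait (e.g., on a condition variable or async notification) while it is set.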
Example: Streaming Agent
```rust
use rig::providers::openai;
use rig::streaming::{StreamingPrompt, StreamedAssistantContent};
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let openai = openai::Client::from_env();

    let agent = openai.agent("gpt-5.2")
        .preamble("You are a storyteller.")
        .temperature(0.9)
        .build();

    let mut stream = agent.stream_prompt("Tell me a short story about a robot.").await?;

    while let Some(chunk) = stream.next().await {
        match chunk? {
            StreamedAssistantContent::Text(text) => {
                print!("{text}");
            }
            _ => {} // Handle other content types as needed
        }
    }
    println!();

    Ok(())
}
```

Best Practices
- **Error Handling**: Each chunk in the stream can fail independently. Always handle errors per-chunk rather than expecting the entire stream to succeed or fail atomically.
- **Buffering**: For tool calls, buffer the deltas until the complete tool call is received before executing the tool.
- **Backpressure**: Use `PauseControl` or standard stream backpressure mechanisms when the consumer cannot keep up with the producer.
- **Token Usage**: The `FinalUsage` event (when available) provides token counts for the entire completion, not per-chunk.
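The buffering advice can be sketched concretely: accumulate the streamed deltas until the call is complete, and only then execute the tool. The enum below is a local stand-in with the same shape as the documented `ToolCallDeltaContent`, not the crate's actual type:

```rust
// Local stand-in for ToolCallDeltaContent (illustrative only).
enum ToolCallDeltaContent {
    Name(String),
    Arguments(String),
}

// Fold streamed deltas into a complete (name, arguments) pair;
// execute the tool only after the full call has been assembled.
fn assemble_tool_call(deltas: Vec<ToolCallDeltaContent>) -> (String, String) {
    let mut name = String::new();
    let mut args = String::new();
    for delta in deltas {
        match delta {
            ToolCallDeltaContent::Name(n) => name.push_str(&n),
            ToolCallDeltaContent::Arguments(a) => args.push_str(&a),
        }
    }
    (name, args)
}

fn main() {
    let deltas = vec![
        ToolCallDeltaContent::Name("get_weather".into()),
        ToolCallDeltaContent::Arguments("{\"city\":".into()),
        ToolCallDeltaContent::Arguments("\"Paris\"}".into()),
    ];
    let (name, args) = assemble_tool_call(deltas);
    println!("{name}({args})"); // prints: get_weather({"city":"Paris"})
}
```

Note that the argument deltas are raw string fragments, so the buffered value is only valid JSON once the stream has finished.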
See Also
- Completion — Non-streaming completion traits
- Agents — Agent system with streaming support
- API Reference (Streaming)