Pipeline Module
The Pipeline module provides a flexible API for building AI-powered processing pipelines with composable operations. Inspired by orchestration frameworks like Airflow and Dagster, it implements idiomatic Rust patterns for AI workflows.
Overview
This module defines a flexible pipeline API for describing a sequence of operations that may or may not use AI components (e.g., semantic search, LLM prompting).

The pipeline API was inspired by general orchestration frameworks such as Airflow, Dagster, and Prefect, but is implemented with idiomatic Rust patterns and provides some AI-specific ops out of the box alongside general combinators.

Pipelines are made up of one or more operations, or "ops", each of which must implement the [Op] trait. The [Op] trait requires the implementation of only one method, `call`, which takes an input and returns an output. The trait provides a wide range of combinators for chaining operations together.

One can think of a pipeline as a DAG (Directed Acyclic Graph) where each node is an operation and the edges represent the data flow between operations. When invoking the pipeline on some input, the input is passed to the root node, flows through each operation in turn, and the output of the final (leaf) node is returned as the pipeline's output.
Core Concepts
Operations (Ops)
Operations are the building blocks of pipelines. Each operation:
- Takes an input
- Performs processing
- Returns an output
- Implements the `Op` trait
```rust
use rig::pipeline::{self, Op};

// Simple operation that adds two numbers
let add_op = pipeline::new()
    .map(|(x, y): (i32, i32)| x + y);

// Operation with an async processing step
let async_op = pipeline::new()
    .then(|x: i32| async move { x * 2 });
```
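Ops are invoked with `call` from an async context; a quick usage sketch of the two ops above:

```rust
let sum = add_op.call((5, 3)).await;   // 8
let doubled = async_op.call(4).await;  // 8
```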
Pipeline Structure
Pipelines form a Directed Acyclic Graph (DAG) where:
- Nodes represent operations
- Edges represent data flow between operations
- Input flows from root to leaf nodes
- Output is returned from the final node
Example DAG visualization (for the three-stage sequential pipeline shown below):
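```text
(5, 3) ──▶ [add] ──▶ [double] ──▶ [to_string] ──▶ "16"
```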
Basic Usage
Sequential Operations
Chain operations that execute one after another:
```rust
use rig::pipeline::{self, Op};

let pipeline = pipeline::new()
    .map(|(x, y): (i32, i32)| x + y) // Add numbers
    .map(|z| z * 2)                  // Double the result
    .map(|n| n.to_string());         // Convert to string

let result = pipeline.call((5, 3)).await;
assert_eq!(result, "16");
```
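Async stages can be mixed into a sequential chain with `then`; a small sketch combining a sync `map` stage with an async `then` stage:

```rust
let pipeline = pipeline::new()
    .map(|(x, y): (i32, i32)| x + y)     // Sync stage: add
    .then(|sum| async move { sum * 2 }); // Async stage: double

assert_eq!(pipeline.call((5, 3)).await, 16);
```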
Parallel Operations
Execute multiple operations concurrently using the `parallel!` macro: each branch receives the input, and the branch outputs are combined into a tuple, as in the sketch below.
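A minimal sketch, assuming the `map` and `passthrough` op constructors are in scope alongside the `parallel!` macro (they are used the same way in the RAG example below; exact import paths may vary by crate version):

```rust
use rig::{
    parallel,
    pipeline::{self, map, passthrough, Op},
};

// Both branches receive the same input and run concurrently;
// their outputs are combined into a tuple.
let pipeline = pipeline::new()
    .chain(parallel!(
        passthrough::<i32>(), // Forward the input unchanged
        map(|x: i32| x * 10)  // Transform the input
    ))
    .map(|(original, scaled)| format!("{original} -> {scaled}"));

let result = pipeline.call(4).await;
assert_eq!(result, "4 -> 40");
```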
AI-Specific Operations
RAG Pipeline Example
Build a Retrieval-Augmented Generation pipeline:
```rust
use rig::pipeline::{self, Op};
// Also requires `passthrough`, `lookup`, and the `parallel!` macro in scope;
// `vector_store`, `llm_model`, and `Document` are assumed to be defined elsewhere.

let pipeline = pipeline::new()
    // Parallel: pass the query through & look up related documents
    .chain(parallel!(
        passthrough(),
        lookup::<_, _, Document>(vector_store, 3) // Top 3 matches
    ))
    // Format the query and retrieved documents into a single prompt
    .map(|(query, docs)| format!(
        "Query: {}\nContext: {}",
        query,
        docs.join("\n")
    ))
    // Generate a response with the LLM
    .prompt(llm_model);
```
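Since the final `prompt` stage can fail, the assembled pipeline is invoked through the fallible `TryOp` interface (a hedged sketch; the query string and error handling are placeholders):

```rust
// Hypothetical invocation inside an async fn that returns a Result.
let answer = pipeline.try_call("What is a pipeline?".to_string()).await?;
println!("{answer}");
```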
Extraction Pipeline
Extract structured data from text:
```rust
use rig::pipeline::{self, Op};
use schemars::JsonSchema;
use serde::Deserialize;

// Target structure for the extracted data
#[derive(Deserialize, JsonSchema)]
struct Sentiment {
    score: f64,
    label: String,
}

let pipeline = pipeline::new()
    .map(|text: String| format!("Analyze sentiment: {}", text))
    .extract::<_, _, Sentiment>(extractor); // `extractor` built elsewhere
```
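As with the RAG example, the extraction stage is fallible, so the pipeline is invoked via `try_call` (illustrative input and output):

```rust
let sentiment = pipeline.try_call("I love this library!".to_string()).await?;
// Hypothetical result for the input above:
// Sentiment { score: 0.98, label: "positive" }
println!("{}: {}", sentiment.label, sentiment.score);
```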
Error Handling
The module provides the `TryOp` trait for operations that may fail. Its `try_batch_call` method, for example, processes a batch of inputs concurrently (at most `n` at a time) and returns `Ok` with all outputs only if every call succeeds:

```rust
/// let result = op.try_batch_call(2, vec![2, 4]).await;
/// assert_eq!(result, Ok(vec![3, 5]));
fn try_batch_call<I>(
    &self,
    n: usize,
    input: I,
) -> impl Future<Output = Result<Vec<Self::Output>, Self::Error>> + Send
where
    I: IntoIterator<Item = Self::Input> + Send,
    I::IntoIter: Send,
    Self: Sized;
// The trait supplies a default implementation built on `futures` streams.
```
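An end-to-end sketch matching the doc example above, assuming (as the trait's design suggests) that `TryOp` is available for any op whose output is a `Result`:

```rust
use rig::pipeline::{self, Op, TryOp};

// An op that fails on odd inputs, succeeding with x + 1 otherwise.
let op = pipeline::new()
    .map(|x: i32| if x % 2 == 0 { Ok(x + 1) } else { Err("odd input") });

// Run up to 2 fallible calls concurrently; both inputs are even, so this succeeds.
let result = op.try_batch_call(2, vec![2, 4]).await;
assert_eq!(result, Ok(vec![3, 5]));
```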
Advanced Features
Custom Operations
Implement the `Op` trait for custom operations:
```rust
use rig::pipeline::Op;

struct CustomOp;

impl Op for CustomOp {
    type Input = String;
    type Output = Vec<String>;

    // Split the input into whitespace-separated tokens.
    async fn call(&self, input: Self::Input) -> Self::Output {
        input.split_whitespace()
            .map(String::from)
            .collect()
    }
}
```
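A custom op is called like any built-in op and composes with the same combinators:

```rust
let op = CustomOp;
let words = op.call("hello pipeline world".to_string()).await;
assert_eq!(words, vec!["hello", "pipeline", "world"]);
```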
Batch Processing
Process multiple inputs concurrently:
```rust
// `analyze_sentiment` is a placeholder for any per-document function.
let pipeline = pipeline::new()
    .map(|text: String| analyze_sentiment(text));

// Process up to 5 documents concurrently
let results = pipeline.batch_call(5, documents).await;
```
Best Practices
- Composability: Design operations to be modular and reusable
- Error Handling: Use `TryOp` for operations that may fail
- Resource Management: Consider batch processing for multiple inputs
- Testing: Unit test individual operations before combining them
- Documentation: Document the expected inputs and outputs of each operation
See Also
- Agent Module - High-level AI agent abstractions
- Vector Store Module - Document storage and retrieval
- Completion Module - LLM interaction primitives
API Reference (Pipeline)