Pipelines
Chain agent operations into multi-step workflows with typed data flow
One Call, Many Agents
The Pipeline builder lets you chain multiple agent steps into a single server-side execution. The backend handles data wiring between agents, parallel execution where possible, and returns per-step results — all in one HTTP call.
Pipelines vs Standalone Agents
Use Standalone Agents for single operations (one search, one synthesis). Use Pipelines when you need to chain multiple agents together — the server auto-wires data between steps so you don't have to extract and pass IDs manually.
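Concretely, the difference looks like this. With standalone agents you make one call per operation and hand-carry IDs between them; with a pipeline, the server does the wiring in a single call. The standalone method names below (client.agents.search_images, result.file_ids) are illustrative assumptions, not documented API:

```python
# Standalone agents: two round-trips, manual ID wiring.
# NOTE: client.agents.search_images() and .file_ids are hypothetical names
# used for contrast; see the standalone agent docs for the real methods.
search = await client.agents.search_images("damaged utility poles")
await client.agents.organize("Sort by damage severity", file_ids=search.file_ids)

# Pipeline: one round-trip, server-side wiring.
result = await (
    client.pipeline()
    .search_images("damaged utility poles")
    .organize("Sort by damage severity")
    .run()
)
```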
Quick Start
Search and Organize
Find images and organize them into folders — two agents, one call.
```python
from aion import AionVision

async with AionVision(api_key="aion_...") as client:
    result = await (
        client.pipeline()
        .search_images("damaged utility poles")
        .organize("Sort by damage severity")
        .run()
    )

    print(f"Success: {result.success}")
    print(f"Steps: {len(result.steps)}")
    print(f"Total time: {result.execution_time_ms}ms")

    # Access individual step results
    search_step = result.step(0)
    print(f"Search: {search_step.summary}")

    organize_step = result.step(1)
    print(f"Organize: {organize_step.summary}")

    # Or get the final step directly
    print(f"Final: {result.final.summary}")
```
Linear Pipelines
Steps execute sequentially. Each step receives the outputs of the previous step via typed data flow — no manual ID passing needed.
Search and Synthesize
Search documents, then generate a report from the results.
```python
result = await (
    client.pipeline()
    .search_documents("safety inspection procedures")
    .synthesize("Write a compliance summary")
    .run()
)

print(result.final.summary)
```
Search and Analyze
Search images, then run analysis on the results.
```python
result = await (
    client.pipeline()
    .search_images("construction site progress photos")
    .analyze("Categorize by construction phase")
    .run()
)

print(result.step(0).summary)  # Search results
print(result.step(1).summary)  # Analysis results
```
Search, Analyze, and Organize
Three-step chain: search, analyze categories, then organize into folders.
```python
result = await (
    client.pipeline()
    .search_images("equipment photos")
    .analyze("Categorize by equipment type and condition")
    .organize("Create folders for each category")
    .run()
)

for step in result.steps:
    print(f"[{step.agent}] {step.status}: {step.summary}")
```
DAG Pipelines (Parallel Fan-In)
Use depends_on to create parallel branches that feed into a downstream step. Steps that don't depend on each other execute in parallel.
Parallel Searches with Synthesis
Search images and documents in parallel, then synthesize a combined report.
```python
result = await (
    client.pipeline()
    .search_images("utility poles")          # step 0
    .search_documents("inspection reports")  # step 1
    .synthesize(                             # step 2
        "Cross-reference images with reports",
        depends_on=[0, 1],  # waits for both searches
    )
    .run()
)

# Steps 0 and 1 run in parallel (same wave)
# Step 2 runs after both complete
print(f"Waves: {result.total_waves}")  # 2 waves
print(f"Report: {result.final.summary}")
```
Cross-Reference Pattern
Search multiple sources and cross-reference the findings.
```python
result = await (
    client.pipeline()
    .search_images("building facades")      # step 0
    .search_documents("property records")   # step 1
    .search_links("building code updates")  # step 2
    .cross_reference(                       # step 3
        "Match images to property records and code requirements",
        depends_on=[0, 1, 2],
    )
    .run()
)

print(result.final.summary)
```
How depends_on Works
Each step is referenced by its 0-based index. When you specify depends_on=[0, 1], the step waits for steps 0 and 1 to complete before executing. Steps that don't depend on each other are grouped into the same parallel wave automatically.
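A diamond-shaped pipeline makes the wave grouping concrete. This sketch assumes analyze() accepts depends_on the same way synthesize() does:

```python
result = await (
    client.pipeline()
    .search_images("transformer stations")       # step 0, wave 1
    .analyze("Flag corrosion damage",            # step 1, wave 2
             depends_on=[0])
    .analyze("Flag vegetation encroachment",     # step 2, wave 2, parallel with step 1
             depends_on=[0])
    .synthesize("Summarize both risk analyses",  # step 3, wave 3
                depends_on=[1, 2])
    .run()
)

print(f"Waves: {result.total_waves}")  # 3: [0], then [1, 2], then [3]
```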
Starting from Known IDs
Use with_images() or with_documents() to seed a pipeline with file IDs you already have, skipping the search step.
Organize Known Images
```python
result = await (
    client.pipeline()
    .with_images(["img-1", "img-2", "img-3", "img-4"])
    .organize("Sort by location")
    .run()
)

print(result.final.summary)
```
Analyze Known Documents
```python
result = await (
    client.pipeline()
    .with_documents(["doc-contract-a", "doc-contract-b"])
    .analyze_documents("Compare warranty terms between contracts")
    .run()
)

print(result.final.summary)
```
Mixed Seed Data with Synthesis
```python
result = await (
    client.pipeline()
    .with_images(["img-site-01", "img-site-02"])
    .with_documents(["doc-inspection-report"])
    .synthesize("Write a site assessment combining photos and report")
    .run()
)

print(result.final.summary)
```
Available Agents
Each builder method maps to a specific agent on the server.
| Builder Method | Agent | Description |
|---|---|---|
| .search_images(query) | image_search | Search images by natural language |
| .search_documents(query) | document_search | Semantic search across documents |
| .search_links(query) | link_search | Search saved web links |
| .analyze(intent) | analysis | Categorize and analyze images |
| .analyze_documents(intent) | document_analysis | Analyze and compare documents |
| .analyze_links(intent) | link_analysis | Analyze saved web link content |
| .synthesize(intent) | assistant | Generate reports and summaries |
| .organize(intent) | folder | Organize files into folders |
| .cross_reference(intent) | cross_reference | Cross-reference findings across sources |
Auto-Wiring
In linear pipelines (no depends_on), the server automatically wires data between steps based on their contract types. For example, an image_search step produces FILE_IDS with content type "images", and an analysis step consumes that same type — the wiring happens automatically.
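For example, this linear chain needs no depends_on at all. The comments sketch the contract flow described above; the exact type consumed by the organize step is an assumption:

```python
result = await (
    client.pipeline()
    .search_images("rooftop solar panels")  # produces FILE_IDS (content type "images")
    .analyze("Group by panel condition")    # consumes FILE_IDS ("images") automatically
    .organize("One folder per group")       # assumed: consumes the upstream outputs
    .run()
)
```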
Working with Results
PipelineResult
The run() method returns a PipelineResult with per-step results, timing, and error information.
```python
result = await (
    client.pipeline()
    .search_images("damaged equipment")
    .organize("Sort by damage type")
    .run()
)

# Overall status
result.success            # True if all steps succeeded
result.execution_time_ms  # Total wall-clock time in milliseconds
result.total_waves        # Number of parallel execution waves
result.errors             # List of error messages (empty on success)
result.token_usage        # {"input": ..., "output": ...} or None

# Access steps
result.steps    # list[StepResult] — all steps in order
result.step(0)  # StepResult for step 0
result.step(1)  # StepResult for step 1
result.final    # StepResult for the last step
```
StepResult
Each step has its own result with status, summary, and outputs.
```python
step = result.step(0)

step.agent              # "image_search", "folder", etc.
step.status             # "completed", "failed", or "skipped"
step.summary            # Human-readable summary of what happened
step.outputs            # dict of agent-specific output data
step.error              # Error message if status == "failed"
step.execution_time_ms  # Time this step took
```
Error Handling
```python
result = await (
    client.pipeline()
    .search_images("equipment")
    .organize("Sort by type")
    .run()
)

if not result.success:
    for error in result.errors:
        print(f"Error: {error}")

    for step in result.steps:
        if step.status == "failed":
            print(f"Step [{step.agent}] failed: {step.error}")
        elif step.status == "skipped":
            print(f"Step [{step.agent}] was skipped (upstream failure)")
```
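If a failure looks transient (for example, a per-node timeout), one recovery option is to re-run the whole pipeline a bounded number of times. The helper below is a hypothetical sketch, not SDK behavior; whether re-running is safe depends on your agents' side effects (organize, for instance, creates folders):

```python
import asyncio

async def run_with_retries(client, attempts: int = 3, backoff_s: float = 2.0):
    """Re-run the pipeline until it succeeds or attempts run out."""
    for attempt in range(1, attempts + 1):
        result = await (
            client.pipeline()
            .search_images("equipment")
            .organize("Sort by type")
            .run()
        )
        if result.success:
            return result
        print(f"Attempt {attempt} failed: {result.errors}")
        await asyncio.sleep(backoff_s * attempt)  # linear backoff between attempts
    return result
```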
Limits
- Steps: 1–10 steps per pipeline
- Intent: 1–2,000 characters per step
- Timeout: Each agent has a per-node execution timeout
- Agents: Valid agent names only (see table above)
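To fail fast on these limits, you can validate inputs client-side before calling run(). This assumes search queries share the 2,000-character intent limit; validate_text is a hypothetical helper, not part of the SDK:

```python
MAX_TEXT_CHARS = 2_000  # per-step intent limit from the list above

def validate_text(text: str) -> str:
    """Raise locally instead of letting the server reject the pipeline."""
    if not 1 <= len(text) <= MAX_TEXT_CHARS:
        raise ValueError(f"Expected 1-{MAX_TEXT_CHARS} characters, got {len(text)}")
    return text

result = await (
    client.pipeline()
    .search_images(validate_text("equipment photos"))
    .organize(validate_text("Sort by type"))
    .run()
)
```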
Complete Example
Multi-Source Intelligence Report
Search images, documents, and links in parallel, then synthesize a report.
```python
import asyncio

from aion import AionVision


async def main():
    async with AionVision.from_env() as client:
        result = await (
            client.pipeline()
            .search_images("site inspection photos")      # step 0
            .search_documents("inspection reports 2024")  # step 1
            .search_links("building code amendments")     # step 2
            .synthesize(                                  # step 3
                "Write a comprehensive site assessment report "
                "combining photos, inspection findings, and "
                "relevant code requirements",
                depends_on=[0, 1, 2],
            )
            .run()
        )

        if result.success:
            # Print each step's summary
            for i, step in enumerate(result.steps):
                print(f"Step {i} [{step.agent}]: {step.summary}")

            print(f"\nTotal time: {result.execution_time_ms}ms")
            print(f"Execution waves: {result.total_waves}")

            if result.token_usage:
                print(f"Tokens: {result.token_usage}")
        else:
            print(f"Pipeline failed: {result.errors}")


asyncio.run(main())
```