Documentation

Pipelines

Chain agent operations into multi-step workflows with typed data flow

One Call, Many Agents

The Pipeline builder lets you chain multiple agent steps into a single server-side execution. The backend handles data wiring between agents, parallel execution where possible, and returns per-step results — all in one HTTP call.

Pipelines vs Standalone Agents

Use Standalone Agents for single operations (one search, one synthesis). Use Pipelines when you need to chain multiple agents together — the server auto-wires data between steps so you don't have to extract and pass IDs manually.

Quick Start

Search and Organize

Find images and organize them into folders — two agents, one call.

from aion import AionVision
async with AionVision(api_key="aion_...") as client:
result = await (
client.pipeline()
.search_images("damaged utility poles")
.organize("Sort by damage severity")
.run()
)
print(f"Success: {result.success}")
print(f"Steps: {len(result.steps)}")
print(f"Total time: {result.execution_time_ms}ms")
# Access individual step results
search_step = result.step(0)
print(f"Search: {search_step.summary}")
organize_step = result.step(1)
print(f"Organize: {organize_step.summary}")
# Or get the final step directly
print(f"Final: {result.final.summary}")

Linear Pipelines

Steps execute sequentially. Each step receives the outputs of the previous step via typed data flow — no manual ID passing needed.

Search and Synthesize

Search documents, then generate a report from the results.

result = await (
client.pipeline()
.search_documents("safety inspection procedures")
.synthesize("Write a compliance summary")
.run()
)
print(result.final.summary)

Search and Analyze

Search images, then run analysis on the results.

result = await (
client.pipeline()
.search_images("construction site progress photos")
.analyze("Categorize by construction phase")
.run()
)
print(result.step(0).summary) # Search results
print(result.step(1).summary) # Analysis results

Search, Analyze, and Organize

Three-step chain: search, analyze categories, then organize into folders.

result = await (
client.pipeline()
.search_images("equipment photos")
.analyze("Categorize by equipment type and condition")
.organize("Create folders for each category")
.run()
)
for step in result.steps:
print(f"[{step.agent}] {step.status}: {step.summary}")

DAG Pipelines (Parallel Fan-In)

Use depends_on to create parallel branches that feed into a downstream step. Steps with no shared dependencies execute in parallel.

Parallel Searches with Synthesis

Search images and documents in parallel, then synthesize a combined report.

result = await (
client.pipeline()
.search_images("utility poles") # step 0
.search_documents("inspection reports") # step 1
.synthesize( # step 2
"Cross-reference images with reports",
depends_on=[0, 1], # waits for both searches
)
.run()
)
# Steps 0 and 1 run in parallel (same wave)
# Step 2 runs after both complete
print(f"Waves: {result.total_waves}") # 2 waves
print(f"Report: {result.final.summary}")

Cross-Reference Pattern

Search multiple sources and cross-reference the findings.

result = await (
client.pipeline()
.search_images("building facades") # step 0
.search_documents("property records") # step 1
.search_links("building code updates") # step 2
.cross_reference( # step 3
"Match images to property records and code requirements",
depends_on=[0, 1, 2],
)
.run()
)
print(result.final.summary)

How depends_on Works

Each step is referenced by its 0-based index. When you specify depends_on=[0, 1], the step waits for steps 0 and 1 to complete before executing. Steps without shared dependencies are grouped into parallel waves automatically.

Starting from Known IDs

Use with_images() or with_documents() to seed a pipeline with file IDs you already have, skipping the search step.

Organize Known Images

result = await (
client.pipeline()
.with_images(["img-1", "img-2", "img-3", "img-4"])
.organize("Sort by location")
.run()
)
print(result.final.summary)

Analyze Known Documents

result = await (
client.pipeline()
.with_documents(["doc-contract-a", "doc-contract-b"])
.analyze_documents("Compare warranty terms between contracts")
.run()
)
print(result.final.summary)

Mixed Seed Data with Synthesis

result = await (
client.pipeline()
.with_images(["img-site-01", "img-site-02"])
.with_documents(["doc-inspection-report"])
.synthesize("Write a site assessment combining photos and report")
.run()
)
print(result.final.summary)

Available Agents

Each builder method maps to a specific agent on the server.

Builder MethodAgentDescription
.search_images(query)image_searchSearch images by natural language
.search_documents(query)document_searchSemantic search across documents
.search_links(query)link_searchSearch saved web links
.analyze(intent)analysisCategorize and analyze images
.analyze_documents(intent)document_analysisAnalyze and compare documents
.analyze_links(intent)link_analysisAnalyze saved web link content
.synthesize(intent)assistantGenerate reports and summaries
.organize(intent)folderOrganize files into folders
.cross_reference(intent)cross_referenceCross-reference findings across sources

Auto-Wiring

In linear pipelines (no depends_on), the server automatically wires data between steps based on their contract types. For example, an image_search step produces FILE_IDS with content type "images", and an analysis step consumes that same type — the wiring happens automatically.

Working with Results

PipelineResult

The run() method returns a PipelineResult with per-step results, timing, and error information.

result = await (
client.pipeline()
.search_images("damaged equipment")
.organize("Sort by damage type")
.run()
)
# Overall status
result.success # True if all steps succeeded
result.execution_time_ms # Total wall-clock time in milliseconds
result.total_waves # Number of parallel execution waves
result.errors # List of error messages (empty on success)
result.token_usage # {"input": ..., "output": ...} or None
# Access steps
result.steps # list[StepResult] — all steps in order
result.step(0) # StepResult for step 0
result.step(1) # StepResult for step 1
result.final # StepResult for the last step

StepResult

Each step has its own result with status, summary, and outputs.

step = result.step(0)
step.agent # "image_search", "folder", etc.
step.status # "completed", "failed", or "skipped"
step.summary # Human-readable summary of what happened
step.outputs # dict of agent-specific output data
step.error # Error message if status == "failed"
step.execution_time_ms # Time this step took

Error Handling

result = await (
client.pipeline()
.search_images("equipment")
.organize("Sort by type")
.run()
)
if not result.success:
for error in result.errors:
print(f"Error: {error}")
for step in result.steps:
if step.status == "failed":
print(f"Step [{step.agent}] failed: {step.error}")
elif step.status == "skipped":
print(f"Step [{step.agent}] was skipped (upstream failure)")

Limits

  • Steps: 1–10 steps per pipeline
  • Intent: 1–2,000 characters per step
  • Timeout: Each agent has a per-node execution timeout
  • Agents: Valid agent names only (see table above)

Complete Example

Multi-Source Intelligence Report

Search images, documents, and links in parallel, then synthesize a report.

import asyncio
from aion import AionVision
async def main():
async with AionVision.from_env() as client:
result = await (
client.pipeline()
.search_images("site inspection photos") # step 0
.search_documents("inspection reports 2024") # step 1
.search_links("building code amendments") # step 2
.synthesize( # step 3
"Write a comprehensive site assessment report "
"combining photos, inspection findings, and "
"relevant code requirements",
depends_on=[0, 1, 2],
)
.run()
)
if result.success:
# Print each step's summary
for i, step in enumerate(result.steps):
print(f"Step {i} [{step.agent}]: {step.summary}")
print(f"\nTotal time: {result.execution_time_ms}ms")
print(f"Execution waves: {result.total_waves}")
if result.token_usage:
print(f"Tokens: {result.token_usage}")
else:
print(f"Pipeline failed: {result.errors}")
asyncio.run(main())