Pipelines

Chain agent operations into multi-step workflows with typed data flow

Pipelines vs Standalone Agents

Use Standalone Agents for single operations (one search, one synthesis). Use Pipelines when you need to chain multiple agents together — downstream steps read upstream outputs from the shared per-pipeline state via their typed input contracts, so you don't have to extract and pass IDs manually.

Quick Start

Search and Synthesize

Find images and generate a report — two agents, one call.

```python
from scopix import Scopix

async with Scopix(api_key="scopix_...") as client:
    result = await (
        client.pipeline()
        .search_images("damaged utility poles")
        .synthesize("Summarize damage findings")
        .run()
    )

    print(f"Success: {result.success}")
    print(f"Steps: {len(result.steps)}")
    print(f"Total time: {result.execution_time_ms}ms")

    # Access individual step results
    search_step = result.step(0)
    print(f"Search: {search_step.summary}")

    synthesis_step = result.step(1)
    print(f"Report: {synthesis_step.summary}")

    # Or get the final step directly
    print(f"Final: {result.final.summary}")
```

Linear Pipelines

Steps execute sequentially. Each step receives the outputs of the previous step via typed data flow — no manual ID passing needed.

Search and Synthesize

Search documents, then generate a report from the results.

```python
result = await (
    client.pipeline()
    .search_documents("safety inspection procedures")
    .synthesize("Write a compliance summary")
    .run()
)

print(result.final.summary)
```
print(result.final.summary)

Search and Analyze

Search images, then run analysis on the results.

```python
result = await (
    client.pipeline()
    .search_images("construction site progress photos")
    .analyze("Categorize by construction phase")
    .run()
)

print(result.step(0).summary)  # Search results
print(result.step(1).summary)  # Analysis results
```

DAG Pipelines (Parallel Fan-In)

Use depends_on to create parallel branches that feed into a downstream step. Steps with no shared dependencies execute in parallel.

Parallel Searches with Synthesis

Search images and documents in parallel, then synthesize a combined report.

```python
result = await (
    client.pipeline()
    .search_images("utility poles")          # step 0
    .search_documents("inspection reports")  # step 1
    .synthesize(                             # step 2
        "Cross-reference images with reports",
        depends_on=[0, 1],  # waits for both searches
    )
    .run()
)

# Steps 0 and 1 run in parallel (same wave)
# Step 2 runs after both complete
print(f"Report: {result.final.summary}")
```

Cross-Reference Pattern

Search multiple sources and cross-reference the findings.

```python
result = await (
    client.pipeline()
    .search_images("building facades")      # step 0
    .search_documents("property records")   # step 1
    .search_links("building code updates")  # step 2
    .cross_reference(                       # step 3
        "Match images to property records and code requirements",
        depends_on=[0, 1, 2],
    )
    .run()
)

print(result.final.summary)
```

How depends_on Works

Each step is referenced by its 0-based index, and only earlier steps may be referenced (forward references are rejected at validation time). When you specify depends_on=[0, 1], the step waits for steps 0 and 1 to complete before executing. Steps without shared dependencies are grouped into parallel waves automatically by Kahn's algorithm.
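The grouping into waves can be sketched in plain Python. This is an illustrative model of the scheduling described above, not the SDK's internal code; the `waves` function and its dict-based step representation are assumptions.

```python
from collections import defaultdict

def waves(deps: dict[int, list[int]]) -> list[list[int]]:
    """Group steps into parallel waves via Kahn's algorithm.

    deps maps each step index to the indices it depends on.
    Steps whose dependencies are all satisfied run in the same wave.
    """
    indegree = {step: len(parents) for step, parents in deps.items()}
    children = defaultdict(list)
    for step, parents in deps.items():
        for parent in parents:
            if parent >= step:  # only earlier steps may be referenced
                raise ValueError(f"forward reference: step {step} -> {parent}")
            children[parent].append(step)

    result = []
    ready = sorted(s for s, d in indegree.items() if d == 0)
    while ready:
        result.append(ready)
        next_ready = []
        for step in ready:
            for child in children[step]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)
    return result

# Two independent searches feeding one synthesis step:
print(waves({0: [], 1: [], 2: [0, 1]}))  # [[0, 1], [2]]
```

Steps 0 and 1 have no dependencies, so they land in the first wave together; step 2 waits for both and forms the second wave.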

Same-type fan-in is rejected

Two parallel parents that produce the same content type (e.g. two search_images steps) feeding one downstream step are rejected at validation time: the shared per-pipeline state keeps a single slot per content type, so whichever branch finished last would silently overwrite the other (last-write-wins). The DAG example above is fine because each search produces a different content type (images, documents, links). For a multi-query search of the same type, issue it as one search step, or run separate pipelines and combine the results client-side with FileCollection.merge().
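The validation rule can be expressed as a small check. This is a sketch of the rule as documented, not the SDK's actual validator; `check_fanin` and its argument shapes are assumptions.

```python
def check_fanin(content_types: dict[int, str], deps: dict[int, list[int]]) -> None:
    """Reject a step whose parallel parents produce the same content type.

    content_types maps each step index to the content type it produces
    (e.g. "images", "documents"); deps maps a step to its parent indices.
    """
    for step, parents in deps.items():
        seen: dict[str, int] = {}
        for parent in parents:
            ctype = content_types[parent]
            if ctype in seen:
                raise ValueError(
                    f"step {step}: parents {seen[ctype]} and {parent} "
                    f"both produce '{ctype}' and would share one slot"
                )
            seen[ctype] = parent

# OK: an image search and a document search fan into step 2
check_fanin({0: "images", 1: "documents", 2: "report"}, {2: [0, 1]})

# Rejected: two image searches fan into step 2 (raises ValueError)
# check_fanin({0: "images", 1: "images", 2: "report"}, {2: [0, 1]})
```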

Starting from Known IDs

Use with_images() or with_documents() to seed a pipeline with file IDs you already have, skipping the search step.

Analyze Known Documents

```python
result = await (
    client.pipeline()
    .with_documents(["doc-contract-a", "doc-contract-b"])
    .analyze_documents("Compare warranty terms between contracts")
    .run()
)

print(result.final.summary)
```

Mixed Seed Data with Synthesis

```python
result = await (
    client.pipeline()
    .with_images(["img-site-01", "img-site-02"])
    .with_documents(["doc-inspection-report"])
    .synthesize("Write a site assessment combining photos and report")
    .run()
)

print(result.final.summary)
```

Available Agents

Each builder method maps to a specific agent on the server.

| Builder Method | Agent | Description |
| --- | --- | --- |
| `.search_images(query)` | `image_search` | Search images by natural language |
| `.search_documents(query)` | `document_search` | Semantic search across documents |
| `.search_links(query)` | `link_search` | Search saved web links |
| `.analyze(intent)` | `analysis` | Categorize and analyze images |
| `.analyze_documents(intent)` | `document_analysis` | Analyze and compare documents |
| `.analyze_links(intent)` | `link_analysis` | Analyze saved web link content |
| `.synthesize(intent)` | `assistant` | Generate reports and summaries |
| `.cross_reference(intent)` | `cross_reference` | Cross-reference findings across sources |

How data flows between steps

Each agent declares typed inputs and outputs in its AgentContract with a content_type_hint like "images" or "documents". On the server, an image_search step writes its results into the shared per-pipeline state under current_image_ids; a downstream analysis step declares it consumes FILE_IDS with hint "images", so it reads current_image_ids automatically. Different content types use different slots — that's why parallel branches like search_images + search_documents + search_links can fan into a single synthesis step without colliding, and why two parallel search_images steps cannot.
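A toy model of the slot mechanism makes the collision rule concrete. This is an assumption-laden sketch: the source names only current_image_ids, so current_document_ids and the dict shape here are illustrative, not the server's actual state layout.

```python
# Toy model of the shared per-pipeline state: one slot per content type.
state: dict[str, list[str]] = {}

SLOTS = {"images": "current_image_ids", "documents": "current_document_ids"}

def write_outputs(content_type: str, file_ids: list[str]) -> None:
    """A completed step writes its results into the slot for its type."""
    state[SLOTS[content_type]] = file_ids

def read_inputs(content_type_hint: str) -> list[str]:
    """A downstream step reads the slot named by its contract's hint."""
    return state.get(SLOTS[content_type_hint], [])

# Parallel branches of *different* types occupy different slots:
write_outputs("images", ["img-1", "img-2"])
write_outputs("documents", ["doc-1"])
print(read_inputs("images"))     # ['img-1', 'img-2']
print(read_inputs("documents"))  # ['doc-1']

# Two writers of the *same* type collide: the later one overwrites.
write_outputs("images", ["img-9"])
print(read_inputs("images"))     # ['img-9']
```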

Working with Results

PipelineResult

The run() method returns a PipelineResult with per-step results, timing, and error information.

```python
result = await (
    client.pipeline()
    .search_images("damaged equipment")
    .synthesize("Summarize damage findings")
    .run()
)

# Overall status
result.success            # True if all steps succeeded
result.execution_time_ms  # Total wall-clock time in milliseconds
result.errors             # List of error messages (empty on success)
result.token_usage        # {"input": ..., "output": ...} or None

# Access steps
result.steps    # list[StepResult] — all steps in order
result.step(0)  # StepResult for step 0
result.step(1)  # StepResult for step 1
result.final    # StepResult for the last step
```

StepResult

Each step has its own result with status, summary, and outputs.

```python
step = result.step(0)

step.agent              # "image_search", "assistant", etc.
step.status             # "completed", "failed", or "skipped"
step.summary            # Human-readable summary of what happened
step.outputs            # dict of agent-specific output data
step.error              # Error message if status == "failed"
step.execution_time_ms  # Time this step took
```

Error Handling

```python
result = await (
    client.pipeline()
    .search_images("equipment")
    .analyze("Categorize by equipment type")
    .run()
)

if not result.success:
    for error in result.errors:
        print(f"Error: {error}")
    for step in result.steps:
        if step.status == "failed":
            print(f"Step [{step.agent}] failed: {step.error}")
        elif step.status == "skipped":
            print(f"Step [{step.agent}] was skipped (upstream failure)")
```

Limits

  • Steps: 1–10 steps per pipeline
  • Intent: 1–2,000 characters per step
  • Timeout: Each agent has a per-node execution timeout
  • Agents: Valid agent names only (see table above)
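The server enforces these limits at validation time. A client-side pre-check might look like the sketch below; `validate_pipeline` and its step-dict shape are assumptions for illustration, not part of the SDK.

```python
VALID_AGENTS = {
    "image_search", "document_search", "link_search",
    "analysis", "document_analysis", "link_analysis",
    "assistant", "cross_reference",
}

def validate_pipeline(steps: list[dict]) -> None:
    """Pre-check a pipeline spec against the documented limits."""
    if not 1 <= len(steps) <= 10:
        raise ValueError("pipelines must have 1-10 steps")
    for i, step in enumerate(steps):
        if step["agent"] not in VALID_AGENTS:
            raise ValueError(f"step {i}: unknown agent {step['agent']!r}")
        intent = step.get("intent", "")
        if not 1 <= len(intent) <= 2000:
            raise ValueError(f"step {i}: intent must be 1-2,000 characters")

validate_pipeline([{"agent": "image_search", "intent": "utility poles"}])  # OK
```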

Complete Example

Multi-Source Intelligence Report

Search images, documents, and links in parallel, then synthesize a report.

```python
import asyncio
from scopix import Scopix

async def main():
    async with Scopix.from_env() as client:
        result = await (
            client.pipeline()
            .search_images("site inspection photos")      # step 0
            .search_documents("inspection reports 2024")  # step 1
            .search_links("building code amendments")     # step 2
            .synthesize(                                  # step 3
                "Write a comprehensive site assessment report "
                "combining photos, inspection findings, and "
                "relevant code requirements",
                depends_on=[0, 1, 2],
            )
            .run()
        )

        if result.success:
            # Print each step's summary
            for i, step in enumerate(result.steps):
                print(f"Step {i} [{step.agent}]: {step.summary}")
            print(f"\nTotal time: {result.execution_time_ms}ms")
            if result.token_usage:
                print(f"Tokens: {result.token_usage}")
        else:
            print(f"Pipeline failed: {result.errors}")

asyncio.run(main())
```