Pipelines
Chain agent operations into multi-step workflows with typed data flow
Pipelines vs Standalone Agents
Use Standalone Agents for single operations (one search, one synthesis). Use Pipelines when you need to chain multiple agents together — downstream steps read upstream outputs from the shared per-pipeline state via their typed input contracts, so you don't have to extract and pass IDs manually.
Quick Start
Search and Synthesize
Find images and generate a report — two agents, one call.
```python
from scopix import Scopix

async with Scopix(api_key="scopix_...") as client:
    result = await (
        client.pipeline()
        .search_images("damaged utility poles")
        .synthesize("Summarize damage findings")
        .run()
    )

print(f"Success: {result.success}")
print(f"Steps: {len(result.steps)}")
print(f"Total time: {result.execution_time_ms}ms")

# Access individual step results
search_step = result.step(0)
print(f"Search: {search_step.summary}")

synthesis_step = result.step(1)
print(f"Report: {synthesis_step.summary}")

# Or get the final step directly
print(f"Final: {result.final.summary}")
```

Linear Pipelines
Steps execute sequentially. Each step receives the outputs of the previous step via typed data flow — no manual ID passing needed.
Search and Synthesize
Search documents, then generate a report from the results.
```python
result = await (
    client.pipeline()
    .search_documents("safety inspection procedures")
    .synthesize("Write a compliance summary")
    .run()
)

print(result.final.summary)
```

Search and Analyze
Search images, then run analysis on the results.
```python
result = await (
    client.pipeline()
    .search_images("construction site progress photos")
    .analyze("Categorize by construction phase")
    .run()
)

print(result.step(0).summary)  # Search results
print(result.step(1).summary)  # Analysis results
```

DAG Pipelines (Parallel Fan-In)
Use depends_on to create parallel branches that feed into a downstream step. Steps with no shared dependencies execute in parallel.
Parallel Searches with Synthesis
Search images and documents in parallel, then synthesize a combined report.
```python
result = await (
    client.pipeline()
    .search_images("utility poles")          # step 0
    .search_documents("inspection reports")  # step 1
    .synthesize(                             # step 2
        "Cross-reference images with reports",
        depends_on=[0, 1],  # waits for both searches
    )
    .run()
)

# Steps 0 and 1 run in parallel (same wave)
# Step 2 runs after both complete
print(f"Report: {result.final.summary}")
```

Cross-Reference Pattern
Search multiple sources and cross-reference the findings.
```python
result = await (
    client.pipeline()
    .search_images("building facades")      # step 0
    .search_documents("property records")   # step 1
    .search_links("building code updates")  # step 2
    .cross_reference(                       # step 3
        "Match images to property records and code requirements",
        depends_on=[0, 1, 2],
    )
    .run()
)

print(result.final.summary)
```

How depends_on Works
Each step is referenced by its 0-based index, and only earlier steps may be referenced (forward references are rejected at validation time). When you specify depends_on=[0, 1], the step waits for steps 0 and 1 to complete before executing. Steps without shared dependencies are grouped into parallel waves automatically by Kahn's algorithm.
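The wave grouping described above can be sketched with a small Kahn's-algorithm pass. This is an illustrative stand-in, not the server's actual scheduler; it takes each step's dependencies as a list of earlier indices:

```python
def group_into_waves(deps: list[list[int]]) -> list[list[int]]:
    """Group step indices into parallel waves (Kahn's algorithm).

    deps[i] lists the earlier step indices that step i depends on.
    """
    remaining = set(range(len(deps)))
    done: set[int] = set()
    waves: list[list[int]] = []
    while remaining:
        # A step is ready once all of its dependencies have completed.
        wave = sorted(i for i in remaining if set(deps[i]) <= done)
        if not wave:
            raise ValueError("cycle or forward reference in depends_on")
        waves.append(wave)
        done.update(wave)
        remaining.difference_update(wave)
    return waves

# The parallel-searches example: steps 0 and 1 have no dependencies,
# step 2 declares depends_on=[0, 1].
print(group_into_waves([[], [], [0, 1]]))  # [[0, 1], [2]]
```

Steps 0 and 1 share a wave and can run concurrently; step 2 only becomes ready once both are in the completed set.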
Same-type fan-in is rejected
Two parallel parents that produce the same content type (e.g. two search_images steps) feeding one downstream step are rejected at validation time: the shared per-pipeline state has a single slot per content type, so the second parent would silently overwrite the first (last-write-wins). The DAG example above is fine because each search produces a different content type (images, documents, links). To search the same content type with multiple queries, do it in one search step, or run separate pipelines and combine the results client-side with FileCollection.merge().
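The rule is easy to check mechanically. The sketch below is a hypothetical client-side validator, not the SDK's real one; the agent-to-content-type mapping mirrors the table later in this page:

```python
# Assumed mapping from search agent to the content type it produces.
CONTENT_TYPE = {
    "image_search": "images",
    "document_search": "documents",
    "link_search": "links",
}

def check_fan_in(agents: list[str], deps: list[list[int]]) -> None:
    """Reject any step whose parallel parents produce the same content type."""
    for i, parents in enumerate(deps):
        seen: dict[str, int] = {}
        for p in parents:
            ctype = CONTENT_TYPE.get(agents[p])
            if ctype is None:
                continue  # non-search parents don't claim a slot here
            if ctype in seen:
                raise ValueError(
                    f"step {i}: parents {seen[ctype]} and {p} both write "
                    f"the '{ctype}' slot (last-write-wins)"
                )
            seen[ctype] = p

# Images + documents fanning into one step: distinct slots, accepted.
check_fan_in(["image_search", "document_search", "assistant"], [[], [], [0, 1]])
```

Swapping the second parent for another image_search makes both parents write the images slot, which is exactly the case the validator rejects.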
Starting from Known IDs
Use with_images() or with_documents() to seed a pipeline with file IDs you already have, skipping the search step.
Analyze Known Documents
```python
result = await (
    client.pipeline()
    .with_documents(["doc-contract-a", "doc-contract-b"])
    .analyze_documents("Compare warranty terms between contracts")
    .run()
)

print(result.final.summary)
```

Mixed Seed Data with Synthesis

```python
result = await (
    client.pipeline()
    .with_images(["img-site-01", "img-site-02"])
    .with_documents(["doc-inspection-report"])
    .synthesize("Write a site assessment combining photos and report")
    .run()
)

print(result.final.summary)
```

Available Agents
Each builder method maps to a specific agent on the server.
| Builder Method | Agent | Description |
|---|---|---|
| .search_images(query) | image_search | Search images by natural language |
| .search_documents(query) | document_search | Semantic search across documents |
| .search_links(query) | link_search | Search saved web links |
| .analyze(intent) | analysis | Categorize and analyze images |
| .analyze_documents(intent) | document_analysis | Analyze and compare documents |
| .analyze_links(intent) | link_analysis | Analyze saved web link content |
| .synthesize(intent) | assistant | Generate reports and summaries |
| .cross_reference(intent) | cross_reference | Cross-reference findings across sources |
How data flows between steps
Each agent declares typed inputs and outputs in its AgentContract with a content_type_hint like "images" or "documents". On the server, an image_search step writes its results into the shared per-pipeline state under current_image_ids; a downstream analysis step declares it consumes FILE_IDS with hint "images", so it reads current_image_ids automatically. Different content types use different slots — that's why parallel branches like search_images + search_documents + search_links can fan into a single synthesis step without colliding, and why two parallel search_images steps cannot.
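A toy model of the shared state makes the slot behavior concrete. The slot names current_image_ids and current_document_ids come from the paragraph above; the current_link_ids name and the helper functions are illustrative assumptions:

```python
# One slot per content type in the shared per-pipeline state.
SLOT = {
    "images": "current_image_ids",
    "documents": "current_document_ids",
    "links": "current_link_ids",  # assumed by analogy
}

state: dict[str, list[str]] = {}

def write_output(content_type: str, file_ids: list[str]) -> None:
    # A search step writes its results into its content type's slot.
    # A second writer of the same type would overwrite this entry —
    # the last-write-wins hazard that same-type fan-in validation prevents.
    state[SLOT[content_type]] = file_ids

def read_input(content_type: str) -> list[str]:
    # A consumer declaring FILE_IDS with this content_type_hint
    # reads the matching slot automatically.
    return state[SLOT[content_type]]

write_output("images", ["img-01", "img-02"])  # image_search step
write_output("documents", ["doc-17"])         # document_search step
print(read_input("images"))  # downstream analysis reads current_image_ids
```

Because images and documents occupy different slots, parallel branches of different types never collide when they fan into one downstream step.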
Working with Results
PipelineResult
The run() method returns a PipelineResult with per-step results, timing, and error information.
```python
result = await (
    client.pipeline()
    .search_images("damaged equipment")
    .synthesize("Summarize damage findings")
    .run()
)

# Overall status
result.success            # True if all steps succeeded
result.execution_time_ms  # Total wall-clock time in milliseconds
result.errors             # List of error messages (empty on success)
result.token_usage        # {"input": ..., "output": ...} or None

# Access steps
result.steps    # list[StepResult] — all steps in order
result.step(0)  # StepResult for step 0
result.step(1)  # StepResult for step 1
result.final    # StepResult for the last step
```

StepResult
Each step has its own result with status, summary, and outputs.
```python
step = result.step(0)

step.agent              # "image_search", "assistant", etc.
step.status             # "completed", "failed", or "skipped"
step.summary            # Human-readable summary of what happened
step.outputs            # dict of agent-specific output data
step.error              # Error message if status == "failed"
step.execution_time_ms  # Time this step took
```

Error Handling

```python
result = await (
    client.pipeline()
    .search_images("equipment")
    .analyze("Categorize by equipment type")
    .run()
)

if not result.success:
    for error in result.errors:
        print(f"Error: {error}")
    for step in result.steps:
        if step.status == "failed":
            print(f"Step [{step.agent}] failed: {step.error}")
        elif step.status == "skipped":
            print(f"Step [{step.agent}] was skipped (upstream failure)")
```

Limits
- Steps: 1–10 steps per pipeline
- Intent: 1–2,000 characters per step
- Timeout: Each agent has a per-node execution timeout
- Agents: Valid agent names only (see table above)
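Checking these limits client-side before calling run() saves a round trip on obviously invalid pipelines. This validator is a sketch (the limit values come from the list above; the function itself is not part of the SDK):

```python
MAX_STEPS = 10
MAX_INTENT_CHARS = 2_000
VALID_AGENTS = {
    "image_search", "document_search", "link_search", "analysis",
    "document_analysis", "link_analysis", "assistant", "cross_reference",
}

def validate_pipeline(steps: list[tuple[str, str]]) -> None:
    """Validate a list of (agent, intent) pairs against the documented limits."""
    if not 1 <= len(steps) <= MAX_STEPS:
        raise ValueError(f"pipelines take 1-{MAX_STEPS} steps, got {len(steps)}")
    for i, (agent, intent) in enumerate(steps):
        if agent not in VALID_AGENTS:
            raise ValueError(f"step {i}: unknown agent {agent!r}")
        if not 1 <= len(intent) <= MAX_INTENT_CHARS:
            raise ValueError(
                f"step {i}: intent must be 1-{MAX_INTENT_CHARS} characters"
            )

validate_pipeline([("image_search", "utility poles"),
                   ("assistant", "Summarize damage findings")])  # passes
```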
Complete Example
Multi-Source Intelligence Report
Search images, documents, and links in parallel, then synthesize a report.
```python
import asyncio
from scopix import Scopix

async def main():
    async with Scopix.from_env() as client:
        result = await (
            client.pipeline()
            .search_images("site inspection photos")      # step 0
            .search_documents("inspection reports 2024")  # step 1
            .search_links("building code amendments")     # step 2
            .synthesize(                                  # step 3
                "Write a comprehensive site assessment report "
                "combining photos, inspection findings, and "
                "relevant code requirements",
                depends_on=[0, 1, 2],
            )
            .run()
        )

        if result.success:
            # Print each step's summary
            for i, step in enumerate(result.steps):
                print(f"Step {i} [{step.agent}]: {step.summary}")

            print(f"\nTotal time: {result.execution_time_ms}ms")
            if result.token_usage:
                print(f"Tokens: {result.token_usage}")
        else:
            print(f"Pipeline failed: {result.errors}")

asyncio.run(main())
```
