Agent Data Flow
How agents pass files, results, and metadata between each other
Architecture Overview
Every agent declares what data it consumes and produces via an AgentContract. Both server-side execution paths (the chat loop and the Pipeline API) use these contracts to populate each agent's inputs from a shared per-request state and route its outputs back into the same state. Agents pass lightweight file references (IDs) and resolve actual content on demand via a FileResolver.
The Pipeline builder gives you a fluent API to declare a multi-step run; the server schedules steps into topological waves and threads data between them through that shared state. This page covers the underlying SDK primitives (AgentContract, ExecutionContext, FileCollection, FileResolver) for building custom client-side orchestration around the same data shapes.
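Wave scheduling can be illustrated with a generic topological grouping. This is a sketch of the idea only, not the server's actual scheduler; `schedule_waves` and the step names are hypothetical:

```python
# Generic sketch of scheduling steps into topological "waves": each wave
# contains the steps whose dependencies are all satisfied by earlier waves.
# Illustrative only — not the server's implementation.
def schedule_waves(deps: dict[str, set[str]]) -> list[list[str]]:
    done: set[str] = set()
    waves: list[list[str]] = []
    while len(done) < len(deps):
        wave = sorted(s for s, d in deps.items() if s not in done and d <= done)
        if not wave:
            raise ValueError("dependency cycle")  # no step is runnable
        waves.append(wave)
        done.update(wave)
    return waves

deps = {
    "search": set(),
    "analyze": {"search"},
    "organize": {"analyze"},
    "report": {"search"},
}
schedule_waves(deps)
# → [["search"], ["analyze", "report"], ["organize"]]
```

Steps in the same wave have no data dependency on each other, so they can run in parallel while the shared state threads results between waves.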
Building Your Own Agent
The Agent Protocol
To create an agent, implement two methods: get_contract() and execute(). No inheritance required — the SDK uses structural typing via a Protocol.
```python
from scopix.types.contracts import (
    AgentCapability,
    AgentContract,
    TypedInput,
    TypedOutput,
)
from scopix.types.agent import AgentResult
from scopix.types.context import ExecutionContext
from scopix.types.file_collection import FileCollection


class MySearchAgent:
    def get_contract(self) -> AgentContract:
        return AgentContract(
            name="MySearchAgent",
            capability=AgentCapability.SEARCH,
            description="Searches images by description",
            inputs=(
                TypedInput(name="query", data_type="TEXT", description="Search query"),
            ),
            outputs=(
                TypedOutput(name="results", data_type="FILE_IDS", mergeable=True),
            ),
            can_chain_with=(AgentCapability.ANALYSIS, AgentCapability.ORGANIZATION),
        )

    async def execute(self, context: ExecutionContext) -> AgentResult:
        query = context.read("query")

        # ... perform search logic ...
        found_ids = ["img-1", "img-2", "img-3"]

        results = FileCollection.from_ids(found_ids, content_type="images").to_dict()

        # Write outputs to context for downstream agents
        context.write("results", results, "FILE_IDS", "MySearchAgent")

        return AgentResult(
            success=True,
            agent_name="MySearchAgent",
            capability="search",
            summary=f"Found {len(found_ids)} images for '{query}'",
        )
```

Chaining Agents with ExecutionContext
The ExecutionContext is the shared state container. Agent A writes results to a slot, Agent B reads from that slot. Each slot is typed and tracks provenance.
```python
from scopix.types.context import ExecutionContext

# Create shared context
ctx = ExecutionContext()

# Agent A writes search results
ctx.write("search_results", {"ids": ["img-1", "img-2"]}, "FILE_IDS", "search_agent")

# Agent B reads them and writes categorization
file_data = ctx.read("search_results", expected_type="FILE_IDS")
ctx.write("categorization", {
    "categories": [{"name": "Poles", "file_ids": ["img-1", "img-2"]}],
    "total_files": 2,
}, "CATEGORIZATION", "categorize_agent")

# Agent C reads both slots
cat = ctx.read("categorization", expected_type="CATEGORIZATION")
files = ctx.read("search_results")

# Inspect context state
ctx.list_slots()
# [{"name": "search_results", "type": "FILE_IDS", "source": "search_agent"},
#  {"name": "categorization", "type": "CATEGORIZATION", "source": "categorize_agent"}]

# Serialize/restore for checkpointing
checkpoint = ctx.to_dict()
restored = ExecutionContext.from_dict(checkpoint)
```

Data Type Registry
The DataTypeRegistry tracks which data types exist and how to merge them. Use get_default_registry() for a pre-loaded registry, or build your own.
```python
from scopix.types.data_types import (
    DataTypeDefinition,
    DataTypeRegistry,
    get_default_registry,
)

# Pre-loaded with built-in types
registry = get_default_registry()
registry.list_types()
# ["FILE_IDS", "CATEGORIZATION", "FOLDER_RESULT", "ANALYSIS_RESULT", "TEXT", "CROSS_REF"]

# FILE_IDS and CATEGORIZATION support merging
registry.can_merge("FILE_IDS")  # True — uses FileCollection.merge()
registry.can_merge("TEXT")      # False

# Register your own types
registry.register(
    DataTypeDefinition(name="EMBEDDINGS", description="Vector embeddings"),
    merge_fn=lambda *arrs: [v for arr in arrs for v in arr],  # Concatenate
)
```

Implementing a FileResolver
Implement the FileResolver Protocol to connect your own storage backend. Agents use it to resolve FileRef objects to actual content bytes.
```python
import uuid

import aiofiles

from scopix.types.file_collection import FileRef
from scopix.types.file_resolver import FileResolver, FileResolutionError


class LocalFileResolver:
    def __init__(self, base_path: str):
        self.base_path = base_path

    async def resolve(self, ref, hints=None):
        path = f"{self.base_path}/{ref.id}"
        try:
            async with aiofiles.open(path, "rb") as f:
                return await f.read()
        except FileNotFoundError:
            raise FileResolutionError(f"File not found: {ref.id}")

    async def resolve_batch(self, refs, batch_size=100):
        for ref in refs:
            yield ref, await self.resolve(ref)

    async def resolve_metadata(self, ref):
        return ref  # Could enrich with file stats

    async def store(self, content, media_type, metadata=None):
        file_id = str(uuid.uuid4())
        path = f"{self.base_path}/{file_id}"
        async with aiofiles.open(path, "wb") as f:
            await f.write(content)
        return FileRef(id=file_id, media_type=media_type)
```

How It Works (Backend)
Backend Reference
This section describes how Scopix's backend orchestrates agents in the chat path (and analogously in the Pipeline API). The Pipeline builder lets you drive this same per-call agent execution path from a single HTTP request — you don't need to reimplement any of this logic. The primitives below (contracts, context, registry) are available if you want to build custom client-side orchestration around the same data shapes.
The Data Flow Cycle
The orchestrator uses an agentic ReAct loop (Reason → Act → Observe). Each chat message follows this cycle:
- Reason — The orchestrator LLM decides which tool (agent) to invoke next based on the user's query and previous observations
- Contract-driven input building — The agent's contract declares what data it needs via TypedInput. The system reads matching fields from ConversationState based on (data_type, content_type_hint) and wraps them as FileCollection-shaped inputs
- Execute — The agent runs with a fresh database session (isolation per tool call) and a per-agent timeout
- Contract-driven output routing — A ContextRouter reads the agent's contract TypedOutput declarations and routes results to the correct ConversationState fields (e.g. FILE_IDS with hint "images" → current_image_ids)
- Observe — The LLM receives a concise summary of the result (not the raw data) and decides whether to invoke another agent or respond
- Context persistence — Updated conversation state carries forward to future turns
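The cycle above can be sketched as a plain loop. This is an illustrative, simplified model of the orchestrator: `react_loop`, `choose_action`, and `run_agent` are hypothetical stand-ins, not Scopix APIs.

```python
# Illustrative ReAct loop: Reason → Act → Observe. All names here are
# hypothetical stand-ins, not part of the Scopix SDK.
def react_loop(query, choose_action, run_agent, max_iterations=5):
    observations = []                                  # summaries the LLM reasons over
    for _ in range(max_iterations):
        action = choose_action(query, observations)    # Reason
        if action is None:                             # LLM decides to respond
            break
        result = run_agent(action)                     # Act (contract-driven I/O)
        observations.append(result["summary"])         # Observe: summary, not raw data
    return observations


# Toy driver: search, then analyze, then stop.
def choose_action(query, observations):
    plan = ["search_images", "analyze_images"]
    return plan[len(observations)] if len(observations) < len(plan) else None

def run_agent(action):
    return {"summary": f"{action} completed"}

react_loop("find sunsets", choose_action, run_agent)
# → ["search_images completed", "analyze_images completed"]
```

Note that only the summary strings re-enter the loop; the real data stays in the shared state, which is what keeps the LLM's context small.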
Contract-Driven Routing
Adding a new agent requires only defining its AgentContract. The input building and output routing are driven entirely by the contract's TypedInput/TypedOutput declarations and their content_type_hint fields — no hardcoded agent-name wiring needed.
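A minimal sketch of what hint-based routing can look like. The routing table, state field names, and `route_output` function below are simplified assumptions for illustration; the real ContextRouter runs server-side.

```python
# Simplified sketch of contract-driven output routing. The routing table and
# state field names are illustrative assumptions, not the server's actual code.
ROUTES = {
    ("FILE_IDS", "images"): "current_image_ids",
    ("FILE_IDS", "documents"): "current_document_ids",
    ("CATEGORIZATION", None): "current_categorization",
}

def route_output(state: dict, data_type: str, content_type_hint, value) -> dict:
    """Write an agent output into shared state based on its declared type."""
    field = ROUTES.get((data_type, content_type_hint))
    if field is None:
        return state  # e.g. FOLDER_RESULT: no ConversationState mapping
    return {**state, field: value}  # new dict, no in-place mutation

state = {}
state = route_output(state, "FILE_IDS", "images", ["img-1", "img-2"])
state = route_output(state, "CATEGORIZATION", None, {"categories": []})
# state["current_image_ids"] → ["img-1", "img-2"]
```

Because the lookup key is `(data_type, content_type_hint)`, a new agent only needs to declare those fields in its contract to participate in routing.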
Example Pipeline
A user message like "Find sunset images and organize them into folders" produces this sequence:
```
Iteration 1: LLM selects → search_images("sunset images")
┌─ Input:   contract has no required FILE_IDS inputs
├─ Execute: ImageSearchAgent runs, returns 47 image IDs
└─ Route:   contract output (FILE_IDS, hint="images")
            → ContextRouter writes conversation.current_image_ids = [47 IDs]
            → content_type = "images"

Iteration 2: LLM observes "Found 47 images" → selects analyze_images("categorize")
┌─ Input:   contract input (FILE_IDS, hint="images")
│           → reads conversation.current_image_ids → wraps as FileCollection
├─ Execute: AnalysisAgent runs, returns 5 categories
└─ Route:   contract output (CATEGORIZATION)
            → ContextRouter writes conversation.current_categorization

Iteration 3: LLM observes "5 categories" → selects manage_folders("organize")
┌─ Input:   contract input (CATEGORIZATION)
│           → reads conversation.current_categorization
├─ Execute: FolderAgent creates folders, moves files
└─ Route:   contract output (FOLDER_RESULT) → no ConversationState mapping

LLM responds: "Found 47 sunset images and organized into 5 folders"
```

Agent Contracts
Declaring Inputs and Outputs
Every agent declares a contract specifying the data types it consumes and produces. On the server, these contracts drive the content-type-hint-based input population and output routing that lets chained agents communicate without hard-coded wiring.
```python
from scopix.types.contracts import AgentContract, AgentCapability, TypedInput, TypedOutput

# A search agent that PRODUCES file references
contract = AgentContract(
    name="ImageSearchAgent",
    capability=AgentCapability.SEARCH,
    description="Searches images by natural language",
    inputs=(
        TypedInput(
            name="folder_scope",
            data_type="FILE_IDS",
            required=False,
            description="Optional folder to restrict search scope",
        ),
    ),
    outputs=(
        TypedOutput(
            name="results",
            data_type="FILE_IDS",        # Produces a FileCollection
            mergeable=True,              # Can be merged with other FILE_IDS
            content_type_hint="images",  # Contains image references
        ),
    ),
)

# An analysis agent that CONSUMES file references and PRODUCES categorization
contract = AgentContract(
    name="AnalysisAgent",
    capability=AgentCapability.ANALYSIS,
    description="Categorizes files into semantic groups",
    inputs=(
        TypedInput(
            name="file_ids",
            data_type="FILE_IDS",        # Expects a FileCollection
            required=False,
            content_type_hint="images",  # Specifically image files
        ),
    ),
    outputs=(
        TypedOutput(name="categorization", data_type="CATEGORIZATION", mergeable=True),
        TypedOutput(name="analysis", data_type="ANALYSIS_RESULT", mergeable=False),
    ),
)
```

Content Type Hints
The content_type_hint on contract inputs and outputs tells the graph what kind of files an agent works with. This enables content-aware routing — image search results are routed to image-consuming agents, document results to document agents.
"images"— Image file references (JPG, PNG, etc.)"documents"— Document file references (PDF, DOCX, etc.)"links"— Saved web page/link references"mixed"— Collection containing multiple content types
FileCollection in Practice
Producing Results
Search agents create FileCollection objects and write them as typed output to the execution context. The collection carries the content type and source provenance.
```python
from scopix.types.file_collection import FileCollection
from scopix.types.agent import AgentResult

# Create a collection from search results
collection = FileCollection.from_ids(
    ids=image_ids,               # ["img_1", "img_2", ..., "img_47"]
    content_type="images",       # Declares what's in the collection
    source_capability="SEARCH",  # Tracks provenance
)

# Write to context so downstream agents can read it
context.write("results", collection.to_dict(), "FILE_IDS", "search_agent")

return AgentResult(
    success=True,
    agent_name="SearchAgent",
    capability="search",
    summary=f"Found {collection.count} images",
)
```

Consuming Results
Downstream agents receive file IDs through the execution context. They read from typed slots and can use a FileResolver to load actual content on demand.
```python
from scopix.types.agent import AgentResult
from scopix.types.file_collection import FileRef
from scopix.types.file_resolver import FileResolutionError


class AnalysisAgent:
    def __init__(self, resolver):
        self.resolver = resolver  # Inject FileResolver via constructor

    async def execute(self, context):
        # Read upstream results from context
        file_data = context.read("results", expected_type="FILE_IDS")
        image_ids = file_data.get("ids", [])

        if not image_ids:
            return AgentResult(
                success=False,
                agent_name="AnalysisAgent",
                capability="analysis",
                error="No images to analyze",
            )

        # Use FileResolver to load actual content when needed
        for image_id in image_ids[:10]:
            ref = FileRef(id=image_id, media_type="image/jpeg")
            try:
                image_bytes = await self.resolver.resolve(ref)
                # ... analyze image_bytes ...
            except FileResolutionError:
                continue  # Skip unresolvable refs
```

Combining FileCollections (Client-Side Fan-In)
Merging file references in your own code
On the server, the Pipeline API rejects fan-in where two parallel parents produce the same content type for a single downstream step: the shared per-pipeline state has only one slot per content type, so the second parent's write would silently overwrite the first. When you need that pattern, run the searches as separate pipelines and combine the results client-side using FileCollection.merge() or the DataTypeRegistry:
```python
from scopix.types.data_types import get_default_registry
from scopix.types.file_collection import FileCollection

registry = get_default_registry()

# Merge two FileCollections using the registry
c1 = FileCollection.from_ids(["img-1", "img-2"], content_type="images")
c2 = FileCollection.from_ids(["img-3", "img-4"], content_type="images")
merged = registry.merge("FILE_IDS", c1, c2)
# merged.ids → ["img-1", "img-2", "img-3", "img-4"]
# merged.content_type → "images"

# Or directly:
merged = FileCollection.merge(c1, c2)
```

You can then feed the merged collection into a follow-up pipeline as seed data via .with_images(merged.ids).
Lazy Content Loading
IDs Travel, Data Stays
Agents pass lightweight FileCollection objects containing only IDs and metadata — not the actual file content. When an agent needs the real bytes (e.g. to analyze an image), it uses the FileResolver to load content on demand. This avoids bulk-loading hundreds of images through the graph.
```python
# What flows through the graph: lightweight ID references
# SearchAgent output → AnalysisAgent input
{
    "ids": ["img_1", "img_2", ..., "img_47"],  # Just UUIDs
    "content_type": "images",
    "source_capability": "SEARCH",
}
# Total size: ~2 KB (just IDs)

# When AnalysisAgent needs actual image data:
ref = FileRef(id="img_1")
image_bytes = await resolver.resolve(ref)
# → loads from storage on demand: 500 KB per image

# Or enrich metadata without loading content:
enriched = await resolver.resolve_metadata(ref)
# enriched.filename == "sunset_beach.jpg"
```

Design Principles
Key Patterns
IDs travel, content stays
Agents pass lightweight FileCollection objects (just IDs). Content is loaded on-demand via FileResolver only when an agent actually needs the bytes.
Contracts drive wiring
Agents declare typed inputs/outputs via AgentContract. The server uses these contracts to populate each agent's typed inputs from the shared per-request state and route its outputs back into the same state — no hand-wired edges, no manual ID passing.
Content-aware routing
FILE_IDS slots carry a content_type ("images", "documents", "links"). Agents can filter or route based on what kind of files they receive.
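A consuming agent can gate on the content type before doing any work. This sketch assumes the dict shape produced by FileCollection.to_dict() shown earlier; `accept_only` is a hypothetical helper, not an SDK function.

```python
# Sketch: filter a FILE_IDS payload by its content_type before consuming it.
# Assumes the to_dict() shape shown earlier; accept_only is hypothetical.
def accept_only(payload: dict, wanted: str) -> list:
    """Return the IDs only when the collection's content type matches."""
    if payload.get("content_type") != wanted:
        return []
    return payload.get("ids", [])

images = {"ids": ["img-1", "img-2"], "content_type": "images"}
docs = {"ids": ["doc-1"], "content_type": "documents"}

accept_only(images, "images")  # → ["img-1", "img-2"]
accept_only(docs, "images")    # → []
```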
Immutable data types
Most SDK data types (FileRef, AgentContract, Category, etc.) are frozen dataclasses. FileCollection enforces immutability via __slots__. Operations return new instances, preventing accidental mutation.
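The frozen-dataclass pattern these types follow looks like this in plain Python. `MyRef` is a local example for illustration, not a Scopix type:

```python
# Illustrates the frozen-dataclass pattern the SDK types follow.
# MyRef is a local example, not a Scopix type.
import dataclasses
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class MyRef:
    id: str
    media_type: str = "application/octet-stream"

ref = MyRef(id="img-1", media_type="image/jpeg")

try:
    ref.id = "img-2"  # mutation is rejected at runtime
except dataclasses.FrozenInstanceError:
    pass

# Operations return new instances instead of mutating in place
updated = replace(ref, media_type="image/png")
assert ref.media_type == "image/jpeg"      # original untouched
assert updated.media_type == "image/png"   # change lives in the copy
```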
Structural typing
The Agent and FileResolver protocols use duck typing — no inheritance required. Any class with the right methods satisfies the protocol.
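The mechanism is standard typing.Protocol structural typing. This is a generic illustration, with `Resolver` and `InMemoryResolver` as hypothetical names, not the SDK's actual protocol definitions:

```python
# Structural typing: any class with a matching resolve() method satisfies
# the protocol — no inheritance needed. Generic illustration, not SDK code.
from typing import Protocol, runtime_checkable

@runtime_checkable
class Resolver(Protocol):
    async def resolve(self, ref) -> bytes: ...

class InMemoryResolver:  # note: no base class
    def __init__(self, blobs: dict):
        self.blobs = blobs

    async def resolve(self, ref) -> bytes:
        return self.blobs[ref]

resolver = InMemoryResolver({"img-1": b"\x89PNG"})
isinstance(resolver, Resolver)  # → True: structural match on method names
```

Static type checkers verify the full method signatures; `runtime_checkable` `isinstance` checks only that the named methods exist.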
What's Next?
Pipelines
Chain agents with a fluent builder API and server-side execution
Type Reference
AgentContract, ExecutionContext, FileRef, FileCollection, FileResolver API reference
Standalone Agents
Direct access to search and operation agents
LangGraph Integration
Build autonomous agents with LangGraph and Scopix tools
Agent Operation Types
SynthesizeResult, DocumentAnalysisResult, ChunkReference

