Agent Data Flow
How agents pass files, results, and metadata between each other
Architecture Overview
The agentic system uses a typed execution graph to coordinate agents. Each agent declares what data it consumes and produces via contracts. Data flows through typed slots in an ExecutionContext, and agents pass lightweight file references (IDs) and resolve actual content on demand via the FileResolver.
The Pipeline builder exposes this graph engine via a single HTTP call — chain agents with a fluent API and the server handles auto-wiring, parallel execution, and data routing. This page covers the underlying primitives for building custom client-side workflows.
Building Your Own Agent
The Agent Protocol
To create an agent, implement two methods: get_contract() and execute(). No inheritance required — the SDK uses structural typing via a Protocol.
```python
from aion import (
    AgentCapability,
    AgentContract,
    AgentResult,
    ExecutionContext,
    FileCollection,
    TypedInput,
    TypedOutput,
)


class MySearchAgent:
    def get_contract(self) -> AgentContract:
        return AgentContract(
            name="MySearchAgent",
            capability=AgentCapability.SEARCH,
            description="Searches images by description",
            inputs=(
                TypedInput(name="query", data_type="TEXT", description="Search query"),
            ),
            outputs=(
                TypedOutput(name="results", data_type="FILE_IDS", mergeable=True),
            ),
            can_chain_with=(AgentCapability.ANALYSIS, AgentCapability.ORGANIZATION),
        )

    async def execute(self, context: ExecutionContext) -> AgentResult:
        query = context.read("query")

        # ... perform search logic ...
        found_ids = ["img-1", "img-2", "img-3"]

        results = FileCollection.from_ids(found_ids, content_type="images").to_dict()

        # Write outputs to context for downstream agents
        context.write("results", results, "FILE_IDS", "MySearchAgent")

        return AgentResult(
            success=True,
            agent_name="MySearchAgent",
            capability="search",
            summary=f"Found {len(found_ids)} images for '{query}'",
        )
```

Chaining Agents with ExecutionContext
The ExecutionContext is the shared state container. Agent A writes results to a slot, Agent B reads from that slot. Each slot is typed and tracks provenance.
```python
from aion import ExecutionContext

# Create shared context
ctx = ExecutionContext()

# Agent A writes search results
ctx.write("search_results", {"ids": ["img-1", "img-2"]}, "FILE_IDS", "search_agent")

# Agent B reads them and writes categorization
file_data = ctx.read("search_results", expected_type="FILE_IDS")
ctx.write("categorization", {
    "categories": [{"name": "Poles", "file_ids": ["img-1", "img-2"]}],
    "total_files": 2,
}, "CATEGORIZATION", "categorize_agent")

# Agent C reads both slots
cat = ctx.read("categorization", expected_type="CATEGORIZATION")
files = ctx.read("search_results")

# Inspect context state
ctx.list_slots()
# [{"name": "search_results", "type": "FILE_IDS", "source": "search_agent"},
#  {"name": "categorization", "type": "CATEGORIZATION", "source": "categorize_agent"}]

# Serialize/restore for checkpointing
checkpoint = ctx.to_dict()
restored = ExecutionContext.from_dict(checkpoint)
```

Data Type Registry
The DataTypeRegistry tracks which data types exist and how to merge them. Use get_default_registry() for a pre-loaded registry, or build your own.
```python
from aion import get_default_registry, DataTypeDefinition, DataTypeRegistry

# Pre-loaded with built-in types
registry = get_default_registry()
registry.list_types()
# ["FILE_IDS", "CATEGORIZATION", "FOLDER_RESULT", "ANALYSIS_RESULT", "TEXT", "CROSS_REF"]

# FILE_IDS and CATEGORIZATION support merging
registry.can_merge("FILE_IDS")  # True — uses FileCollection.merge()
registry.can_merge("TEXT")      # False

# Register your own types
registry.register(
    DataTypeDefinition(name="EMBEDDINGS", description="Vector embeddings"),
    merge_fn=lambda *arrs: [v for arr in arrs for v in arr],  # Concatenate
)
```

Implementing a FileResolver
Implement the FileResolver Protocol to connect your own storage backend. Agents use it to resolve FileRef objects to actual content bytes.
```python
import uuid

import aiofiles

from aion import FileRef, FileResolver, FileResolutionError


class LocalFileResolver:
    def __init__(self, base_path: str):
        self.base_path = base_path

    async def resolve(self, ref, hints=None):
        path = f"{self.base_path}/{ref.id}"
        try:
            async with aiofiles.open(path, "rb") as f:
                return await f.read()
        except FileNotFoundError:
            raise FileResolutionError(f"File not found: {ref.id}")

    async def resolve_batch(self, refs, batch_size=100):
        for ref in refs:
            yield ref, await self.resolve(ref)

    async def resolve_metadata(self, ref):
        return ref  # Could enrich with file stats

    async def store(self, content, media_type, metadata=None):
        file_id = str(uuid.uuid4())
        path = f"{self.base_path}/{file_id}"
        async with aiofiles.open(path, "wb") as f:
            await f.write(content)
        return FileRef(id=file_id, media_type=media_type)
```

How It Works (Backend)
Backend Reference
This section describes how Aionvision's backend orchestrates agents automatically. The Pipeline builder exposes this server-side graph engine via a fluent API — you don't need to reimplement any of this logic. The primitives below (contracts, context, registry) are available if you want to build custom client-side orchestration.
The Data Flow Cycle
The orchestrator uses an agentic ReAct loop (Reason → Act → Observe). Each chat message follows this cycle:
- Reason — The orchestrator LLM decides which tool (agent) to invoke next based on the user's query and previous observations
- Contract-driven input building — The agent's contract declares what data it needs via TypedInput. The system reads matching fields from ConversationState based on (data_type, content_type_hint) and wraps them as FileCollection-shaped inputs
- Execute — The agent runs with a fresh database session (isolation per tool call) and a per-agent timeout
- Contract-driven output routing — A ContextRouter reads the agent's contract TypedOutput declarations and routes results to the correct ConversationState fields (e.g. FILE_IDS with hint "images" → current_image_ids)
- Observe — The LLM receives a concise summary of the result (not the raw data) and decides whether to invoke another agent or respond
- Context persistence — Updated conversation state carries forward to future turns
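The cycle above can be sketched as a plain loop. Everything in this sketch (react_loop, llm_select, the state dict, the tool functions) is a hypothetical stand-in for illustration, not the orchestrator's real internals:

```python
def react_loop(query, tools, llm_select, max_iterations=5):
    """Minimal ReAct sketch: Reason -> Act -> Observe until the LLM is done."""
    state = {}          # stands in for ConversationState
    observations = []   # summaries fed back to the LLM, never raw data

    for _ in range(max_iterations):
        # Reason: the LLM picks the next tool, or None to respond
        choice = llm_select(query, observations)
        if choice is None:
            break

        # Act: run the chosen agent against the shared state
        result = tools[choice](state)

        # Observe: keep only a concise summary of the result
        observations.append(result["summary"])

    return state, observations
```

The key property the sketch preserves is that the LLM only ever sees the summaries in `observations`, while the heavy data accumulates in `state`.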
Contract-Driven Routing
Adding a new agent requires only defining its AgentContract. The input building and output routing are driven entirely by the contract's TypedInput/TypedOutput declarations and their content_type_hint fields — no hardcoded agent-name wiring needed.
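As a rough sketch of how such routing can work, consider a lookup table keyed by (data_type, content_type_hint). The ROUTES table, field names, and route_outputs function below are hypothetical illustrations that mirror the examples on this page; the real ContextRouter logic lives server-side:

```python
# Hypothetical routing table: (data_type, content_type_hint) -> state field
ROUTES = {
    ("FILE_IDS", "images"): "current_image_ids",
    ("FILE_IDS", "documents"): "current_document_ids",
    ("CATEGORIZATION", None): "current_categorization",
}

def route_outputs(contract_outputs, results, state):
    """Write each produced value to the state field its declaration maps to."""
    for out in contract_outputs:
        key = (out["data_type"], out.get("content_type_hint"))
        field = ROUTES.get(key)
        if field is not None and out["name"] in results:
            state[field] = results[out["name"]]
    return state
```

Because the table is keyed purely by the contract's declared types, registering a new agent adds no routing code: its outputs either match an existing key or simply produce no ConversationState mapping (as with FOLDER_RESULT in the example below).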
Example Pipeline
A user message like "Find sunset images and organize them into folders" produces this sequence:
```
Iteration 1: LLM selects → search_images("sunset images")
  ┌─ Input: contract has no required FILE_IDS inputs
  ├─ Execute: ImageSearchAgent runs, returns 47 image IDs
  └─ Route: contract output (FILE_IDS, hint="images")
       → ContextRouter writes conversation.current_image_ids = [47 IDs]
       → content_type = "images"

Iteration 2: LLM observes "Found 47 images" → selects analyze_images("categorize")
  ┌─ Input: contract input (FILE_IDS, hint="images")
  │    → reads conversation.current_image_ids → wraps as FileCollection
  ├─ Execute: AnalysisAgent runs, returns 5 categories
  └─ Route: contract output (CATEGORIZATION)
       → ContextRouter writes conversation.current_categorization

Iteration 3: LLM observes "5 categories" → selects manage_folders("organize")
  ┌─ Input: contract input (CATEGORIZATION)
  │    → reads conversation.current_categorization
  ├─ Execute: FolderAgent creates folders, moves files
  └─ Route: contract output (FOLDER_RESULT) → no ConversationState mapping

LLM responds: "Found 47 sunset images and organized into 5 folders"
```

Agent Contracts
Declaring Inputs and Outputs
Every agent declares a contract specifying the data types it consumes and produces. The graph builder uses these contracts to automatically wire edges between agents and verify type compatibility.
```python
from aion import AgentContract, AgentCapability, TypedInput, TypedOutput

# A search agent that PRODUCES file references
contract = AgentContract(
    name="ImageSearchAgent",
    capability=AgentCapability.SEARCH,
    description="Searches images by natural language",
    inputs=(
        TypedInput(
            name="folder_scope",
            data_type="FILE_IDS",
            required=False,
            description="Optional folder to restrict search scope",
        ),
    ),
    outputs=(
        TypedOutput(
            name="results",
            data_type="FILE_IDS",        # Produces a FileCollection
            mergeable=True,              # Can be merged with other FILE_IDS
            content_type_hint="images",  # Contains image references
        ),
    ),
)

# An analysis agent that CONSUMES file references and PRODUCES categorization
contract = AgentContract(
    name="AnalysisAgent",
    capability=AgentCapability.ANALYSIS,
    description="Categorizes files into semantic groups",
    inputs=(
        TypedInput(
            name="file_ids",
            data_type="FILE_IDS",        # Expects a FileCollection
            required=False,
            content_type_hint="images",  # Specifically image files
        ),
    ),
    outputs=(
        TypedOutput(name="categorization", data_type="CATEGORIZATION", mergeable=True),
        TypedOutput(name="analysis", data_type="ANALYSIS_RESULT", mergeable=False),
    ),
)
```

Content Type Hints
The content_type_hint on contract inputs and outputs tells the graph what kind of files an agent works with. This enables content-aware routing — image search results are routed to image-consuming agents, document results to document agents.
"images"— Image file references (JPG, PNG, etc.)"documents"— Document file references (PDF, DOCX, etc.)"links"— Saved web page/link references"mixed"— Collection containing multiple content types
FileCollection in Practice
Producing Results
Search agents create FileCollection objects and write them as typed output to the execution context. The collection carries the content type and source provenance.
```python
from aion import FileCollection, AgentResult

# Create a collection from search results
collection = FileCollection.from_ids(
    ids=image_ids,               # ["img_1", "img_2", ..., "img_47"]
    content_type="images",       # Declares what's in the collection
    source_capability="SEARCH",  # Tracks provenance
)

# Write to context so downstream agents can read it
context.write("results", collection.to_dict(), "FILE_IDS", "search_agent")

return AgentResult(
    success=True,
    agent_name="SearchAgent",
    capability="search",
    summary=f"Found {collection.count} images",
)
```

Consuming Results
Downstream agents receive file IDs through the execution context. They read from typed slots and can use a FileResolver to load actual content on demand.
```python
from aion import AgentResult, FileRef, FileResolutionError


class AnalysisAgent:
    def __init__(self, resolver):
        self.resolver = resolver  # Inject FileResolver via constructor

    async def execute(self, context):
        # Read upstream results from context
        file_data = context.read("results", expected_type="FILE_IDS")
        image_ids = file_data.get("ids", [])

        if not image_ids:
            return AgentResult(
                success=False,
                agent_name="AnalysisAgent",
                capability="analysis",
                error="No images to analyze",
            )

        # Use FileResolver to load actual content when needed
        for image_id in image_ids[:10]:
            ref = FileRef(id=image_id, media_type="image/jpeg")
            try:
                image_bytes = await self.resolver.resolve(ref)
                # ... analyze image_bytes ...
            except FileResolutionError:
                continue  # Skip unresolvable refs
```

Merge Nodes
Fan-In Pattern
When multiple agents produce the same data type and a downstream agent needs all of them, a merge step combines results. The DataTypeRegistry provides the merge function.
```
ImageSearchAgent ──→ result_0 (FILE_IDS, images)
                                                 ╲
                                                  Merge (FILE_IDS) ──→ merged_0 ──→ AnalysisAgent
                                                 ╱
DocumentSearchAgent ──→ result_1 (FILE_IDS, documents)
```

```python
# Merge calls FileCollection.merge(result_0, result_1)
# Result: merged collection with content_type="mixed", deduplicated IDs
from aion import get_default_registry, FileCollection

registry = get_default_registry()

# Merge two FileCollections using the registry
c1 = FileCollection.from_ids(["img-1", "img-2"], content_type="images")
c2 = FileCollection.from_ids(["doc-1"], content_type="documents")
merged = registry.merge("FILE_IDS", c1, c2)
# merged.ids → ["img-1", "img-2", "doc-1"]
# merged.content_type → "mixed"
```

Lazy Content Loading
IDs Travel, Data Stays
Agents pass lightweight FileCollection objects containing only IDs and metadata — not the actual file content. When an agent needs the real bytes (e.g. to analyze an image), it uses the FileResolver to load content on demand. This avoids bulk-loading hundreds of images through the graph.
```python
# What flows through the graph: lightweight ID references
# SearchAgent output → AnalysisAgent input
{
    "ids": ["img_1", "img_2", ..., "img_47"],  # Just UUIDs
    "content_type": "images",
    "source_capability": "SEARCH",
}
# Total size: ~2 KB (just IDs)

# When AnalysisAgent needs actual image data:
ref = FileRef(id="img_1")
image_bytes = await resolver.resolve(ref)
# → loads from storage on demand: 500 KB per image

# Or enrich metadata without loading content:
enriched = await resolver.resolve_metadata(ref)
# enriched.filename == "sunset_beach.jpg"
```

Design Principles
Key Patterns
IDs travel, content stays
Agents pass lightweight FileCollection objects (just IDs). Content is loaded on-demand via FileResolver only when an agent actually needs the bytes.
Contracts drive wiring
Agents declare typed inputs/outputs via AgentContract. The graph builder uses these contracts to automatically create edges, resolve types, and insert merge nodes.
Content-aware routing
FILE_IDS slots carry a content_type ("images", "documents", "links"). Agents can filter or route based on what kind of files they receive.
Immutable data types
Most SDK data types (FileRef, AgentContract, Category, etc.) are frozen dataclasses. FileCollection enforces immutability via __slots__. Operations return new instances, preventing accidental mutation.
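The frozen-dataclass pattern can be illustrated with the standard library alone. Ref here is a stand-in for illustration, not the SDK's actual FileRef definition:

```python
from dataclasses import dataclass, replace, FrozenInstanceError

# Illustrative frozen dataclass: attribute assignment raises after construction
@dataclass(frozen=True)
class Ref:
    id: str
    media_type: str = "application/octet-stream"

ref = Ref(id="img-1", media_type="image/jpeg")

# Mutation is rejected; derive a new instance instead
updated = replace(ref, media_type="image/png")
```

With frozen instances, "changing" a value always produces a new object via dataclasses.replace, so an upstream agent's output can never be silently mutated by a downstream consumer.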
Structural typing
The Agent and FileResolver protocols use duck typing — no inheritance required. Any class with the right methods satisfies the protocol.
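A minimal sketch of the structural-typing idea using typing.Protocol. AgentLike and PingAgent are illustrative names, not SDK classes; they only mirror the two-method agent shape described earlier on this page:

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class AgentLike(Protocol):
    def get_contract(self): ...
    async def execute(self, context): ...

class PingAgent:  # no inheritance: just matching method names
    def get_contract(self):
        return {"name": "PingAgent"}

    async def execute(self, context):
        return {"success": True}

# Structural check passes because the methods exist
assert isinstance(PingAgent(), AgentLike)
```

Note that @runtime_checkable isinstance checks only verify that the methods exist, not their signatures; full conformance is checked statically by type checkers such as mypy.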
What's Next?
Pipelines
Chain agents with a fluent builder API and server-side execution
Type Reference
AgentContract, ExecutionContext, FileRef, FileCollection, FileResolver API reference
Standalone Agents
Direct access to search and operation agents
LangGraph Integration
Build autonomous agents with LangGraph and Aionvision tools
Agent Operation Types
SynthesizeResult, DocumentAnalysisResult, OrganizeResult