
Agent Data Flow

How agents pass files, results, and metadata between each other

Architecture Overview

The agentic system uses a typed execution graph to coordinate agents. Each agent declares what data it consumes and produces via contracts. Data flows through typed slots in an ExecutionContext, and agents pass lightweight file references (IDs) and resolve actual content on demand via the FileResolver.

The Pipeline builder exposes this graph engine via a single HTTP call — chain agents with a fluent API and the server handles auto-wiring, parallel execution, and data routing. This page covers the underlying primitives for building custom client-side workflows.

Building Your Own Agent

The Agent Protocol

To create an agent, implement two methods: get_contract() and execute(). No inheritance required — the SDK uses structural typing via a Protocol.

from aion import (
    AgentCapability, AgentContract, AgentResult,
    ExecutionContext, FileCollection, TypedInput, TypedOutput,
)

class MySearchAgent:
    def get_contract(self) -> AgentContract:
        return AgentContract(
            name="MySearchAgent",
            capability=AgentCapability.SEARCH,
            description="Searches images by description",
            inputs=(
                TypedInput(name="query", data_type="TEXT", description="Search query"),
            ),
            outputs=(
                TypedOutput(name="results", data_type="FILE_IDS", mergeable=True),
            ),
            can_chain_with=(AgentCapability.ANALYSIS, AgentCapability.ORGANIZATION),
        )

    async def execute(self, context: ExecutionContext) -> AgentResult:
        query = context.read("query")
        # ... perform search logic ...
        found_ids = ["img-1", "img-2", "img-3"]
        results = FileCollection.from_ids(found_ids, content_type="images").to_dict()
        # Write outputs to context for downstream agents
        context.write("results", results, "FILE_IDS", "MySearchAgent")
        return AgentResult(
            success=True,
            agent_name="MySearchAgent",
            capability="search",
            summary=f"Found {len(found_ids)} images for '{query}'",
        )

Chaining Agents with ExecutionContext

The ExecutionContext is the shared state container. Agent A writes results to a slot, Agent B reads from that slot. Each slot is typed and tracks provenance.

from aion import ExecutionContext

# Create shared context
ctx = ExecutionContext()

# Agent A writes search results
ctx.write("search_results", {"ids": ["img-1", "img-2"]}, "FILE_IDS", "search_agent")

# Agent B reads them and writes categorization
file_data = ctx.read("search_results", expected_type="FILE_IDS")
ctx.write("categorization", {
    "categories": [{"name": "Poles", "file_ids": ["img-1", "img-2"]}],
    "total_files": 2,
}, "CATEGORIZATION", "categorize_agent")

# Agent C reads both slots
cat = ctx.read("categorization", expected_type="CATEGORIZATION")
files = ctx.read("search_results")

# Inspect context state
ctx.list_slots()
# [{"name": "search_results", "type": "FILE_IDS", "source": "search_agent"},
#  {"name": "categorization", "type": "CATEGORIZATION", "source": "categorize_agent"}]

# Serialize/restore for checkpointing
checkpoint = ctx.to_dict()
restored = ExecutionContext.from_dict(checkpoint)

Data Type Registry

The DataTypeRegistry tracks which data types exist and how to merge them. Use get_default_registry() for a pre-loaded registry, or build your own.

from aion import get_default_registry, DataTypeDefinition, DataTypeRegistry

# Pre-loaded with built-in types
registry = get_default_registry()
registry.list_types()
# ["FILE_IDS", "CATEGORIZATION", "FOLDER_RESULT", "ANALYSIS_RESULT", "TEXT", "CROSS_REF"]

# FILE_IDS and CATEGORIZATION support merging
registry.can_merge("FILE_IDS")  # True — uses FileCollection.merge()
registry.can_merge("TEXT")      # False

# Register your own types
registry.register(
    DataTypeDefinition(name="EMBEDDINGS", description="Vector embeddings"),
    merge_fn=lambda *arrs: [v for arr in arrs for v in arr],  # Concatenate
)
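The registry concept itself is small enough to sketch end to end. `MiniRegistry` below is a toy stand-in (not the SDK's `DataTypeRegistry`): a type is mergeable exactly when it was registered with a merge function.

```python
class MiniRegistry:
    """Toy stand-in for DataTypeRegistry (illustration only)."""

    def __init__(self):
        self._types: set[str] = set()
        self._merge_fns: dict = {}

    def register(self, name: str, merge_fn=None) -> None:
        self._types.add(name)
        if merge_fn is not None:
            self._merge_fns[name] = merge_fn

    def list_types(self) -> list[str]:
        return sorted(self._types)

    def can_merge(self, name: str) -> bool:
        return name in self._merge_fns

    def merge(self, name: str, *values):
        if name not in self._merge_fns:
            raise ValueError(f"{name} is not mergeable")
        return self._merge_fns[name](*values)

registry = MiniRegistry()
registry.register("TEXT")  # no merge_fn: two texts cannot be auto-combined
registry.register("EMBEDDINGS",
                  merge_fn=lambda *arrs: [v for arr in arrs for v in arr])
```

Keeping merge behavior in the registry (rather than in agents) is what lets the graph insert merge nodes generically for any mergeable type.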

Implementing a FileResolver

Implement the FileResolver Protocol to connect your own storage backend. Agents use it to resolve FileRef objects to actual content bytes.

from aion import FileRef, FileResolver, FileResolutionError
import aiofiles
import uuid

class LocalFileResolver:
    def __init__(self, base_path: str):
        self.base_path = base_path

    async def resolve(self, ref, hints=None):
        path = f"{self.base_path}/{ref.id}"
        try:
            async with aiofiles.open(path, "rb") as f:
                return await f.read()
        except FileNotFoundError:
            raise FileResolutionError(f"File not found: {ref.id}")

    async def resolve_batch(self, refs, batch_size=100):
        for ref in refs:
            yield ref, await self.resolve(ref)

    async def resolve_metadata(self, ref):
        return ref  # Could enrich with file stats

    async def store(self, content, media_type, metadata=None):
        file_id = str(uuid.uuid4())
        path = f"{self.base_path}/{file_id}"
        async with aiofiles.open(path, "wb") as f:
            await f.write(content)
        return FileRef(id=file_id, media_type=media_type)
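The class above depends on `aiofiles` and the SDK types. For a round-trip you can actually run, here is a self-contained sketch using only the standard library (blocking file I/O is acceptable in a demo) with minimal stand-ins for `FileRef` and `FileResolutionError`.

```python
import asyncio
import tempfile
import uuid
from dataclasses import dataclass

class FileResolutionError(Exception):  # stand-in for the SDK exception
    pass

@dataclass(frozen=True)
class FileRef:  # stand-in for the SDK FileRef
    id: str
    media_type: str = "application/octet-stream"

class BlockingLocalResolver:
    """Like LocalFileResolver above, but stdlib-only for a self-contained demo."""

    def __init__(self, base_path: str):
        self.base_path = base_path

    async def resolve(self, ref: FileRef) -> bytes:
        try:
            with open(f"{self.base_path}/{ref.id}", "rb") as f:
                return f.read()
        except FileNotFoundError:
            raise FileResolutionError(f"File not found: {ref.id}")

    async def store(self, content: bytes, media_type: str) -> FileRef:
        file_id = str(uuid.uuid4())
        with open(f"{self.base_path}/{file_id}", "wb") as f:
            f.write(content)
        return FileRef(id=file_id, media_type=media_type)

async def round_trip() -> bytes:
    # store() returns a lightweight FileRef; resolve() fetches the bytes back
    with tempfile.TemporaryDirectory() as tmp:
        resolver = BlockingLocalResolver(tmp)
        ref = await resolver.store(b"pixels", "image/jpeg")
        return await resolver.resolve(ref)

payload = asyncio.run(round_trip())
```

Translating storage-layer errors into `FileResolutionError` matters: downstream agents catch that one exception type regardless of which backend is plugged in.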

How It Works (Backend)

Backend Reference

This section describes how Aionvision's backend orchestrates agents automatically. The Pipeline builder exposes this server-side graph engine via a fluent API — you don't need to reimplement any of this logic. The primitives below (contracts, context, registry) are available if you want to build custom client-side orchestration.

The Data Flow Cycle

The orchestrator uses an agentic ReAct loop (Reason → Act → Observe). Each chat message follows this cycle:

  1. Reason — The orchestrator LLM decides which tool (agent) to invoke next based on the user's query and previous observations
  2. Contract-driven input building — The agent's contract declares what data it needs via TypedInput. The system reads matching fields from ConversationState based on (data_type, content_type_hint) and wraps them as FileCollection-shaped inputs
  3. Execute — The agent runs with a fresh database session (isolation per tool call) and a per-agent timeout
  4. Contract-driven output routing — A ContextRouter reads the agent's contract TypedOutput declarations and routes results to the correct ConversationState fields (e.g. FILE_IDS with hint "images" → current_image_ids)
  5. Observe — The LLM receives a concise summary of the result (not the raw data) and decides whether to invoke another agent or respond
  6. Context persistence — Updated conversation state carries forward to future turns
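The cycle above can be sketched as a plain loop. Everything here is schematic: `select_tool` and `run_agent` are placeholder callables standing in for the orchestrator LLM and agent execution, not SDK names.

```python
def react_loop(query, select_tool, run_agent, max_iterations=5):
    """Schematic Reason → Act → Observe loop (stubs, not the real orchestrator)."""
    observations = []
    for _ in range(max_iterations):
        # Reason: pick the next tool from the query + prior observations,
        # or None to stop and answer the user
        tool = select_tool(query, observations)
        if tool is None:
            break
        # Act: run the chosen agent
        result = run_agent(tool)
        # Observe: only the concise summary is fed back, never the raw data
        observations.append(result["summary"])
    return observations

# Scripted stand-ins: search first, then analyze, then stop
def select_tool(query, observations):
    plan = ["search_images", "analyze_images"]
    return plan[len(observations)] if len(observations) < len(plan) else None

def run_agent(tool):
    return {"summary": f"{tool} done"}

trace = react_loop("find sunset images", select_tool, run_agent)
```

The `max_iterations` cap mirrors a standard safeguard in agentic loops: the LLM decides when to stop, but the runtime enforces an upper bound.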

Contract-Driven Routing

Adding a new agent requires only defining its AgentContract. The input building and output routing are driven entirely by the contract's TypedInput/TypedOutput declarations and their content_type_hint fields — no hardcoded agent-name wiring needed.

Example Pipeline

A user message like "Find sunset images and organize them into folders" produces this sequence:

Iteration 1: LLM selects → search_images("sunset images")
  ┌─ Input: contract has no required FILE_IDS inputs
  ├─ Execute: ImageSearchAgent runs, returns 47 image IDs
  └─ Route: contract output (FILE_IDS, hint="images")
       → ContextRouter writes conversation.current_image_ids = [47 IDs]
       → content_type = "images"

Iteration 2: LLM observes "Found 47 images" → selects analyze_images("categorize")
  ┌─ Input: contract input (FILE_IDS, hint="images")
  │    → reads conversation.current_image_ids → wraps as FileCollection
  ├─ Execute: AnalysisAgent runs, returns 5 categories
  └─ Route: contract output (CATEGORIZATION)
       → ContextRouter writes conversation.current_categorization

Iteration 3: LLM observes "5 categories" → selects manage_folders("organize")
  ┌─ Input: contract input (CATEGORIZATION)
  │    → reads conversation.current_categorization
  ├─ Execute: FolderAgent creates folders, moves files
  └─ Route: contract output (FOLDER_RESULT) → no ConversationState mapping

LLM responds: "Found 47 sunset images and organized into 5 folders"

Agent Contracts

Declaring Inputs and Outputs

Every agent declares a contract specifying the data types it consumes and produces. The graph builder uses these contracts to automatically wire edges between agents and verify type compatibility.

from aion import AgentContract, AgentCapability, TypedInput, TypedOutput

# A search agent that PRODUCES file references
contract = AgentContract(
    name="ImageSearchAgent",
    capability=AgentCapability.SEARCH,
    description="Searches images by natural language",
    inputs=(
        TypedInput(
            name="folder_scope",
            data_type="FILE_IDS",
            required=False,
            description="Optional folder to restrict search scope",
        ),
    ),
    outputs=(
        TypedOutput(
            name="results",
            data_type="FILE_IDS",        # Produces a FileCollection
            mergeable=True,              # Can be merged with other FILE_IDS
            content_type_hint="images",  # Contains image references
        ),
    ),
)

# An analysis agent that CONSUMES file references and PRODUCES categorization
contract = AgentContract(
    name="AnalysisAgent",
    capability=AgentCapability.ANALYSIS,
    description="Categorizes files into semantic groups",
    inputs=(
        TypedInput(
            name="file_ids",
            data_type="FILE_IDS",        # Expects a FileCollection
            required=False,
            content_type_hint="images",  # Specifically image files
        ),
    ),
    outputs=(
        TypedOutput(name="categorization", data_type="CATEGORIZATION", mergeable=True),
        TypedOutput(name="analysis", data_type="ANALYSIS_RESULT", mergeable=False),
    ),
)

Content Type Hints

The content_type_hint on contract inputs and outputs tells the graph what kind of files an agent works with. This enables content-aware routing — image search results are routed to image-consuming agents, document results to document agents.

  • "images" — Image file references (JPG, PNG, etc.)
  • "documents" — Document file references (PDF, DOCX, etc.)
  • "links" — Saved web page/link references
  • "mixed" — Collection containing multiple content types
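The hint for a collection can usually be derived from its members' media types. The helper below is illustrative only (not an SDK function) and assumes a small media-type-to-bucket mapping; any collection spanning more than one bucket falls back to "mixed".

```python
def infer_content_type(media_types: list[str]) -> str:
    """Map a collection's media types onto one content_type_hint (illustrative)."""
    def bucket(mt: str) -> str:
        if mt.startswith("image/"):
            return "images"
        if mt in ("application/pdf", "application/msword",
                  "application/vnd.openxmlformats-officedocument"
                  ".wordprocessingml.document"):
            return "documents"
        if mt == "text/uri-list":
            return "links"
        return "mixed"

    buckets = {bucket(mt) for mt in media_types}
    # A single bucket gives a precise hint; anything heterogeneous is "mixed"
    return buckets.pop() if len(buckets) == 1 else "mixed"
```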

FileCollection in Practice

Producing Results

Search agents create FileCollection objects and write them as typed output to the execution context. The collection carries the content type and source provenance.

from aion import FileCollection, AgentResult

# Create a collection from search results
collection = FileCollection.from_ids(
    ids=image_ids,               # ["img_1", "img_2", ..., "img_47"]
    content_type="images",       # Declares what's in the collection
    source_capability="SEARCH",  # Tracks provenance
)

# Write to context so downstream agents can read it
context.write("results", collection.to_dict(), "FILE_IDS", "search_agent")

return AgentResult(
    success=True,
    agent_name="SearchAgent",
    capability="search",
    summary=f"Found {collection.count} images",
)

Consuming Results

Downstream agents receive file IDs through the execution context. They read from typed slots and can use a FileResolver to load actual content on demand.

from aion import AgentResult, FileRef, FileResolutionError

class AnalysisAgent:
    def __init__(self, resolver):
        self.resolver = resolver  # Inject FileResolver via constructor

    async def execute(self, context):
        # Read upstream results from context
        file_data = context.read("results", expected_type="FILE_IDS")
        image_ids = file_data.get("ids", [])
        if not image_ids:
            return AgentResult(
                success=False, agent_name="AnalysisAgent",
                capability="analysis", error="No images to analyze",
            )
        # Use FileResolver to load actual content when needed
        for image_id in image_ids[:10]:
            ref = FileRef(id=image_id, media_type="image/jpeg")
            try:
                image_bytes = await self.resolver.resolve(ref)
                # ... analyze image_bytes ...
            except FileResolutionError:
                continue  # Skip unresolvable refs

Merge Nodes

Fan-In Pattern

When multiple agents produce the same data type and a downstream agent needs all of them, a merge step combines results. The DataTypeRegistry provides the merge function.

ImageSearchAgent    ──→ result_0 (FILE_IDS, images)    ──┐
                                                         ├─→ Merge (FILE_IDS) ──→ merged_0 ──→ AnalysisAgent
DocumentSearchAgent ──→ result_1 (FILE_IDS, documents) ──┘

# Merge calls FileCollection.merge(result_0, result_1)
# Result: merged collection with content_type="mixed", deduplicated IDs

from aion import get_default_registry, FileCollection

registry = get_default_registry()

# Merge two FileCollections using the registry
c1 = FileCollection.from_ids(["img-1", "img-2"], content_type="images")
c2 = FileCollection.from_ids(["doc-1"], content_type="documents")
merged = registry.merge("FILE_IDS", c1, c2)
# merged.ids → ["img-1", "img-2", "doc-1"]
# merged.content_type → "mixed"
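The merge semantics described here (order-preserving deduplication, content type collapsing to "mixed" when sources differ) can be reproduced in a few lines. `MiniCollection` is a toy stand-in for `FileCollection`, used only to make the behavior testable.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class MiniCollection:
    """Toy stand-in for FileCollection, enough to show merge semantics."""
    ids: tuple
    content_type: str

    @classmethod
    def from_ids(cls, ids, content_type):
        return cls(tuple(ids), content_type)

    @staticmethod
    def merge(*collections):
        seen, merged = set(), []
        for coll in collections:
            for file_id in coll.ids:
                if file_id not in seen:  # deduplicate, preserving first-seen order
                    seen.add(file_id)
                    merged.append(file_id)
        types = {coll.content_type for coll in collections}
        ctype = types.pop() if len(types) == 1 else "mixed"
        return MiniCollection(tuple(merged), ctype)

merged = MiniCollection.merge(
    MiniCollection.from_ids(["img-1", "img-2"], "images"),
    MiniCollection.from_ids(["doc-1", "img-2"], "documents"),  # img-2 is a dup
)
```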

Lazy Content Loading

IDs Travel, Data Stays

Agents pass lightweight FileCollection objects containing only IDs and metadata — not the actual file content. When an agent needs the real bytes (e.g. to analyze an image), it uses the FileResolver to load content on demand. This avoids bulk-loading hundreds of images through the graph.

# What flows through the graph: lightweight ID references
# SearchAgent output → AnalysisAgent input
{
    "ids": ["img_1", "img_2", ..., "img_47"],  # Just UUIDs
    "content_type": "images",
    "source_capability": "SEARCH"
}
# Total size: ~2 KB (just IDs)

# When AnalysisAgent needs actual image data:
ref = FileRef(id="img_1")
image_bytes = await resolver.resolve(ref)
# → loads from storage on demand: 500 KB per image

# Or enrich metadata without loading content:
enriched = await resolver.resolve_metadata(ref)
# enriched.filename == "sunset_beach.jpg"

Design Principles

Key Patterns

IDs travel, content stays

Agents pass lightweight FileCollection objects (just IDs). Content is loaded on-demand via FileResolver only when an agent actually needs the bytes.

Contracts drive wiring

Agents declare typed inputs/outputs via AgentContract. The graph builder uses these contracts to automatically create edges, resolve types, and insert merge nodes.

Content-aware routing

FILE_IDS slots carry a content_type ("images", "documents", "links"). Agents can filter or route based on what kind of files they receive.

Immutable data types

Most SDK data types (FileRef, AgentContract, Category, etc.) are frozen dataclasses. FileCollection enforces immutability via __slots__. Operations return new instances, preventing accidental mutation.
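The frozen-dataclass pattern is standard Python and easy to demonstrate. `Ref` below is a generic stand-in (not the SDK's `FileRef`): in-place mutation raises, and `dataclasses.replace()` produces a modified copy instead.

```python
import dataclasses

@dataclasses.dataclass(frozen=True)
class Ref:
    """Generic frozen record, standing in for frozen SDK types like FileRef."""
    id: str
    media_type: str = "image/jpeg"

ref = Ref(id="img-1")

try:
    ref.id = "img-2"  # assignment on a frozen instance is rejected at runtime
except dataclasses.FrozenInstanceError:
    mutated = False
else:
    mutated = True

# "Operations return new instances": replace() builds a copy with the change
copy = dataclasses.replace(ref, id="img-2")
```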

Structural typing

The Agent and FileResolver protocols use duck typing — no inheritance required. Any class with the right methods satisfies the protocol.
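A minimal sketch of how a `typing.Protocol` enables this: the class below never inherits from the protocol, yet satisfies it structurally. The `Resolver` protocol here is a simplified stand-in, not the SDK's definition (note that `runtime_checkable` `isinstance` checks only verify method names, not signatures).

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class Resolver(Protocol):
    """Simplified structural protocol: anything with resolve() qualifies."""
    async def resolve(self, ref) -> bytes: ...

class InMemoryResolver:  # no inheritance from Resolver anywhere
    def __init__(self, blobs: dict):
        self.blobs = blobs

    async def resolve(self, ref) -> bytes:
        return self.blobs[ref]

# Structural check: passes because the method exists, not because of a base class
ok = isinstance(InMemoryResolver({}), Resolver)
```

Static type checkers such as mypy perform the stronger check, verifying the full method signature wherever an `InMemoryResolver` is passed as a `Resolver`.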