Agent Data Flow Types
Contracts, typed context, data types, agent protocol, file references, and content resolution
Build Your Own Agent Workflows
These types let you build multi-agent workflows with typed data passing between agents. Define contracts, share data through an ExecutionContext, and resolve file references to content. All types are importable from the top-level aion package. See the Agent Data Flow guide for usage patterns.
AgentCapability
String enum declaring what an agent can do. Used in AgentContract to specify primary capability and chaining hints.
```python
from aion import AgentCapability
```

```python
class AgentCapability(str, Enum):
    SEARCH = "search"
    FILTER = "filter"
    ANALYSIS = "analysis"
    COMPARISON = "comparison"
    EXPORT = "export"
    SUMMARIZATION = "summarization"
    RECOMMENDATION = "recommendation"
    VISUALIZATION = "visualization"
    ANALYTICS = "analytics"
    CROSS_REFERENCE = "cross_reference"
    SYNTHESIS = "synthesis"
    ORGANIZATION = "organization"
    CONVERSATIONAL = "conversational"
```

TypedInput
Declares a typed input that an agent requires. Frozen dataclass.
```python
from aion import TypedInput
```

```python
@dataclass(frozen=True)
class TypedInput:
    name: str                                # Slot name, e.g. "file_ids"
    data_type: str                           # Registry key, e.g. "FILE_IDS"
    required: bool = True                    # Whether the input must be present
    description: str = ""                    # Human-readable description
    content_type_hint: Optional[str] = None  # "images", "documents", "links"
```

Methods:

| Method | Returns |
|---|---|
| to_dict() | dict[str, Any] |
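To show what "frozen dataclass" means in practice, here is a minimal sketch. It uses a local stand-in class that copies the documented fields rather than importing aion, and the `to_dict()` body is an assumption about how serialization might work, not the library's implementation.

```python
from dataclasses import FrozenInstanceError, asdict, dataclass, replace
from typing import Any, Optional


# Local stand-in mirroring the documented fields; not the real aion class.
@dataclass(frozen=True)
class TypedInput:
    name: str
    data_type: str
    required: bool = True
    description: str = ""
    content_type_hint: Optional[str] = None

    def to_dict(self) -> dict[str, Any]:
        # Assumption: a plain field-by-field dictionary.
        return asdict(self)


file_ids = TypedInput(name="file_ids", data_type="FILE_IDS", content_type_hint="images")

# Assigning to a field of a frozen dataclass raises FrozenInstanceError.
try:
    file_ids.required = False  # type: ignore[misc]
    mutated = True
except FrozenInstanceError:
    mutated = False

# To "change" a field, build a modified copy instead.
optional_ids = replace(file_ids, required=False)
```

Because instances cannot be mutated after construction, a declaration can be shared between agents without defensive copying.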
TypedOutput
Declares a typed output that an agent produces. Frozen dataclass.
```python
from aion import TypedOutput
```

```python
@dataclass(frozen=True)
class TypedOutput:
    name: str                                # Slot name, e.g. "results"
    data_type: str                           # Registry key, e.g. "FILE_IDS"
    mergeable: bool = False                  # Can multiple outputs be merged?
    description: str = ""                    # Human-readable description
    content_type_hint: Optional[str] = None  # "images", "documents", "links"
```

Methods:

| Method | Returns |
|---|---|
| to_dict() | dict[str, Any] |
AgentContract
Unified contract declaring an agent's identity, capabilities, typed inputs/outputs, and execution hints. Frozen dataclass. Every agent returns one from get_contract().
```python
from aion import AgentContract, AgentCapability, TypedInput, TypedOutput
```

```python
@dataclass(frozen=True)
class AgentContract:
    name: str                                         # Agent name
    capability: AgentCapability                       # Primary capability
    description: str                                  # Natural language description
    inputs: tuple[TypedInput, ...] = ()               # Typed inputs (tuple for immutability)
    outputs: tuple[TypedOutput, ...] = ()             # Typed outputs
    example_intents: tuple[str, ...] = ()             # Example user intents
    can_run_parallel: bool = True                     # Supports parallel execution
    typical_duration_ms: int = 5000                   # Typical execution time in milliseconds
    can_chain_with: tuple[AgentCapability, ...] = ()  # Chaining hints
```

Methods:

| Method | Returns |
|---|---|
| get_required_inputs() | list[TypedInput] |
| get_input_by_name(name) | Optional[TypedInput] |
| get_output_by_name(name) | Optional[TypedOutput] |
| to_dict() | dict[str, Any] |
| from_dict(data) (classmethod) | AgentContract |
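The lookup methods above can be used to check whether an agent is ready to run. The sketch below is illustrative only: it uses local stand-ins with a subset of the documented fields (with `capability` simplified to a string), and `missing_inputs` is a hypothetical helper, not part of aion.

```python
from dataclasses import dataclass
from typing import Optional


# Local stand-ins with a subset of the documented fields; not the real aion classes.
@dataclass(frozen=True)
class TypedInput:
    name: str
    data_type: str
    required: bool = True


@dataclass(frozen=True)
class AgentContract:
    name: str
    capability: str
    inputs: tuple[TypedInput, ...] = ()

    def get_required_inputs(self) -> list[TypedInput]:
        return [i for i in self.inputs if i.required]

    def get_input_by_name(self, name: str) -> Optional[TypedInput]:
        return next((i for i in self.inputs if i.name == name), None)


def missing_inputs(contract: AgentContract, available: dict[str, str]) -> list[str]:
    """Hypothetical helper: required input names not covered by `available`.

    `available` maps slot name -> data type of the slots already written.
    """
    return [
        i.name
        for i in contract.get_required_inputs()
        if available.get(i.name) != i.data_type
    ]


export_contract = AgentContract(
    name="ExportAgent",
    capability="export",
    inputs=(
        TypedInput("file_ids", "FILE_IDS"),
        TypedInput("notes", "TEXT", required=False),
    ),
)
```

Calling `missing_inputs(export_contract, {})` reports `file_ids` as missing, while the optional `notes` input never blocks execution.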
Category
A single category from categorization, containing references to files and documents that belong to a semantic group. Frozen dataclass.
```python
from aion import Category
```

```python
@dataclass(frozen=True)
class Category:
    name: str                           # Category name
    description: Optional[str] = None   # Human-readable description
    file_ids: tuple[str, ...] = ()      # Image IDs
    document_ids: tuple[str, ...] = ()  # Document IDs
    top_tags: tuple[str, ...] = ()      # Top tags
    confidence: float = 1.0             # Confidence score (0.0-1.0)
```

Properties and methods:

| Member | Returns |
|---|---|
| count (property) | int: len(file_ids) + len(document_ids) |
| content_type (property) | str: "images", "documents", "mixed", or "empty" |
| merge_with(other) | Category, merged with deduplication |
| to_dict() | dict[str, Any] |
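The following sketch shows one way the `count`, `content_type`, and `merge_with` members could behave. It is a local stand-in, not the aion class, and it assumes that merging keeps the first category's name and preserves first-seen order when deduplicating. How the real library combines `top_tags` and `confidence` is not documented here, so those fields are left out.

```python
from dataclasses import dataclass


def _dedupe(*groups: tuple[str, ...]) -> tuple[str, ...]:
    # dict.fromkeys keeps the first occurrence of each ID, in order.
    return tuple(dict.fromkeys(item for group in groups for item in group))


# Local stand-in with a subset of the documented fields; not the real aion class.
@dataclass(frozen=True)
class Category:
    name: str
    file_ids: tuple[str, ...] = ()
    document_ids: tuple[str, ...] = ()

    @property
    def count(self) -> int:
        return len(self.file_ids) + len(self.document_ids)

    @property
    def content_type(self) -> str:
        if self.file_ids and self.document_ids:
            return "mixed"
        if self.file_ids:
            return "images"
        if self.document_ids:
            return "documents"
        return "empty"

    def merge_with(self, other: "Category") -> "Category":
        # Assumption: the merged category keeps this category's name.
        return Category(
            name=self.name,
            file_ids=_dedupe(self.file_ids, other.file_ids),
            document_ids=_dedupe(self.document_ids, other.document_ids),
        )


beach_a = Category("Beach", file_ids=("img-1", "img-2"))
beach_b = Category("Beach", file_ids=("img-2", "img-3"), document_ids=("doc-1",))
merged = beach_a.merge_with(beach_b)
```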
CategorizationResult
Structured categorization output containing categories with file assignments. Frozen dataclass.
```python
from aion import CategorizationResult, Category
```

```python
@dataclass(frozen=True)
class CategorizationResult:
    categories: tuple[Category, ...] = ()
    total_files: int = 0
    clustering_method: str = "unknown"
    status: str = "success"  # "success", "partial", "failed"
    error_reason: Optional[str] = None
    source_capability: Optional[str] = None
```

Methods:

| Method | Returns |
|---|---|
| merge(*results) (classmethod) | CategorizationResult, with categories merged by name |
| to_dict() | dict[str, Any] |
| from_dict(data) (classmethod) | CategorizationResult |
AnalysisResult
Structured analysis output with findings. Frozen dataclass.
```python
from aion import AnalysisResult
```

```python
@dataclass(frozen=True)
class AnalysisResult:
    summary: str                               # Analysis summary
    findings: tuple[str, ...] = ()             # List of findings
    document_id: Optional[str] = None          # Related document ID
    metadata: Optional[dict[str, Any]] = None  # Additional metadata
```

Methods:

| Method | Returns |
|---|---|
| to_dict() | dict[str, Any] |
TextResult
Simple text output. Frozen dataclass.
```python
from aion import TextResult
```

```python
@dataclass(frozen=True)
class TextResult:
    text: str                                  # Text content
    metadata: Optional[dict[str, Any]] = None  # Additional metadata
```

Methods:

| Method | Returns |
|---|---|
| to_dict() | dict[str, Any] |
CrossRefResult
Cross-reference analysis output. Frozen dataclass.
```python
from aion import CrossRefResult
```

```python
@dataclass(frozen=True)
class CrossRefResult:
    relationships: tuple[dict[str, Any], ...] = ()
    source_files: tuple[str, ...] = ()
    target_files: tuple[str, ...] = ()
    summary: str = ""
    metadata: Optional[dict[str, Any]] = None
```

Methods:

| Method | Returns |
|---|---|
| to_dict() | dict[str, Any] |
FolderResult
Result of folder organization operations. Frozen dataclass.
```python
from aion import FolderResult
```

```python
@dataclass(frozen=True)
class FolderResult:
    folders_created: tuple[dict[str, Any], ...] = ()
    files_moved: int = 0
    documents_moved: int = 0
    errors: tuple[str, ...] = ()
    success: bool = True
```

Methods:

| Method | Returns |
|---|---|
| to_dict() | dict[str, Any] |
DataTypeDefinition
Definition of a data type that can flow between agents. Frozen dataclass.
```python
from aion import DataTypeDefinition
```

```python
@dataclass(frozen=True)
class DataTypeDefinition:
    name: str                # Type name, e.g. "FILE_IDS"
    description: str = ""    # Human-readable description
    mergeable: bool = False  # Whether merge is supported
```

DataTypeRegistry
Mutable registry for data types with optional merge functions.
```python
from aion import DataTypeRegistry, DataTypeDefinition, FileCollection

registry = DataTypeRegistry()

# Register a type with a merge function
registry.register(
    DataTypeDefinition(name="FILE_IDS", description="File references"),
    merge_fn=FileCollection.merge,
)

# Register a non-mergeable type
registry.register(
    DataTypeDefinition(name="TEXT", description="Text output"),
)
```

Methods:

| Method | Returns |
|---|---|
| register(definition, merge_fn=None) | None |
| get(name) | Optional[DataTypeDefinition] |
| has_type(name) | bool |
| can_merge(name) | bool |
| merge(name, *values) | Any; raises ValueError if not mergeable |
| list_types() | list[str] |
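To illustrate how a registry with optional merge functions might fit together, here is a small self-contained sketch. The classes are local stand-ins, not the aion implementation. It assumes a type is mergeable exactly when a merge function was registered for it, and `merge_id_lists` is a hypothetical merge function used only for the demo.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


# Local stand-ins; not the real aion classes.
@dataclass(frozen=True)
class DataTypeDefinition:
    name: str
    description: str = ""
    mergeable: bool = False


class DataTypeRegistry:
    def __init__(self) -> None:
        self._types: dict[str, DataTypeDefinition] = {}
        self._merge_fns: dict[str, Callable[..., Any]] = {}

    def register(
        self,
        definition: DataTypeDefinition,
        merge_fn: Optional[Callable[..., Any]] = None,
    ) -> None:
        self._types[definition.name] = definition
        if merge_fn is not None:
            self._merge_fns[definition.name] = merge_fn

    def can_merge(self, name: str) -> bool:
        # Assumption: mergeable means "a merge function was registered".
        return name in self._merge_fns

    def merge(self, name: str, *values: Any) -> Any:
        if not self.can_merge(name):
            raise ValueError(f"Type {name!r} is not mergeable")
        return self._merge_fns[name](*values)


def merge_id_lists(*lists: list[str]) -> list[str]:
    """Hypothetical merge function: concatenate and drop duplicate IDs."""
    return list(dict.fromkeys(item for ids in lists for item in ids))


registry = DataTypeRegistry()
registry.register(DataTypeDefinition("FILE_IDS", mergeable=True), merge_fn=merge_id_lists)
registry.register(DataTypeDefinition("TEXT"))

merged_ids = registry.merge("FILE_IDS", ["a", "b"], ["b", "c"])
```

Keeping the merge function next to the type definition lets a workflow combine the outputs of parallel agents without knowing the concrete data type.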
get_default_registry()
Factory function that returns a fresh DataTypeRegistry pre-loaded with built-in types. Each call returns a new instance so you can customize without affecting others.
```python
from aion import get_default_registry

registry = get_default_registry()
registry.list_types()
# ["FILE_IDS", "CATEGORIZATION", "FOLDER_RESULT", "ANALYSIS_RESULT", "TEXT", "CROSS_REF"]

registry.can_merge("FILE_IDS")        # True
registry.can_merge("CATEGORIZATION")  # True
registry.can_merge("TEXT")            # False
```

TypedSlot
A typed slot holding a value with provenance. Frozen dataclass. Used internally by ExecutionContext.
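The `timestamp` field is documented as being set automatically when it is left as `None`. The sketch below shows one common way to do that on a frozen dataclass. It is a local stand-in, and the use of `__post_init__` with `object.__setattr__` and a UTC clock are assumptions, not details confirmed for aion.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional


# Local stand-in mirroring the documented fields; not the real aion class.
@dataclass(frozen=True)
class TypedSlot:
    value: Any
    data_type: str
    source_node: str
    timestamp: Optional[str] = None

    def __post_init__(self) -> None:
        if self.timestamp is None:
            # Frozen dataclasses block normal assignment, so the default is
            # filled in through object.__setattr__ during initialization.
            now = datetime.now(timezone.utc).isoformat()
            object.__setattr__(self, "timestamp", now)


auto_slot = TypedSlot(value={"ids": ["img-1"]}, data_type="FILE_IDS", source_node="search_agent")
explicit_slot = TypedSlot(
    value="hello",
    data_type="TEXT",
    source_node="chat_agent",
    timestamp="2024-01-01T00:00:00+00:00",
)
```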
```python
from aion import TypedSlot
```

```python
@dataclass(frozen=True)
class TypedSlot:
    value: Any                       # The data (dict, list, etc.)
    data_type: str                   # Registry key, e.g. "FILE_IDS"
    source_node: str                 # Which agent produced this
    timestamp: Optional[str] = None  # ISO 8601 (auto-set if None)
```

ExecutionContext
Mutable, async-safe shared state container for agent workflows. Agents read inputs from and write outputs to named, typed slots.
```python
from aion import ExecutionContext

ctx = ExecutionContext()

# Write a slot
ctx.write("results", {"ids": ["img-1", "img-2"]}, "FILE_IDS", "search_agent")

# Read a slot (raises KeyError if missing, TypeError if type mismatch)
data = ctx.read("results")
data = ctx.read("results", expected_type="FILE_IDS")

# Check existence
ctx.has_slot("results")  # True

# Async-safe methods for parallel execution (call these inside an async function)
await ctx.write_async("results", data, "FILE_IDS", "agent")
data = await ctx.read_async("results")
exists = await ctx.has_slot_async("results")
```

Methods:

| Method | Description |
|---|---|
| write(slot_name, value, data_type, source_node) | Write a value to a typed slot |
| read(slot_name, expected_type=None) | Read a slot value |
| has_slot(slot_name) | Check if a slot exists |
| write_async(...) | Async-safe write (uses asyncio.Lock) |
| read_async(...) | Async-safe read |
| has_slot_async(slot_name) | Async-safe existence check |
| get_slot_type(slot_name) | Returns data type string or None |
| get_slots_by_type(data_type) | All slot values matching a type |
| list_slots() | All slots with types and sources |
| slots (property) | Read-only copy of internal slots |
| to_dict() | Serialize for checkpointing |
| from_dict(data) (classmethod) | Restore from serialized dict |
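As a rough picture of the semantics described above, the following stand-in implements just enough of a slot container to show the documented error behavior and lock-guarded async writes. It is a simplified sketch, not the aion class. The internal tuple layout and the `search_agent` coroutine are hypothetical.

```python
import asyncio
from typing import Any, Optional


class ExecutionContext:
    """Simplified stand-in; not the real aion class."""

    def __init__(self) -> None:
        # slot name -> (value, data_type, source_node)
        self._slots: dict[str, tuple[Any, str, str]] = {}
        self._lock = asyncio.Lock()

    def write(self, slot_name: str, value: Any, data_type: str, source_node: str) -> None:
        self._slots[slot_name] = (value, data_type, source_node)

    def read(self, slot_name: str, expected_type: Optional[str] = None) -> Any:
        if slot_name not in self._slots:
            raise KeyError(slot_name)
        value, data_type, _source = self._slots[slot_name]
        if expected_type is not None and data_type != expected_type:
            raise TypeError(f"Slot {slot_name!r} holds {data_type}, expected {expected_type}")
        return value

    async def write_async(self, slot_name: str, value: Any, data_type: str, source_node: str) -> None:
        # The lock serializes writers that run concurrently on the event loop.
        async with self._lock:
            self.write(slot_name, value, data_type, source_node)


async def search_agent(ctx: ExecutionContext, name: str) -> None:
    """Hypothetical agent body: writes one FILE_IDS slot."""
    await ctx.write_async(f"{name}_results", {"ids": [f"{name}-1"]}, "FILE_IDS", name)


async def main() -> ExecutionContext:
    ctx = ExecutionContext()
    # Two agents write to the shared context in parallel.
    await asyncio.gather(search_agent(ctx, "images"), search_agent(ctx, "docs"))
    return ctx


ctx = asyncio.run(main())
```

Passing `expected_type` on reads lets a downstream agent fail fast with `TypeError` when an upstream agent wrote a different data type to the same slot name.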
AgentResult
Result returned by an agent after execution. Frozen dataclass.
```python
from aion import AgentResult
```

```python
@dataclass(frozen=True)
class AgentResult:
    success: bool                              # Whether execution succeeded
    agent_name: str                            # Name of the agent
    capability: str                            # AgentCapability value
    outputs: Optional[dict[str, Any]] = None   # slot_name -> value
    error: Optional[str] = None                # Error message if failed
    summary: Optional[str] = None              # Human-readable summary
    execution_time_ms: float = 0               # Execution duration
    metadata: Optional[dict[str, Any]] = None  # Additional metadata
```

Methods:

| Method | Returns |
|---|---|
| get_output(slot_name, default=None) | Any |
| has_output(slot_name) | bool |
| to_dict() | dict[str, Any] |
Agent (Protocol)
Runtime-checkable Protocol for SDK agents. Uses structural typing (duck typing) — no inheritance required, just implement the two methods.
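The sketch below illustrates what structural typing with a runtime-checkable protocol means in plain Python. The `Agent` protocol here is a simplified local stand-in that uses `dict` in place of the aion types. One caveat worth knowing is that `isinstance` against such a protocol only checks that the methods exist, not their signatures or whether they are coroutines.

```python
from typing import Protocol, runtime_checkable


# Simplified stand-in for the protocol; dict replaces the aion types.
@runtime_checkable
class Agent(Protocol):
    def get_contract(self) -> dict: ...
    async def execute(self, context: dict) -> dict: ...


class CompleteAgent:
    """Implements both methods without inheriting from Agent."""

    def get_contract(self) -> dict:
        return {"name": "CompleteAgent", "capability": "search"}

    async def execute(self, context: dict) -> dict:
        return {"success": True}


class IncompleteAgent:
    """Missing execute(), so it does not satisfy the protocol."""

    def get_contract(self) -> dict:
        return {"name": "IncompleteAgent", "capability": "search"}


class WrongSignatureAgent:
    """Passes isinstance() even though execute() has the wrong shape."""

    def get_contract(self) -> dict:
        return {}

    def execute(self) -> None:  # not async, no context parameter
        return None


complete_ok = isinstance(CompleteAgent(), Agent)
incomplete_ok = isinstance(IncompleteAgent(), Agent)
wrong_signature_ok = isinstance(WrongSignatureAgent(), Agent)
```

Since the runtime check is shallow, a static type checker is the more reliable way to confirm that an agent's method signatures match.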
```python
from aion import Agent, AgentCapability, AgentContract, AgentResult, ExecutionContext
```

```python
@runtime_checkable
class Agent(Protocol):
    def get_contract(self) -> AgentContract: ...
    async def execute(self, context: ExecutionContext) -> AgentResult: ...


# Any class with these methods satisfies the protocol:
class MyAgent:
    def get_contract(self) -> AgentContract:
        return AgentContract(
            name="MyAgent",
            capability=AgentCapability.SEARCH,
            description="My search agent",
        )

    async def execute(self, context: ExecutionContext) -> AgentResult:
        query = context.read("query")
        # ... do work ...
        return AgentResult(success=True, agent_name="MyAgent", capability="search")


isinstance(MyAgent(), Agent)  # True
```

FileRef
A reference to a single file with optional metadata. Frozen dataclass.
```python
from aion import FileRef
```

```python
@dataclass(frozen=True)
class FileRef:
    id: str                                       # UUID, content hash, or URI
    media_type: str = "application/octet-stream"  # MIME type
    filename: Optional[str] = None                # Original filename
    size_bytes: Optional[int] = None              # File size in bytes
    metadata: Optional[dict[str, Any]] = None     # User-defined metadata
```

Methods:

| Method | Returns |
|---|---|
| from_api_response(data) (classmethod) | FileRef |
| to_dict(exclude_none=True) | dict[str, Any] |
FileCollection
Typed, immutable collection of file references with merge, filter, and slice operations. All operations return new instances.
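To make merging with deduplication concrete, here is a small sketch over plain tuples of references. It uses a local `FileRef` stand-in and treats "richest metadata" as "most non-None fields", which is an assumption. `richness` and `merge_refs` are hypothetical helpers, and the real `FileCollection.merge` may rank references differently.

```python
from dataclasses import dataclass, fields
from typing import Optional


# Local stand-in with a subset of the documented fields; not the real aion class.
@dataclass(frozen=True)
class FileRef:
    id: str
    media_type: str = "application/octet-stream"
    filename: Optional[str] = None
    size_bytes: Optional[int] = None


def richness(ref: FileRef) -> int:
    """Hypothetical score: the number of fields that are not None."""
    return sum(getattr(ref, f.name) is not None for f in fields(ref))


def merge_refs(*collections: tuple[FileRef, ...]) -> tuple[FileRef, ...]:
    """Merge by ID, keeping the richest reference and first-seen order."""
    best: dict[str, FileRef] = {}
    for collection in collections:
        for ref in collection:
            current = best.get(ref.id)
            if current is None or richness(ref) > richness(current):
                # Reassigning an existing key keeps its original position.
                best[ref.id] = ref
    return tuple(best.values())


search_refs = (FileRef("uuid-1"), FileRef("uuid-2"))
upload_refs = (
    FileRef("uuid-2", media_type="image/jpeg", filename="cat.jpg", size_bytes=2048),
    FileRef("uuid-3"),
)
merged_refs = merge_refs(search_refs, upload_refs)
```

The inputs are never modified. The function builds a new tuple, which matches the "all operations return new instances" behavior described above.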
```python
from aion import FileCollection, FileRef

# Create from bare IDs
coll = FileCollection.from_ids(["uuid-1", "uuid-2"], content_type="images")

# Create from FileRef objects
coll = FileCollection(
    refs=[FileRef(id="uuid-1", media_type="image/jpeg")],
    content_type="images",
)

# Properties
coll.refs               # tuple[FileRef, ...]
coll.ids                # list[str]
coll.count              # int
coll.content_type       # "images", "documents", "mixed", "unknown"
coll.source_capability  # Optional[str]: which capability produced this

# Iterate, index, slice
for ref in coll:
    print(ref.id)
first = coll[0]     # Returns FileRef
subset = coll[0:5]  # Returns FileCollection

# Filter
images = coll.filter(lambda r: r.media_type.startswith("image/"))

# Merge with deduplication (preserves richest metadata per ID)
merged = FileCollection.merge(search_results, upload_results)

# Serialize
data = coll.to_dict()
# {"refs": [...], "ids": [...], "count": 2, "content_type": "images"}
```

FileResolver (Protocol)
Runtime-checkable protocol for pluggable content resolution. Implement this to connect your own storage backend (S3, local filesystem, HTTP, etc.).
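As an example of a custom backend, the sketch below keeps file content in a dictionary and uses a SHA-256 content hash as the file ID. It relies on local stand-ins for `FileRef` and `FileResolutionError` so it runs without aion installed. `InMemoryResolver` is hypothetical, and its method names follow the four methods the protocol documents.

```python
import asyncio
import hashlib
from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional


# Local stand-ins; not the real aion classes.
@dataclass(frozen=True)
class FileRef:
    id: str
    media_type: str = "application/octet-stream"
    size_bytes: Optional[int] = None


class FileResolutionError(Exception):
    pass


class InMemoryResolver:
    """Hypothetical resolver that stores content in a dict keyed by hash."""

    def __init__(self) -> None:
        self._blobs: dict[str, tuple[bytes, str]] = {}

    async def store(
        self, content: bytes, media_type: str, metadata: Optional[dict[str, Any]] = None
    ) -> FileRef:
        file_id = hashlib.sha256(content).hexdigest()
        self._blobs[file_id] = (content, media_type)
        return FileRef(id=file_id, media_type=media_type, size_bytes=len(content))

    async def resolve(self, ref: FileRef, hints: Optional[dict[str, Any]] = None) -> bytes:
        try:
            return self._blobs[ref.id][0]
        except KeyError as exc:
            raise FileResolutionError(f"Unknown file id: {ref.id}") from exc

    async def resolve_batch(
        self, refs: list[FileRef], batch_size: int = 100
    ) -> AsyncIterator[tuple[FileRef, bytes]]:
        # An async generator; batch_size is unused by this in-memory sketch.
        for ref in refs:
            yield ref, await self.resolve(ref)

    async def resolve_metadata(self, ref: FileRef) -> FileRef:
        content = await self.resolve(ref)
        return FileRef(id=ref.id, media_type=self._blobs[ref.id][1], size_bytes=len(content))


async def demo() -> tuple[FileRef, bytes, list[tuple[str, bytes]], bool]:
    resolver = InMemoryResolver()
    ref = await resolver.store(b"hello", "text/plain")
    content = await resolver.resolve(ref)
    batch = [(r.id, data) async for r, data in resolver.resolve_batch([ref])]
    try:
        await resolver.resolve(FileRef(id="missing"))
        missing_failed = False
    except FileResolutionError:
        missing_failed = True
    return ref, content, batch, missing_failed


stored_ref, stored_content, batch_items, missing_failed = asyncio.run(demo())
```

A production resolver for S3 or HTTP would replace the dictionary lookups with network calls but could keep the same method shapes.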
```python
from aion import FileResolver
```

```python
@runtime_checkable
class FileResolver(Protocol):
    async def resolve(
        self, ref: FileRef, hints: Optional[dict[str, Any]] = None
    ) -> bytes: ...

    async def resolve_batch(
        self, refs: list[FileRef], batch_size: int = 100
    ) -> AsyncIterator[tuple[FileRef, bytes]]: ...

    async def resolve_metadata(self, ref: FileRef) -> FileRef: ...

    async def store(
        self,
        content: bytes,
        media_type: str,
        metadata: Optional[dict[str, Any]] = None,
    ) -> FileRef: ...
```

FileResolutionError
Raised when a file reference cannot be resolved to content.
```python
from aion import FileResolutionError

try:
    content = await resolver.resolve(ref)
except FileResolutionError as e:
    print(f"Failed to resolve {ref.id}: {e}")
```

Pipeline Types
Types for multi-step agent pipelines. See the Pipelines guide for usage.
PipelineStep
A step in a pipeline workflow. Frozen dataclass.
```python
from dataclasses import dataclass, field

@dataclass(frozen=True)
class PipelineStep:
    agent: str                                        # Agent name for this step
    intent: str                                       # Natural language intent
    params: dict[str, Any] = field(default_factory=dict)  # Additional parameters (defaults to {})
    depends_on: Optional[tuple[int, ...]] = None      # Step indices this depends on
```

StepResult
Result from a single pipeline step. Frozen dataclass.
```python
@dataclass(frozen=True)
class StepResult:
    agent: str              # Agent that executed the step
    status: str             # "completed", "failed", "skipped"
    summary: Optional[str]  # Human-readable summary
    outputs: dict[str, Any] # Step outputs
    error: Optional[str]    # Error message if failed
    execution_time_ms: int  # Execution duration in milliseconds
```

PipelineResult
Result of pipeline execution. Frozen dataclass.
```python
@dataclass(frozen=True)
class PipelineResult:
    success: bool                          # Whether execution succeeded
    steps: list[StepResult]                # Results for each step
    execution_time_ms: int                 # Total execution duration
    total_waves: int                       # Number of parallel execution waves
    errors: list[str]                      # Error messages
    token_usage: Optional[dict[str, int]]  # Token consumption

    # Methods
    def step(self, index: int) -> StepResult: ...  # Get step result by index

    # Properties
    @property
    def final(self) -> StepResult: ...  # Get the last step's result
```
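The `depends_on` indices on each step and the `total_waves` count suggest that independent steps run together in waves. The sketch below shows one way such waves could be derived from the dependencies. `plan_waves` is a hypothetical helper, not the scheduler aion uses, and it assumes that `depends_on=None` means a step has no dependencies.

```python
from typing import Optional, Sequence


def plan_waves(depends_on: Sequence[Optional[tuple[int, ...]]]) -> list[list[int]]:
    """Group step indices into waves.

    Every step in a wave depends only on steps from earlier waves, so the
    steps inside one wave could run in parallel.
    """
    remaining = set(range(len(depends_on)))
    done: set[int] = set()
    waves: list[list[int]] = []
    while remaining:
        ready = sorted(i for i in remaining if set(depends_on[i] or ()) <= done)
        if not ready:
            # Nothing can run: a cycle or a reference to a step that does not exist.
            raise ValueError("Circular or invalid dependency between steps")
        waves.append(ready)
        done.update(ready)
        remaining.difference_update(ready)
    return waves


# Steps 0 and 1 are independent, step 2 needs both, step 3 needs step 2.
waves = plan_waves([None, None, (0, 1), (2,)])
total_waves = len(waves)
```

For this input, the first wave contains steps 0 and 1, followed by one wave each for steps 2 and 3, for three waves in total.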