
Agent Data Flow Types

Contracts, typed context, data types, agent protocol, file references, and content resolution

Build Your Own Agent Workflows

These types let you build multi-agent workflows with typed data passing between agents. Define contracts, share data through an ExecutionContext, and resolve file references to content. All types are importable from the top-level aion package. See the Agent Data Flow guide for usage patterns.

AgentCapability

String enum declaring what an agent can do. Used in AgentContract to specify primary capability and chaining hints.

from aion import AgentCapability

class AgentCapability(str, Enum):
    SEARCH = "search"
    FILTER = "filter"
    ANALYSIS = "analysis"
    COMPARISON = "comparison"
    EXPORT = "export"
    SUMMARIZATION = "summarization"
    RECOMMENDATION = "recommendation"
    VISUALIZATION = "visualization"
    ANALYTICS = "analytics"
    CROSS_REFERENCE = "cross_reference"
    SYNTHESIS = "synthesis"
    ORGANIZATION = "organization"
    CONVERSATIONAL = "conversational"
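Because AgentCapability mixes in str, its members should behave like their string values under standard Python enum rules. The sketch below uses a local stand-in enum with three of the members listed above (it does not import from aion) to illustrate lookup by value and comparison with plain strings.

```python
from enum import Enum


# Local stand-in mirroring a subset of the documented members;
# the real class is imported from aion.
class AgentCapability(str, Enum):
    SEARCH = "search"
    FILTER = "filter"
    CROSS_REFERENCE = "cross_reference"


# Look a member up from its string value, e.g. when parsing serialized data.
cap = AgentCapability("cross_reference")

# Members compare equal to plain strings because of the str mixin.
same_as_string = AgentCapability.SEARCH == "search"

# Values that are not defined on the enum raise ValueError.
try:
    AgentCapability("translate")
    unknown_accepted = True
except ValueError:
    unknown_accepted = False
```

This is also why a field such as AgentResult.capability can carry the plain string value of a capability.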

TypedInput

Declares a typed input that an agent requires. Frozen dataclass.

from aion import TypedInput

@dataclass(frozen=True)
class TypedInput:
    name: str                                # Slot name, e.g. "file_ids"
    data_type: str                           # Registry key, e.g. "FILE_IDS"
    required: bool = True                    # Whether the input must be present
    description: str = ""                    # Human-readable description
    content_type_hint: Optional[str] = None  # "images", "documents", "links"

Methods:

Method | Returns
to_dict() | dict[str, Any]
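Most types on this page are frozen dataclasses, so instances cannot be modified after construction. The following sketch uses a local stand-in with the fields documented for TypedInput (not the aion class itself) to show what that means in practice and how to derive a changed copy with the standard-library dataclasses.replace.

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from typing import Optional


# Local stand-in with the documented fields; the real class comes from aion.
@dataclass(frozen=True)
class TypedInput:
    name: str
    data_type: str
    required: bool = True
    description: str = ""
    content_type_hint: Optional[str] = None


file_ids = TypedInput(name="file_ids", data_type="FILE_IDS")

# Assigning to a field of a frozen dataclass raises FrozenInstanceError.
try:
    file_ids.required = False
    mutated = True
except FrozenInstanceError:
    mutated = False

# Build a modified copy instead of mutating in place.
optional_ids = replace(file_ids, required=False)
```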

TypedOutput

Declares a typed output that an agent produces. Frozen dataclass.

from aion import TypedOutput

@dataclass(frozen=True)
class TypedOutput:
    name: str                                # Slot name, e.g. "results"
    data_type: str                           # Registry key, e.g. "FILE_IDS"
    mergeable: bool = False                  # Can multiple outputs be merged?
    description: str = ""                    # Human-readable description
    content_type_hint: Optional[str] = None  # "images", "documents", "links"

Methods:

Method | Returns
to_dict() | dict[str, Any]

AgentContract

Unified contract declaring an agent's identity, capabilities, typed inputs/outputs, and execution hints. Frozen dataclass. Every agent returns one from get_contract().

from aion import AgentContract, AgentCapability, TypedInput, TypedOutput

@dataclass(frozen=True)
class AgentContract:
    name: str                                         # Agent name
    capability: AgentCapability                       # Primary capability
    description: str                                  # Natural language description
    inputs: tuple[TypedInput, ...] = ()               # Typed inputs (tuple for immutability)
    outputs: tuple[TypedOutput, ...] = ()             # Typed outputs
    example_intents: tuple[str, ...] = ()             # Example user intents
    can_run_parallel: bool = True                     # Supports parallel execution
    typical_duration_ms: int = 5000                   # Typical execution time
    can_chain_with: tuple[AgentCapability, ...] = ()  # Chaining hints

Methods:

Method | Returns
get_required_inputs() | list[TypedInput]
get_input_by_name(name) | Optional[TypedInput]
get_output_by_name(name) | Optional[TypedOutput]
to_dict() | dict[str, Any]
from_dict(data) (classmethod) | AgentContract
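One way contracts can be used is to check whether one agent's outputs cover another agent's required inputs before chaining them. The sketch below is an illustration only: it uses trimmed local stand-ins for the three contract types, and unmet_inputs is a hypothetical helper, not part of aion. It assumes that matching on data_type is the relevant compatibility test.

```python
from dataclasses import dataclass


# Trimmed local stand-ins; the real classes carry more fields (see above).
@dataclass(frozen=True)
class TypedInput:
    name: str
    data_type: str
    required: bool = True


@dataclass(frozen=True)
class TypedOutput:
    name: str
    data_type: str


@dataclass(frozen=True)
class AgentContract:
    name: str
    inputs: tuple[TypedInput, ...] = ()
    outputs: tuple[TypedOutput, ...] = ()

    def get_required_inputs(self) -> list[TypedInput]:
        return [i for i in self.inputs if i.required]


def unmet_inputs(upstream: AgentContract, downstream: AgentContract) -> list[str]:
    """Hypothetical helper: names of required downstream inputs whose
    data_type is not produced by any upstream output."""
    produced = {o.data_type for o in upstream.outputs}
    return [
        i.name
        for i in downstream.get_required_inputs()
        if i.data_type not in produced
    ]


search = AgentContract("search", outputs=(TypedOutput("results", "FILE_IDS"),))
organize = AgentContract(
    "organize",
    inputs=(
        TypedInput("file_ids", "FILE_IDS"),
        TypedInput("categories", "CATEGORIZATION"),
        TypedInput("notes", "TEXT", required=False),
    ),
)

# "categories" is required but search produces no CATEGORIZATION output.
gaps = unmet_inputs(search, organize)
```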

Category

A single category from categorization, containing references to files and documents that belong to a semantic group. Frozen dataclass.

from aion import Category

@dataclass(frozen=True)
class Category:
    name: str                           # Category name
    description: Optional[str] = None   # Human-readable description
    file_ids: tuple[str, ...] = ()      # Image IDs
    document_ids: tuple[str, ...] = ()  # Document IDs
    top_tags: tuple[str, ...] = ()      # Top tags
    confidence: float = 1.0             # Confidence score (0.0-1.0)

Properties and methods:

Member | Returns
count (property) | int: len(file_ids) + len(document_ids)
content_type (property) | str: "images", "documents", "mixed", or "empty"
merge_with(other) | Category: merged with deduplication
to_dict() | dict[str, Any]
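The derived members above can be pictured with a small stand-in. The sketch below is not the aion implementation: it keeps only three fields, and it assumes that merge_with performs an order-preserving union of IDs and keeps the name of the category it is called on. How the real method combines description, top_tags, and confidence is not covered here.

```python
from dataclasses import dataclass, replace


# Trimmed local stand-in for Category.
@dataclass(frozen=True)
class Category:
    name: str
    file_ids: tuple[str, ...] = ()
    document_ids: tuple[str, ...] = ()

    @property
    def count(self) -> int:
        return len(self.file_ids) + len(self.document_ids)

    @property
    def content_type(self) -> str:
        if self.file_ids and self.document_ids:
            return "mixed"
        if self.file_ids:
            return "images"
        if self.document_ids:
            return "documents"
        return "empty"

    def merge_with(self, other: "Category") -> "Category":
        # Assumption: ordered union; dict.fromkeys drops repeated IDs
        # while keeping first-seen order.
        return replace(
            self,
            file_ids=tuple(dict.fromkeys(self.file_ids + other.file_ids)),
            document_ids=tuple(
                dict.fromkeys(self.document_ids + other.document_ids)
            ),
        )


beach = Category("beach", file_ids=("img-1", "img-2"))
more = Category("beach", file_ids=("img-2", "img-3"), document_ids=("doc-1",))
merged = beach.merge_with(more)
```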

CategorizationResult

Structured categorization output containing categories with file assignments. Frozen dataclass.

from aion import CategorizationResult, Category

@dataclass(frozen=True)
class CategorizationResult:
    categories: tuple[Category, ...] = ()
    total_files: int = 0
    clustering_method: str = "unknown"
    status: str = "success"                  # "success", "partial", "failed"
    error_reason: Optional[str] = None
    source_capability: Optional[str] = None

Methods:

Method | Returns
merge(*results) (classmethod) | CategorizationResult: merges categories by name
to_dict() | dict[str, Any]
from_dict(data) (classmethod) | CategorizationResult

AnalysisResult

Structured analysis output with findings. Frozen dataclass.

from aion import AnalysisResult

@dataclass(frozen=True)
class AnalysisResult:
    summary: str                               # Analysis summary
    findings: tuple[str, ...] = ()             # List of findings
    document_id: Optional[str] = None          # Related document ID
    metadata: Optional[dict[str, Any]] = None  # Additional metadata

Methods:

Method | Returns
to_dict() | dict[str, Any]

TextResult

Simple text output. Frozen dataclass.

from aion import TextResult

@dataclass(frozen=True)
class TextResult:
    text: str                                  # Text content
    metadata: Optional[dict[str, Any]] = None  # Additional metadata

Methods:

Method | Returns
to_dict() | dict[str, Any]

CrossRefResult

Cross-reference analysis output. Frozen dataclass.

from aion import CrossRefResult

@dataclass(frozen=True)
class CrossRefResult:
    relationships: tuple[dict[str, Any], ...] = ()
    source_files: tuple[str, ...] = ()
    target_files: tuple[str, ...] = ()
    summary: str = ""
    metadata: Optional[dict[str, Any]] = None

Methods:

Method | Returns
to_dict() | dict[str, Any]

FolderResult

Result of folder organization operations. Frozen dataclass.

from aion import FolderResult

@dataclass(frozen=True)
class FolderResult:
    folders_created: tuple[dict[str, Any], ...] = ()
    files_moved: int = 0
    documents_moved: int = 0
    errors: tuple[str, ...] = ()
    success: bool = True

Methods:

Method | Returns
to_dict() | dict[str, Any]

DataTypeDefinition

Definition of a data type that can flow between agents. Frozen dataclass.

from aion import DataTypeDefinition

@dataclass(frozen=True)
class DataTypeDefinition:
    name: str                # Type name, e.g. "FILE_IDS"
    description: str = ""    # Human-readable description
    mergeable: bool = False  # Whether merge is supported

DataTypeRegistry

Mutable registry for data types with optional merge functions.

from aion import DataTypeRegistry, DataTypeDefinition, FileCollection

registry = DataTypeRegistry()

# Register a type with a merge function
registry.register(
    DataTypeDefinition(name="FILE_IDS", description="File references"),
    merge_fn=FileCollection.merge,
)

# Register a non-mergeable type
registry.register(
    DataTypeDefinition(name="TEXT", description="Text output"),
)

Methods:

Method | Returns
register(definition, merge_fn=None) | None
get(name) | Optional[DataTypeDefinition]
has_type(name) | bool
can_merge(name) | bool
merge(name, *values) | Any; raises ValueError if not mergeable
list_types() | list[str]
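To make the register, can_merge, and merge behavior concrete, here is a minimal stand-in registry. It is a sketch under stated assumptions rather than the aion implementation: it assumes a type counts as mergeable exactly when a merge function was registered for it, and union_ids is a hypothetical merge function used in place of FileCollection.merge.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class DataTypeDefinition:  # local stand-in with the documented fields
    name: str
    description: str = ""
    mergeable: bool = False


class DataTypeRegistry:  # local stand-in, not the aion class
    def __init__(self) -> None:
        self._types: dict[str, DataTypeDefinition] = {}
        self._merge_fns: dict[str, Callable[..., Any]] = {}

    def register(
        self,
        definition: DataTypeDefinition,
        merge_fn: Optional[Callable[..., Any]] = None,
    ) -> None:
        self._types[definition.name] = definition
        if merge_fn is not None:
            self._merge_fns[definition.name] = merge_fn
        else:
            # Re-registering without a merge function clears any old one.
            self._merge_fns.pop(definition.name, None)

    def can_merge(self, name: str) -> bool:
        # Assumption: mergeable means a merge function is registered.
        return name in self._merge_fns

    def merge(self, name: str, *values: Any) -> Any:
        if not self.can_merge(name):
            raise ValueError(f"Type {name!r} is not mergeable")
        return self._merge_fns[name](*values)

    def list_types(self) -> list[str]:
        return list(self._types)


def union_ids(*id_lists: list[str]) -> list[str]:
    """Hypothetical merge function: ordered union of several ID lists."""
    return list(dict.fromkeys(i for ids in id_lists for i in ids))


registry = DataTypeRegistry()
registry.register(DataTypeDefinition("FILE_IDS"), merge_fn=union_ids)
registry.register(DataTypeDefinition("TEXT"))

merged = registry.merge("FILE_IDS", ["a", "b"], ["b", "c"])

try:
    registry.merge("TEXT", "x", "y")
    text_merge_error = None
except ValueError as exc:
    text_merge_error = str(exc)
```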

get_default_registry()

Factory function that returns a fresh DataTypeRegistry pre-loaded with built-in types. Each call returns a new instance so you can customize without affecting others.

from aion import get_default_registry
registry = get_default_registry()
registry.list_types()
# ["FILE_IDS", "CATEGORIZATION", "FOLDER_RESULT", "ANALYSIS_RESULT", "TEXT", "CROSS_REF"]
registry.can_merge("FILE_IDS") # True
registry.can_merge("CATEGORIZATION") # True
registry.can_merge("TEXT") # False

TypedSlot

A typed slot holding a value with provenance. Frozen dataclass. Used internally by ExecutionContext.

from aion import TypedSlot

@dataclass(frozen=True)
class TypedSlot:
    value: Any                       # The data (dict, list, etc.)
    data_type: str                   # Registry key, e.g. "FILE_IDS"
    source_node: str                 # Which agent produced this
    timestamp: Optional[str] = None  # ISO 8601 (auto-set if None)

ExecutionContext

Mutable, async-safe shared state container for agent workflows. Agents read inputs from and write outputs to named, typed slots.

from aion import ExecutionContext
ctx = ExecutionContext()
# Write a slot
ctx.write("results", {"ids": ["img-1", "img-2"]}, "FILE_IDS", "search_agent")
# Read a slot (raises KeyError if missing, TypeError if type mismatch)
data = ctx.read("results")
data = ctx.read("results", expected_type="FILE_IDS")
# Check existence
ctx.has_slot("results") # True
# Async-safe methods for parallel execution (call these from inside a coroutine)
await ctx.write_async("results", data, "FILE_IDS", "agent")
data = await ctx.read_async("results")
exists = await ctx.has_slot_async("results")

Methods:

Method | Description
write(slot_name, value, data_type, source_node) | Write a value to a typed slot
read(slot_name, expected_type=None) | Read a slot value
has_slot(slot_name) | Check if a slot exists
write_async(...) | Async-safe write (uses asyncio.Lock)
read_async(...) | Async-safe read
has_slot_async(slot_name) | Async-safe existence check
get_slot_type(slot_name) | Returns data type string or None
get_slots_by_type(data_type) | All slot values matching a type
list_slots() | All slots with types and sources
slots (property) | Read-only copy of internal slots
to_dict() | Serialize for checkpointing
from_dict(data) (classmethod) | Restore from serialized dict
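The slot semantics described above (KeyError for a missing slot, TypeError for a type mismatch, and lock-guarded async writes) can be sketched with a small stand-in. MiniContext below is a hypothetical class written for illustration; it is not the aion ExecutionContext and implements only a few of the listed methods.

```python
import asyncio
from typing import Any, Optional


class MiniContext:
    """Hypothetical stand-in for ExecutionContext (illustration only)."""

    def __init__(self) -> None:
        # slot name -> (value, data_type, source_node)
        self._slots: dict[str, tuple[Any, str, str]] = {}
        self._lock = asyncio.Lock()

    def write(self, slot_name: str, value: Any, data_type: str, source_node: str) -> None:
        self._slots[slot_name] = (value, data_type, source_node)

    def read(self, slot_name: str, expected_type: Optional[str] = None) -> Any:
        if slot_name not in self._slots:
            raise KeyError(slot_name)
        value, data_type, _ = self._slots[slot_name]
        if expected_type is not None and data_type != expected_type:
            raise TypeError(
                f"slot {slot_name!r} holds {data_type}, expected {expected_type}"
            )
        return value

    def has_slot(self, slot_name: str) -> bool:
        return slot_name in self._slots

    async def write_async(self, slot_name: str, value: Any, data_type: str, source_node: str) -> None:
        # Serialize writers so concurrent agents cannot interleave updates.
        async with self._lock:
            self.write(slot_name, value, data_type, source_node)


async def run_parallel() -> MiniContext:
    # Create the context inside the running event loop.
    ctx = MiniContext()

    async def agent(n: int) -> None:
        await ctx.write_async(
            f"out_{n}", {"ids": [f"img-{n}"]}, "FILE_IDS", f"agent_{n}"
        )

    # Three agents write their outputs concurrently.
    await asyncio.gather(*(agent(n) for n in range(3)))
    return ctx


ctx = asyncio.run(run_parallel())
```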

AgentResult

Result returned by an agent after execution. Frozen dataclass.

from aion import AgentResult

@dataclass(frozen=True)
class AgentResult:
    success: bool                              # Whether execution succeeded
    agent_name: str                            # Name of the agent
    capability: str                            # AgentCapability value
    outputs: Optional[dict[str, Any]] = None   # slot_name -> value
    error: Optional[str] = None                # Error message if failed
    summary: Optional[str] = None              # Human-readable summary
    execution_time_ms: float = 0               # Execution duration
    metadata: Optional[dict[str, Any]] = None  # Additional metadata

Methods:

Method | Returns
get_output(slot_name, default=None) | Any
has_output(slot_name) | bool
to_dict() | dict[str, Any]

Agent (Protocol)

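Runtime-checkable Protocol for SDK agents. It uses structural typing (duck typing): no inheritance is required, and a class only needs to implement the two methods. As with any runtime-checkable protocol, isinstance() verifies that the methods exist, not that their signatures match.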

from typing import Protocol, runtime_checkable

from aion import Agent, AgentCapability, AgentContract, AgentResult, ExecutionContext

# Protocol definition (shown for reference):
@runtime_checkable
class Agent(Protocol):
    def get_contract(self) -> AgentContract: ...
    async def execute(self, context: ExecutionContext) -> AgentResult: ...

# Any class with these methods satisfies the protocol:
class MyAgent:
    def get_contract(self) -> AgentContract:
        return AgentContract(
            name="MyAgent",
            capability=AgentCapability.SEARCH,
            description="My search agent",
        )

    async def execute(self, context: ExecutionContext) -> AgentResult:
        query = context.read("query")
        # ... do work ...
        return AgentResult(success=True, agent_name="MyAgent", capability="search")

isinstance(MyAgent(), Agent)  # True
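The runtime check is structural and shallow. The sketch below uses a local stand-in protocol with the same two method names (return types loosened to dict so it runs without aion) to show which classes pass isinstance: it tests only that the methods are present.

```python
from typing import Protocol, runtime_checkable


# Local stand-in with the same method names as the Agent protocol.
@runtime_checkable
class Agent(Protocol):
    def get_contract(self) -> dict: ...
    async def execute(self, context: dict) -> dict: ...


class Complete:
    def get_contract(self) -> dict:
        return {"name": "complete"}

    async def execute(self, context: dict) -> dict:
        return {"success": True}


class MissingExecute:
    def get_contract(self) -> dict:
        return {"name": "incomplete"}


class WrongSignature:
    # Both names exist, but execute is neither async nor takes a context.
    def get_contract(self, extra):
        return {}

    def execute(self):
        return {}


checks = {
    cls.__name__: isinstance(cls(), Agent)
    for cls in (Complete, MissingExecute, WrongSignature)
}
```

Complete and WrongSignature both pass the check while MissingExecute fails, so a static type checker or a test that actually awaits execute() is still needed to catch signature mistakes.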

FileRef

A reference to a single file with optional metadata. Frozen dataclass.

from aion import FileRef

@dataclass(frozen=True)
class FileRef:
    id: str                                       # UUID, content hash, or URI
    media_type: str = "application/octet-stream"  # MIME type
    filename: Optional[str] = None                # Original filename
    size_bytes: Optional[int] = None              # File size in bytes
    metadata: Optional[dict[str, Any]] = None     # User-defined metadata

Methods:

Method | Returns
from_api_response(data) (classmethod) | FileRef
to_dict(exclude_none=True) | dict[str, Any]

FileCollection

Typed, immutable collection of file references with merge, filter, and slice operations. All operations return new instances.

from aion import FileCollection, FileRef

# Create from bare IDs
coll = FileCollection.from_ids(["uuid-1", "uuid-2"], content_type="images")

# Create from FileRef objects
coll = FileCollection(
    refs=[FileRef(id="uuid-1", media_type="image/jpeg")],
    content_type="images",
)

# Properties
coll.refs               # tuple[FileRef, ...]
coll.ids                # list[str]
coll.count              # int
coll.content_type       # "images", "documents", "mixed", "unknown"
coll.source_capability  # Optional[str]: which capability produced this

# Iterate, index, slice
for ref in coll:
    print(ref.id)
first = coll[0]     # Returns FileRef
subset = coll[0:5]  # Returns FileCollection

# Filter
images = coll.filter(lambda r: r.media_type.startswith("image/"))

# Merge with deduplication (preserves richest metadata per ID).
# search_results and upload_results are existing FileCollection instances.
merged = FileCollection.merge(search_results, upload_results)

# Serialize
data = coll.to_dict()
# {"refs": [...], "ids": [...], "count": 2, "content_type": "images"}
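The phrase "preserves richest metadata per ID" can be read in more than one way. The sketch below shows one plausible interpretation, not the aion implementation: when two references share an ID, keep the one with more populated fields, and keep IDs in first-seen order. FileRef here is a local stand-in, and richness and merge_refs are hypothetical helpers.

```python
from dataclasses import dataclass, fields
from typing import Any, Iterable, Optional

DEFAULT_MEDIA_TYPE = "application/octet-stream"


@dataclass(frozen=True)
class FileRef:  # local stand-in with the documented fields
    id: str
    media_type: str = DEFAULT_MEDIA_TYPE
    filename: Optional[str] = None
    size_bytes: Optional[int] = None
    metadata: Optional[dict[str, Any]] = None


def richness(ref: FileRef) -> int:
    """Assumed score: populated optional fields, plus one for a specific MIME type."""
    optional = [f.name for f in fields(ref) if f.name not in ("id", "media_type")]
    score = sum(getattr(ref, name) is not None for name in optional)
    return score + int(ref.media_type != DEFAULT_MEDIA_TYPE)


def merge_refs(*groups: Iterable[FileRef]) -> tuple[FileRef, ...]:
    """Deduplicate by ID, keeping the richest ref and first-seen order."""
    best: dict[str, FileRef] = {}
    for group in groups:
        for ref in group:
            current = best.get(ref.id)
            if current is None or richness(ref) > richness(current):
                # Overwriting an existing key keeps its original position.
                best[ref.id] = ref
    return tuple(best.values())


search = [FileRef("uuid-1"), FileRef("uuid-2", media_type="image/png")]
uploads = [
    FileRef("uuid-1", media_type="image/jpeg", filename="cat.jpg", size_bytes=2048),
    FileRef("uuid-3"),
]
merged = merge_refs(search, uploads)
```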

FileResolver (Protocol)

Runtime-checkable protocol for pluggable content resolution. Implement this to connect your own storage backend (S3, local filesystem, HTTP, etc.).

from aion import FileResolver

@runtime_checkable
class FileResolver(Protocol):
    async def resolve(
        self, ref: FileRef, hints: Optional[dict[str, Any]] = None
    ) -> bytes: ...

    async def resolve_batch(
        self, refs: list[FileRef], batch_size: int = 100
    ) -> AsyncIterator[tuple[FileRef, bytes]]: ...

    async def resolve_metadata(self, ref: FileRef) -> FileRef: ...

    async def store(
        self, content: bytes, media_type: str,
        metadata: Optional[dict[str, Any]] = None,
    ) -> FileRef: ...
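As an illustration of what a backend might look like, the sketch below implements the four methods against an in-memory dict. InMemoryResolver is a hypothetical class, FileRef and FileResolutionError are local stand-ins for the aion types, and using a SHA-256 content hash as the ID is an assumption made for the example. resolve_batch is written as an async generator so it can be consumed with async for; batch_size is accepted but unused because nothing is fetched remotely.

```python
import asyncio
import hashlib
from dataclasses import dataclass, replace
from typing import Any, AsyncIterator, Optional


@dataclass(frozen=True)
class FileRef:  # local stand-in
    id: str
    media_type: str = "application/octet-stream"
    size_bytes: Optional[int] = None
    metadata: Optional[dict[str, Any]] = None


class FileResolutionError(Exception):  # local stand-in
    pass


class InMemoryResolver:
    """Hypothetical resolver that keeps content in a dict keyed by content hash."""

    def __init__(self) -> None:
        self._blobs: dict[str, tuple[bytes, str]] = {}

    async def store(self, content: bytes, media_type: str,
                    metadata: Optional[dict[str, Any]] = None) -> FileRef:
        file_id = hashlib.sha256(content).hexdigest()
        self._blobs[file_id] = (content, media_type)
        return FileRef(id=file_id, media_type=media_type,
                       size_bytes=len(content), metadata=metadata)

    async def resolve(self, ref: FileRef,
                      hints: Optional[dict[str, Any]] = None) -> bytes:
        try:
            return self._blobs[ref.id][0]
        except KeyError:
            raise FileResolutionError(f"unknown file id: {ref.id}") from None

    async def resolve_batch(self, refs: list[FileRef],
                            batch_size: int = 100) -> AsyncIterator[tuple[FileRef, bytes]]:
        for ref in refs:
            yield ref, await self.resolve(ref)

    async def resolve_metadata(self, ref: FileRef) -> FileRef:
        content = await self.resolve(ref)
        media_type = self._blobs[ref.id][1]
        return replace(ref, media_type=media_type, size_bytes=len(content))


async def demo():
    resolver = InMemoryResolver()
    ref = await resolver.store(b"hello", "text/plain")
    content = await resolver.resolve(ref)
    batch = [(r.id, data) async for r, data in resolver.resolve_batch([ref])]
    enriched = await resolver.resolve_metadata(FileRef(id=ref.id))
    try:
        await resolver.resolve(FileRef(id="missing"))
        missing_error = None
    except FileResolutionError as exc:
        missing_error = str(exc)
    return ref, content, batch, enriched, missing_error


ref, content, batch, enriched, missing_error = asyncio.run(demo())
```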

FileResolutionError

Raised when a file reference cannot be resolved to content.

from aion import FileResolutionError
try:
    content = await resolver.resolve(ref)
except FileResolutionError as e:
    print(f"Failed to resolve {ref.id}: {e}")

Pipeline Types

Types for multi-step agent pipelines. See the Pipelines guide for usage.

PipelineStep

A step in a pipeline workflow. Frozen dataclass.

@dataclass(frozen=True)
class PipelineStep:
    agent: str                                          # Agent name for this step
    intent: str                                         # Natural language intent
    params: dict[str, Any] = field(default_factory=dict)  # Additional parameters (empty by default)
    depends_on: Optional[tuple[int, ...]] = None        # Step indices this depends on

StepResult

Result from a single pipeline step. Frozen dataclass.

@dataclass(frozen=True)
class StepResult:
    agent: str               # Agent that executed the step
    status: str              # "completed", "failed", "skipped"
    summary: Optional[str]   # Human-readable summary
    outputs: dict[str, Any]  # Step outputs
    error: Optional[str]     # Error message if failed
    execution_time_ms: int   # Execution duration in milliseconds

PipelineResult

Result of pipeline execution. Frozen dataclass.

@dataclass(frozen=True)
class PipelineResult:
    success: bool                          # Whether execution succeeded
    steps: list[StepResult]                # Results for each step
    execution_time_ms: int                 # Total execution duration
    total_waves: int                       # Number of parallel execution waves
    errors: list[str]                      # Error messages
    token_usage: Optional[dict[str, int]]  # Token consumption

    # Methods
    def step(self, index: int) -> StepResult: ...  # Get step result by index

    # Properties
    @property
    def final(self) -> StepResult: ...  # Get the last step's result
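The relationship between depends_on and total_waves can be pictured as layering steps by dependency depth. The sketch below is one way to compute such waves and is not taken from the aion scheduler: plan_waves is a hypothetical helper, PipelineStep is a local stand-in, and it assumes that depends_on=None means the step has no dependencies.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass(frozen=True)
class PipelineStep:  # local stand-in with the documented fields
    agent: str
    intent: str
    params: dict[str, Any] = field(default_factory=dict)
    depends_on: Optional[tuple[int, ...]] = None


def plan_waves(steps: list[PipelineStep]) -> list[list[int]]:
    """Hypothetical helper: group step indices into waves so that every
    step runs one wave after the latest of its dependencies."""
    wave_of: dict[int, int] = {}

    def wave(index: int, visiting: tuple[int, ...] = ()) -> int:
        if index in wave_of:
            return wave_of[index]
        if index in visiting:
            raise ValueError("dependency cycle detected")
        deps = steps[index].depends_on or ()  # assumption: None means no deps
        wave_of[index] = 1 + max(
            (wave(d, visiting + (index,)) for d in deps), default=-1
        )
        return wave_of[index]

    for i in range(len(steps)):
        wave(i)

    waves: list[list[int]] = [[] for _ in range(max(wave_of.values(), default=-1) + 1)]
    for i in range(len(steps)):
        waves[wave_of[i]].append(i)
    return waves


steps = [
    PipelineStep("search", "find beach photos"),
    PipelineStep("search", "find travel documents"),
    PipelineStep("analysis", "compare both sets", depends_on=(0, 1)),
    PipelineStep("export", "export the comparison", depends_on=(2,)),
]
waves = plan_waves(steps)
total_waves = len(waves)
```

In this example the two searches share the first wave, and the analysis and export steps each get a wave of their own.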