# Agent Data Flow Types

Contracts, typed context, data types, agent protocol, file references, and content resolution.

## Build Your Own Agent Workflows
These types let you build multi-agent workflows with typed data passing between agents. Define contracts, share data through an ExecutionContext, and resolve file references to content. These types live in the scopix.types submodules (e.g. scopix.types.contracts, scopix.types.context, scopix.types.file_collection, scopix.types.file_resolver, scopix.types.data_types, scopix.types.agent); they are not re-exported at the top level of scopix. Each example below uses the correct submodule path. See the Agent Data Flow guide for usage patterns.
## AgentCapability
String enum declaring what an agent can do. Used in AgentContract to specify primary capability and chaining hints.
```python
from scopix.types.contracts import AgentCapability

class AgentCapability(str, Enum):
    SEARCH = "search"
    FILTER = "filter"
    ANALYSIS = "analysis"
    COMPARISON = "comparison"
    EXPORT = "export"
    SUMMARIZATION = "summarization"
    RECOMMENDATION = "recommendation"
    VISUALIZATION = "visualization"
    ANALYTICS = "analytics"
    CROSS_REFERENCE = "cross_reference"
    SYNTHESIS = "synthesis"
    ORGANIZATION = "organization"
    CONVERSATIONAL = "conversational"
```

## TypedInput
Declares a typed input that an agent requires. Frozen dataclass.
```python
from scopix.types.contracts import TypedInput

@dataclass(frozen=True)
class TypedInput:
    name: str                                # Slot name, e.g. "file_ids"
    data_type: str                           # Registry key, e.g. "FILE_IDS"
    required: bool = True                    # Whether the input must be present
    description: str = ""                    # Human-readable description
    content_type_hint: Optional[str] = None  # "images", "documents", "links"
```

Methods:
| Method | Returns |
|---|---|
| to_dict() | dict[str, Any] |
## TypedOutput
Declares a typed output that an agent produces. Frozen dataclass.
```python
from scopix.types.contracts import TypedOutput

@dataclass(frozen=True)
class TypedOutput:
    name: str                                # Slot name, e.g. "results"
    data_type: str                           # Registry key, e.g. "FILE_IDS"
    mergeable: bool = False                  # Can multiple outputs be merged?
    description: str = ""                    # Human-readable description
    content_type_hint: Optional[str] = None  # "images", "documents", "links"
```

Methods:
| Method | Returns |
|---|---|
| to_dict() | dict[str, Any] |
## AgentContract
Unified contract declaring an agent's identity, capabilities, typed inputs/outputs, and execution hints. Frozen dataclass. Every agent returns one from get_contract().
```python
from scopix.types.contracts import AgentContract, AgentCapability, TypedInput, TypedOutput

@dataclass(frozen=True)
class AgentContract:
    name: str                                         # Agent name
    capability: AgentCapability                       # Primary capability
    description: str                                  # Natural language description
    inputs: tuple[TypedInput, ...] = ()               # Typed inputs (tuple for immutability)
    outputs: tuple[TypedOutput, ...] = ()             # Typed outputs
    example_intents: tuple[str, ...] = ()             # Example user intents
    can_run_parallel: bool = True                     # Supports parallel execution
    typical_duration_ms: int = 5000                   # Typical execution time
    can_chain_with: tuple[AgentCapability, ...] = ()  # Chaining hints
```

Methods:
| Method | Returns |
|---|---|
| get_required_inputs() | list[TypedInput] |
| get_input_by_name(name) | Optional[TypedInput] |
| get_output_by_name(name) | Optional[TypedOutput] |
| to_dict() | dict[str, Any] |
| from_dict(data) (classmethod) | AgentContract |
## Category
A single category from categorization, containing references to files and documents that belong to a semantic group. Frozen dataclass.
```python
from scopix.types.data_types import Category

@dataclass(frozen=True)
class Category:
    name: str                          # Category name
    description: Optional[str] = None  # Human-readable description
    file_ids: tuple[str, ...] = ()     # Image IDs
    document_ids: tuple[str, ...] = () # Document IDs
    video_ids: tuple[str, ...] = ()    # Video IDs
    top_tags: tuple[str, ...] = ()     # Top tags
    confidence: float = 1.0            # Confidence score (0.0-1.0)
```

Properties and methods:
| Member | Returns |
|---|---|
| count (property) | int — len(file_ids) + len(document_ids) |
| content_type (property) | str — "images", "documents", "mixed", or "empty" |
| merge_with(other) | Category — merged with deduplication |
| to_dict() | dict[str, Any] |
## CategorizationResult
Structured categorization output containing categories with file assignments. Frozen dataclass.
```python
from scopix.types.data_types import CategorizationResult, Category

@dataclass(frozen=True)
class CategorizationResult:
    categories: tuple[Category, ...] = ()
    total_files: int = 0
    clustering_method: str = "unknown"
    status: str = "success"                # "success", "partial", "failed"
    error_reason: Optional[str] = None
    source_capability: Optional[str] = None
```

Methods:
| Method | Returns |
|---|---|
| merge(*results) (classmethod) | CategorizationResult — merges categories by name |
| to_dict() | dict[str, Any] |
| from_dict(data) (classmethod) | CategorizationResult |
## AnalysisResult
Structured analysis output with findings. Frozen dataclass.
```python
from scopix.types.data_types import AnalysisResult

@dataclass(frozen=True)
class AnalysisResult:
    summary: str                               # Analysis summary
    findings: tuple[str, ...] = ()             # List of findings
    document_id: Optional[str] = None          # Related document ID
    metadata: Optional[dict[str, Any]] = None  # Additional metadata
```

Methods:
| Method | Returns |
|---|---|
| to_dict() | dict[str, Any] |
## TextResult
Simple text output. Frozen dataclass.
```python
from scopix.types.data_types import TextResult

@dataclass(frozen=True)
class TextResult:
    text: str                                  # Text content
    metadata: Optional[dict[str, Any]] = None  # Additional metadata
```

Methods:
| Method | Returns |
|---|---|
| to_dict() | dict[str, Any] |
## CrossRefResult
Cross-reference analysis output. Frozen dataclass.
```python
from scopix.types.data_types import CrossRefResult

@dataclass(frozen=True)
class CrossRefResult:
    relationships: tuple[dict[str, Any], ...] = ()
    source_files: tuple[str, ...] = ()
    target_files: tuple[str, ...] = ()
    summary: str = ""
    metadata: Optional[dict[str, Any]] = None
```

Methods:
| Method | Returns |
|---|---|
| to_dict() | dict[str, Any] |
## FolderResult
Result of folder organization operations. Frozen dataclass.
```python
from scopix.types.data_types import FolderResult

@dataclass(frozen=True)
class FolderResult:
    folders_created: tuple[dict[str, Any], ...] = ()
    files_moved: int = 0
    documents_moved: int = 0
    videos_moved: int = 0
    errors: tuple[str, ...] = ()
    success: bool = True
```

Methods:
| Method | Returns |
|---|---|
| to_dict() | dict[str, Any] |
## DataTypeDefinition
Definition of a data type that can flow between agents. Frozen dataclass.
```python
from scopix.types.data_types import DataTypeDefinition

@dataclass(frozen=True)
class DataTypeDefinition:
    name: str                # Type name, e.g. "FILE_IDS"
    description: str = ""    # Human-readable description
    mergeable: bool = False  # Whether merge is supported
```

## DataTypeRegistry
Mutable registry for data types with optional merge functions.
```python
from scopix.types.data_types import DataTypeRegistry, DataTypeDefinition
from scopix.types.file_collection import FileCollection

registry = DataTypeRegistry()

# Register a type with a merge function
registry.register(
    DataTypeDefinition(name="FILE_IDS", description="File references"),
    merge_fn=FileCollection.merge,
)

# Register a non-mergeable type
registry.register(
    DataTypeDefinition(name="TEXT", description="Text output"),
)
```

Methods:
| Method | Returns |
|---|---|
| register(definition, merge_fn=None) | None |
| get(name) | Optional[DataTypeDefinition] |
| has_type(name) | bool |
| can_merge(name) | bool |
| merge(name, *values) | Any — raises ValueError if not mergeable |
| list_types() | list[str] |
## get_default_registry()
Factory function that returns a fresh DataTypeRegistry pre-loaded with built-in types. Each call returns a new instance so you can customize without affecting others.
```python
from scopix.types.data_types import get_default_registry

registry = get_default_registry()
registry.list_types()
# ["FILE_IDS", "CATEGORIZATION", "FOLDER_RESULT", "ANALYSIS_RESULT", "TEXT", "CROSS_REF"]

registry.can_merge("FILE_IDS")        # True
registry.can_merge("CATEGORIZATION")  # True
registry.can_merge("TEXT")            # False
```

## TypedSlot
A typed slot holding a value with provenance. Frozen dataclass. Used internally by ExecutionContext.
```python
from scopix.types.context import TypedSlot

@dataclass(frozen=True)
class TypedSlot:
    value: Any                       # The data (dict, list, etc.)
    data_type: str                   # Registry key, e.g. "FILE_IDS"
    source_node: str                 # Which agent produced this
    timestamp: Optional[str] = None  # ISO 8601 (auto-set if None)
```

## ExecutionContext
Mutable, async-safe shared state container for agent workflows. Agents read inputs from and write outputs to named, typed slots.
```python
from scopix.types.context import ExecutionContext

ctx = ExecutionContext()

# Write a slot
ctx.write("results", {"ids": ["img-1", "img-2"]}, "FILE_IDS", "search_agent")

# Read a slot (raises KeyError if missing, TypeError if type mismatch)
data = ctx.read("results")
data = ctx.read("results", expected_type="FILE_IDS")

# Check existence
ctx.has_slot("results")  # True

# Async-safe methods for parallel execution
await ctx.write_async("results", data, "FILE_IDS", "agent")
data = await ctx.read_async("results")
exists = await ctx.has_slot_async("results")
```

Methods:
| Method | Description |
|---|---|
| write(slot_name, value, data_type, source_node) | Write a value to a typed slot |
| read(slot_name, expected_type=None) | Read a slot value |
| has_slot(slot_name) | Check if a slot exists |
| write_async(...) | Async-safe write (uses asyncio.Lock) |
| read_async(...) | Async-safe read |
| has_slot_async(slot_name) | Async-safe existence check |
| get_slot_type(slot_name) | Returns data type string or None |
| get_slots_by_type(data_type) | All slot values matching a type |
| list_slots() | All slots with types and sources |
| slots (property) | Read-only copy of internal slots |
| to_dict() | Serialize for checkpointing |
| from_dict(data) (classmethod) | Restore from serialized dict |
## AgentResult
Result returned by an agent after execution. Frozen dataclass.
```python
from scopix.types.agent import AgentResult

@dataclass(frozen=True)
class AgentResult:
    success: bool                              # Whether execution succeeded
    agent_name: str                            # Name of the agent
    capability: str                            # AgentCapability value
    outputs: Optional[dict[str, Any]] = None   # slot_name -> value
    error: Optional[str] = None                # Error message if failed
    summary: Optional[str] = None              # Human-readable summary
    execution_time_ms: float = 0               # Execution duration
    metadata: Optional[dict[str, Any]] = None  # Additional metadata
```

Methods:
| Method | Returns |
|---|---|
| get_output(slot_name, default=None) | Any |
| has_output(slot_name) | bool |
| to_dict() | dict[str, Any] |
## Agent (Protocol)
Runtime-checkable Protocol for SDK agents. Uses structural typing (duck typing) — no inheritance required, just implement the two methods.
```python
from scopix.types.agent import Agent

@runtime_checkable
class Agent(Protocol):
    def get_contract(self) -> AgentContract: ...
    async def execute(self, context: ExecutionContext) -> AgentResult: ...

# Any class with these methods satisfies the protocol:
class MyAgent:
    def get_contract(self) -> AgentContract:
        return AgentContract(
            name="MyAgent",
            capability=AgentCapability.SEARCH,
            description="My search agent",
        )

    async def execute(self, context: ExecutionContext) -> AgentResult:
        query = context.read("query")
        # ... do work ...
        return AgentResult(success=True, agent_name="MyAgent", capability="search")

isinstance(MyAgent(), Agent)  # True
```

## FileRef
A reference to a single file with optional metadata. Frozen dataclass.
```python
from scopix.types.file_collection import FileRef

@dataclass(frozen=True)
class FileRef:
    id: str                                       # UUID, content hash, or URI
    media_type: str = "application/octet-stream"  # MIME type
    filename: Optional[str] = None                # Original filename
    size_bytes: Optional[int] = None              # File size in bytes
    metadata: Optional[dict[str, Any]] = None     # User-defined metadata
```

Methods:
| Method | Returns |
|---|---|
| from_api_response(data) (classmethod) | FileRef |
| to_dict(exclude_none=True) | dict[str, Any] |
## FileCollection
Typed, immutable collection of file references with merge, filter, and slice operations. All operations return new instances.
from scopix.types.file_collection import FileCollection, FileRef
# Create from bare IDscoll = FileCollection.from_ids(["uuid-1", "uuid-2"], content_type="images")
# Create from FileRef objectscoll = FileCollection( refs=[FileRef(id="uuid-1", media_type="image/jpeg")], content_type="images",)
# Propertiescoll.refs # tuple[FileRef, ...]coll.ids # list[str]coll.count # intcoll.content_type # "images", "documents", "mixed", "unknown"coll.source_capability # Optional[str] — which capability produced this
# Iterate, index, slicefor ref in coll: print(ref.id)first = coll[0] # Returns FileRefsubset = coll[0:5] # Returns FileCollection
# Filterimages = coll.filter(lambda r: r.media_type.startswith("image/"))
# Merge with deduplication (preserves richest metadata per ID)merged = FileCollection.merge(search_results, upload_results)
# Serializedata = coll.to_dict()# {"refs": [...], "ids": [...], "count": 2, "content_type": "images"}FileResolver (Protocol)
Runtime-checkable protocol for pluggable content resolution. Implement this to connect your own storage backend (S3, local filesystem, HTTP, etc.).
```python
from scopix.types.file_resolver import FileResolver

@runtime_checkable
class FileResolver(Protocol):
    async def resolve(
        self, ref: FileRef, hints: Optional[dict[str, Any]] = None
    ) -> bytes: ...

    async def resolve_batch(
        self, refs: list[FileRef], batch_size: int = 100
    ) -> AsyncIterator[tuple[FileRef, bytes]]: ...

    async def resolve_metadata(self, ref: FileRef) -> FileRef: ...

    async def store(
        self,
        content: bytes,
        media_type: str,
        metadata: Optional[dict[str, Any]] = None,
    ) -> FileRef: ...
```

## FileResolutionError
Raised when a file reference cannot be resolved to content.
```python
from scopix.types.file_resolver import FileResolutionError

try:
    content = await resolver.resolve(ref)
except FileResolutionError as e:
    print(f"Failed to resolve {ref.id}: {e}")
```
## Pipeline Types

Types for multi-step agent pipelines. See the Pipelines guide for usage.
### PipelineStep
A step in a pipeline workflow. Frozen dataclass.
```python
@dataclass(frozen=True)
class PipelineStep:
    agent: str                                            # Agent name for this step
    intent: str                                           # Natural language intent
    params: dict[str, Any] = field(default_factory=dict)  # Additional parameters
    depends_on: Optional[tuple[int, ...]] = None          # Step indices this step depends on
```

### StepResult
Result from a single pipeline step. Frozen dataclass.
```python
@dataclass(frozen=True)
class StepResult:
    agent: str                     # Agent that executed the step
    status: str                    # "completed", "failed", "skipped"
    summary: Optional[str]         # Human-readable summary
    outputs: dict[str, Any]        # Step outputs
    error: Optional[str]           # Error message if failed
    execution_time_ms: int         # Execution duration in milliseconds
    node_id: Optional[str] = None  # Stable pipeline node identifier
```

### PipelineResult
Result of pipeline execution. Frozen dataclass.
```python
@dataclass(frozen=True)
class PipelineResult:
    success: bool                          # Whether execution succeeded
    steps: list[StepResult]                # Results for each step
    execution_time_ms: int                 # Total execution duration
    total_waves: int                       # Number of topological waves executed
    errors: list[str]                      # Error messages
    token_usage: Optional[dict[str, int]]  # Token consumption

    # Methods
    def step(self, index: int) -> StepResult: ...  # Get step result by index

    # Properties
    @property
    def final(self) -> StepResult: ...  # Get the last step's result
```
