Documentation
DocumentsResource
Access via client.documents
High-Level Upload
upload_one()
Upload a single document with automatic text extraction (preferred for single files)
async def upload_one(
    file: Union[str, Path, bytes],
    *,
    filename: Optional[str] = None,  # Required if file is bytes
    wait_for_processing: bool = True,
    raise_on_failure: bool = True,
    processing_timeout: Optional[float] = None,
    storage_target: str = "default",  # "default" | "custom"
) -> DocumentUploadResult

upload()
Upload one or more documents (files, directories, or bytes). Directories auto-expand to supported types (.pdf, .docx, .txt, .md)
async def upload(
    files: Union[str, Path, bytes, list[Union[str, Path, bytes]]],
    *,
    filename: Optional[str] = None,
    filenames: Optional[list[str]] = None,
    recursive: bool = True,
    include_hidden: bool = False,
    wait_for_processing: bool = True,
    raise_on_failure: bool = True,
    processing_timeout: Optional[float] = None,
    on_progress: Optional[Callable[[DocumentUploadProgressEvent], None]] = None,
    on_file_complete: Optional[Callable[[DocumentFileCompleteEvent], None]] = None,
    on_processing_progress: Optional[Callable[[DocumentProcessingProgressEvent], None]] = None,
    on_processing_failed: Optional[Callable[[DocumentProcessingFailedEvent], None]] = None,
    storage_target: str = "default",
) -> BatchDocumentUploadResult

Retrieval
list()
List documents with pagination and optional status filtering
async def list(
    *,
    page: int = 1,  # 1-based page number
    page_size: int = 20,  # 1-100
    status_filter: Optional[str] = None,  # "pending" | "processing" | "completed" | "failed"
) -> DocumentList

list_all()
Auto-paginating async iterator that yields all documents
async def list_all(
    *,
    page_size: int = 50,
    status_filter: Optional[str] = None,
) -> AsyncIterator[DocumentItem]

get()
Get detailed information about a document
async def get(document_id: str) -> DocumentDetails

get_text()
Get full extracted text from a document
async def get_text(document_id: str) -> str

get_chunks()
Get all text chunks for a document (used for semantic search)
async def get_chunks(
    document_id: str,
    *,
    include_embeddings: bool = False,
) -> DocumentChunksResponse

download()
Get download URL for a document
async def download(document_id: str) -> str

Search
search()
Search documents using semantic similarity
async def search(
    query: str,
    *,
    limit: int = 20,  # 1-100
    similarity_threshold: float = 0.3,  # 0.0-1.0
    document_ids: Optional[list[str]] = None,  # Scope to specific documents
) -> DocumentSearchResponse

Deletion
delete()
Delete a document
async def delete(document_id: str) -> None

batch_delete()
Delete multiple documents in one operation (max 100, no duplicates)
async def batch_delete(document_ids: list[str]) -> DocumentBatchDeleteResponse

Upload Workflow (Low-Level)
request_upload()
Request a presigned URL for document upload (step 1 of 3)
async def request_upload(
    filename: str,
    content_type: str,  # application/pdf, application/vnd.openxmlformats-officedocument.wordprocessingml.document, text/plain, text/markdown
    size_bytes: int,  # Max 50MB
    *,
    storage_target: str = "default",  # "default" | "custom"
) -> DocumentPresignedUploadResult

confirm_upload()
Confirm a document upload after uploading to S3 (step 3 of 3)
async def confirm_upload(
    object_key: str,
    size_bytes: int,
    content_type: str,
    *,
    checksum: Optional[str] = None,
) -> DocumentConfirmResult

get_status()
Get document processing status (for polling after upload)
async def get_status(document_id: str) -> DocumentStatusResult

quota_check()
Check if upload quota allows for new document uploads
async def quota_check(file_count: int = 1) -> DocumentQuotaCheck

wait_for_processing()
Poll until document text extraction completes or fails
async def wait_for_processing(
    document_id: str,
    *,
    timeout: Optional[float] = None,  # Default: 600s
    poll_interval: Optional[float] = None,  # Default: from config
) -> DocumentStatusResult

Batch Upload (Low-Level)
batch_prepare()
Prepare batch upload for multiple documents (max 100)
async def batch_prepare(
    files: list[dict[str, Any]],  # [{filename, content_type, size_bytes}, ...]
) -> DocumentBatchPrepareResult

batch_confirm()
Confirm batch upload after uploading files to S3
async def batch_confirm(
    batch_id: str,
    confirmations: list[dict[str, Any]],  # [{object_key, size_bytes, content_type, checksum?}, ...]
) -> DocumentBatchConfirmResult

batch_status()
Get processing status for a batch of documents
async def batch_status(batch_id: str) -> DocumentBatchStatusResult