Documentation
CloudStorageResource
Access via client.cloud_storage
Auth & Connections
initiate_auth()
Initiate OAuth flow for a cloud storage provider
async def initiate_auth( provider: str, # e.g. "google_drive" *, redirect_uri: Optional[str] = None, # Custom OAuth callback URI) -> InitiateAuthResultReturns: InitiateAuthResult - Contains authorization_url and state for CSRF verification
complete_auth()
Complete OAuth flow after user grants permissions
async def complete_auth( provider: str, # e.g. "google_drive" *, code: str, # Authorization code from callback state: str, # State parameter for CSRF verification redirect_uri: Optional[str] = None, # Must match initiate_auth redirect_uri) -> CompleteAuthResultReturns: CompleteAuthResult - Contains connection and is_new flag
list_connections()
List cloud storage connections
async def list_connections( *, provider: Optional[str] = None, # Filter by provider type active_only: bool = True, # Only return active connections) -> ConnectionListReturns: ConnectionList
disconnect()
Disconnect a cloud storage account
async def disconnect(connection_id: str) -> DisconnectResultReturns: DisconnectResult
Import & Export
start_import()
Start importing files from cloud storage
async def start_import( connection_id: str, files: list[Union[CloudFileInput, dict[str, Any]]], *, auto_describe: bool = True, # Auto-generate AI descriptions tags: Optional[list[str]] = None, # Tags to apply to imported files collection_id: Optional[str] = None, # Target collection) -> ImportResultReturns: ImportResult - Contains job_id for async imports or image_ids for sync imports
start_export()
Start exporting files to cloud storage
async def start_export( connection_id: str, image_ids: list[str], *, folder_id: Optional[str] = None, # Target folder ID in cloud storage folder_name: Optional[str] = None, # Create a new folder with this name) -> ExportResultReturns: ExportResult
Job Tracking
get_job()
Get the status of a cloud storage job
async def get_job(job_id: str) -> CloudStorageJobReturns: CloudStorageJob - Current status and progress
wait_for_job()
Poll until a job reaches a terminal state
async def wait_for_job( job_id: str, *, timeout: Optional[float] = None, # Default: config polling_timeout poll_interval: Optional[float] = None, # Default: config polling_interval on_progress: Optional[Callable[[CloudStorageJobProgressEvent], None]] = None,) -> CloudStorageJobReturns: CloudStorageJob - Final job status
Error Behavior
Raises CloudStorageError if the job fails completely. Returns normally for partial status (some files succeeded, some failed). Raises AionvisionTimeoutError if the timeout is exceeded.
Convenience Methods
import_and_wait()
Import files and wait for completion (combines start_import + wait_for_job)
async def import_and_wait( connection_id: str, files: list[Union[CloudFileInput, dict[str, Any]]], *, auto_describe: bool = True, tags: Optional[list[str]] = None, collection_id: Optional[str] = None, timeout: Optional[float] = None, poll_interval: Optional[float] = None, on_progress: Optional[Callable[[CloudStorageJobProgressEvent], None]] = None,) -> CloudStorageJobReturns: CloudStorageJob - Final job status
export_and_wait()
Export files and wait for completion (combines start_export + wait_for_job)
async def export_and_wait( connection_id: str, image_ids: list[str], *, folder_id: Optional[str] = None, folder_name: Optional[str] = None, timeout: Optional[float] = None, poll_interval: Optional[float] = None, on_progress: Optional[Callable[[CloudStorageJobProgressEvent], None]] = None,) -> CloudStorageJobReturns: CloudStorageJob - Final job status