Documentation

CloudStorageResource

Access via client.cloud_storage

Auth & Connections

initiate_auth()

Initiate the OAuth flow for a cloud storage provider

async def initiate_auth(
provider: str, # e.g. "google_drive"
*,
redirect_uri: Optional[str] = None, # Custom OAuth callback URI
) -> InitiateAuthResult
Returns: InitiateAuthResult - Contains authorization_url and state for CSRF verification

complete_auth()

Complete the OAuth flow after the user grants permissions

async def complete_auth(
provider: str, # e.g. "google_drive"
*,
code: str, # Authorization code from callback
state: str, # State parameter for CSRF verification
redirect_uri: Optional[str] = None, # Must match the redirect_uri passed to initiate_auth
) -> CompleteAuthResult
Returns: CompleteAuthResult - Contains connection and is_new flag

list_connections()

List cloud storage connections

async def list_connections(
*,
provider: Optional[str] = None, # Filter by provider type
active_only: bool = True, # Only return active connections
) -> ConnectionList
Returns: ConnectionList

disconnect()

Disconnect a cloud storage account

async def disconnect(connection_id: str) -> DisconnectResult
Returns: DisconnectResult

Import & Export

start_import()

Start importing files from cloud storage

async def start_import(
connection_id: str,
files: list[Union[CloudFileInput, dict[str, Any]]],
*,
auto_describe: bool = True, # Auto-generate AI descriptions
tags: Optional[list[str]] = None, # Tags to apply to imported files
collection_id: Optional[str] = None, # Target collection
) -> ImportResult
Returns: ImportResult - Contains job_id for async imports or image_ids for sync imports

start_export()

Start exporting files to cloud storage

async def start_export(
connection_id: str,
image_ids: list[str],
*,
folder_id: Optional[str] = None, # Target folder ID in cloud storage
folder_name: Optional[str] = None, # Create a new folder with this name
) -> ExportResult
Returns: ExportResult

Job Tracking

get_job()

Get the status of a cloud storage job

async def get_job(job_id: str) -> CloudStorageJob
Returns: CloudStorageJob - Current status and progress

wait_for_job()

Poll until a job reaches a terminal state

async def wait_for_job(
job_id: str,
*,
timeout: Optional[float] = None, # Default: config polling_timeout
poll_interval: Optional[float] = None, # Default: config polling_interval
on_progress: Optional[Callable[[CloudStorageJobProgressEvent], None]] = None,
) -> CloudStorageJob
Returns: CloudStorageJob - Final job status

Error Behavior

wait_for_job() raises CloudStorageError if the job fails completely, and AionvisionTimeoutError if the timeout is exceeded. A job that ends in partial status (some files succeeded, some failed) does not raise; the CloudStorageJob is returned normally.

Convenience Methods

import_and_wait()

Import files and wait for completion (combines start_import + wait_for_job)

async def import_and_wait(
connection_id: str,
files: list[Union[CloudFileInput, dict[str, Any]]],
*,
auto_describe: bool = True,
tags: Optional[list[str]] = None,
collection_id: Optional[str] = None,
timeout: Optional[float] = None,
poll_interval: Optional[float] = None,
on_progress: Optional[Callable[[CloudStorageJobProgressEvent], None]] = None,
) -> CloudStorageJob
Returns: CloudStorageJob - Final job status

export_and_wait()

Export files and wait for completion (combines start_export + wait_for_job)

async def export_and_wait(
connection_id: str,
image_ids: list[str],
*,
folder_id: Optional[str] = None,
folder_name: Optional[str] = None,
timeout: Optional[float] = None,
poll_interval: Optional[float] = None,
on_progress: Optional[Callable[[CloudStorageJobProgressEvent], None]] = None,
) -> CloudStorageJob
Returns: CloudStorageJob - Final job status