Skip to content

Backend Python API Reference

taskbridge

taskbridge

Reliable AI Task Streaming library for FastAPI.

This package provides infrastructure for long-running AI tasks with WebSocket and HTTP polling support, event replay, and idempotency.

ActionReceiptStore

Bases: ABC

Interface for persisting and retrieving task action receipts.

Receipts are used to ensure idempotency for task actions submitted by clients.

delete_expired_receipts(expires_before) abstractmethod async

Remove receipts that have passed their expiration time.

Parameters:

Name Type Description Default
expires_before datetime

Threshold timestamp for expiration.

required

Returns:

Type Description
int

Number of deleted receipts.

get_receipt(task_id, client_action_id) abstractmethod async

Retrieve a receipt by task ID and client-provided action ID.

Parameters:

Name Type Description Default
task_id str

Unique identifier of the task.

required
client_action_id str

Idempotency key provided by the client.

required

Returns:

Type Description
TaskActionReceipt | None

The matching receipt or None if not found.

save_receipt(receipt) abstractmethod async

Save a new task action receipt.

Parameters:

Name Type Description Default
receipt TaskActionReceipt

The receipt record to persist.

required

Returns:

Type Description
TaskActionReceipt

The persisted receipt.

AllowAllUploadPolicy

Bases: UploadPolicy

Policy that unconditionally allows all file uploads.

assert_upload_allowed(auth_context, attachments) async

Unconditionally allow file uploads.

Parameters:

Name Type Description Default
auth_context AuthContext

Authenticated user context.

required
attachments list[TaskAttachment]

List of attachments to validate.

required

AlwaysReadyProbe

Bases: ReadinessProbe

Probe that always returns ready status.

check_readiness() async

Return True and empty details.

AuthContext

Bases: TaskBridgeModel

Security context derived from the authenticated user or system.

AuthContextResolver

Bases: ABC

Interface for resolving authentication context from requests.

resolve_auth_context(request_context) abstractmethod async

Resolve the security context from an incoming request.

Parameters:

Name Type Description Default
request_context object

The raw request object (e.g. FastAPI Request or WebSocket).

required

Returns:

Type Description
AuthContext

The resolved authentication context.

AuthenticationError

Bases: TaskBridgeError

Raised when request authentication fails.

CancelTaskCommand

Bases: TaskBridgeModel

Internal command to cancel a task.

CancelTaskResult

Bases: TaskBridgeModel

Result of a task cancellation request.

CancelTaskStatus

Bases: str, Enum

Outcome of a task cancellation request.

CompositeReadinessProbe

Bases: ReadinessProbe

Probe that combines multiple probes into one.

__init__(probes)

Initialize composite probe.

Parameters:

Name Type Description Default
probes dict[str, ReadinessProbe]

Dictionary of named probes to check.

required

check_readiness() async

Check all child probes and aggregate their status.

Returns:

Type Description
tuple[bool, dict[str, Any]]

Tuple of (aggregated_is_ready, component_details).

DenyAllAccessPolicy

Bases: OwnershipPolicy

Policy that unconditionally denies all task creation and access.

assert_task_access(auth_context, task) async

Unconditionally deny task access.

Parameters:

Name Type Description Default
auth_context AuthContext

Authenticated user context.

required
task TaskRecord

The task record to access.

required

Raises:

Type Description
PermissionError

Always raised.

assert_task_create(auth_context) async

Unconditionally deny task creation.

Parameters:

Name Type Description Default
auth_context AuthContext

Authenticated user context.

required

Raises:

Type Description
PermissionError

Always raised.

DeterministicFakeExecutor

Bases: TaskExecutor

Asyncio executor that emits TASK_STARTED, TASK_PROGRESS, then a terminal event.

Uses monotonic string eventId values per task ("1-0", "2-0", …) suitable for replay ordering in tests. When backed by RedisStreamEventStore, appended IDs are replaced by Redis stream IDs as usual.

Cancellation: call request_cancellation to stop after the current progress step and emit TASK_CANCELLED (registry status CANCELLED).

Failure: set fail_after_progress_emissions to emit TASK_FAILED after that many TASK_PROGRESS events (registry FAILED).

__init__(*, registry, event_store, progress_ticks=2, step_delay_seconds=0.001, fail_after_progress_emissions=None, sleep_fn=None)

Initialize deterministic fake executor.

Parameters:

Name Type Description Default
registry TaskRegistry

Task registry for status updates.

required
event_store EventStore

Event store for appending events.

required
progress_ticks int

Number of progress events to emit.

2
step_delay_seconds float

Delay between events.

0.001
fail_after_progress_emissions int | None

Emit failure after this many progress ticks.

None
sleep_fn Callable[[float], Awaitable[None]] | None

Custom sleep function (asyncio.sleep by default).

None

request_cancellation(task) async

Request cancellation of a running task.

Parameters:

Name Type Description Default
task TaskRecord

The task to cancel.

required

submit_task(task) async

Submit a task for deterministic execution.

Parameters:

Name Type Description Default
task TaskRecord

The task to run.

required

ErrorResponse

Bases: TaskBridgeModel

Standard error response payload for API endpoints.

EventStore

Bases: ABC

Abstract base class for task event storage.

append_event(event) abstractmethod async

Append an event to the store.

Parameters:

Name Type Description Default
event TaskEvent

The event to append.

required

Returns:

Type Description
TaskEvent

The appended event with its assigned event ID.

read_events_after(task_id, after_event_id, limit) abstractmethod async

Read events for a task after a specific event ID.

Parameters:

Name Type Description Default
task_id str

The ID of the task.

required
after_event_id str | None

The event ID to start after. If None, read from the beginning.

required
limit int

Maximum number of events to read.

required

Returns:

Type Description
list[TaskEvent]

List of task events.

wait_for_events(task_id, after_event_id, limit, timeout_ms) abstractmethod async

Wait for new events to be appended for a task.

Parameters:

Name Type Description Default
task_id str

The ID of the task.

required
after_event_id str | None

The event ID to start after.

required
limit int

Maximum number of events to return.

required
timeout_ms int

Maximum time to wait in milliseconds.

required

Returns:

Type Description
list[TaskEvent]

List of task events.

ExecutorReadinessProbe

Bases: ReadinessProbe

Probe for checking task executor availability.

__init__(executor_adapter, method_name='check_health')

Initialize executor probe.

Parameters:

Name Type Description Default
executor_adapter Any

Adapter instance for the task executor.

required
method_name str

Name of the health check method to call on adapter.

'check_health'

check_readiness() async

Call the executor's health check method and return status.

Returns:

Type Description
tuple[bool, dict[str, Any]]

Readiness status reported by the executor.

HealthResponse

Bases: TaskBridgeModel

Basic health check response.

HttpCancelTaskRequest

Bases: TaskBridgeModel

HTTP request body for task cancellation.

HttpRouteSettings dataclass

Settings for HTTP routes.

HttpSubmitActionRequest

Bases: TaskBridgeModel

HTTP request body for submitting an action to a suspended task.

HttpTaskCreateRequest

Bases: TaskBridgeModel

HTTP request body for task creation.

IdempotencyConflictError

Bases: TaskConflictError

Raised when an idempotent request has a mismatching payload.

InvalidRequestError

Bases: TaskBridgeError

Raised when the client request is malformed or invalid.

InvalidTaskStateError

Bases: TaskBridgeError

Raised when a task is in an unexpected state for the requested operation.

MetricsSink

Bases: ABC

Abstract interface for application metrics.

increment_counter(metric_name, value=1, *, tags=None) abstractmethod

Increment a monotonic counter.

Parameters:

Name Type Description Default
metric_name str

Name of the metric.

required
value int

Amount to increment by.

1
tags dict[str, str] | None

Optional dimensional tags.

None

set_gauge(metric_name, value, *, tags=None) abstractmethod

Set a gauge value.

Parameters:

Name Type Description Default
metric_name str

Name of the metric.

required
value float

New value for the gauge.

required
tags dict[str, str] | None

Optional dimensional tags.

None

NoOpMetricsSink

Bases: MetricsSink

Metrics sink that drops all metrics.

increment_counter(metric_name, value=1, *, tags=None)

Drop counter increment.

set_gauge(metric_name, value, *, tags=None)

Drop gauge update.

NoOpTransportDiagnosticsSink

Bases: TransportDiagnosticsSink

Transport diagnostics sink that drops all events.

OwnershipPolicy

Bases: ABC

Interface for enforcing task ownership and access rules.

assert_task_access(auth_context, task) abstractmethod async

Verify that the user is allowed to access an existing task.

Parameters:

Name Type Description Default
auth_context AuthContext

Authenticated user context.

required
task TaskRecord

The task record to access.

required

Raises:

Type Description
PermissionError

If access is denied.

assert_task_create(auth_context) abstractmethod async

Verify that the user is allowed to create a new task.

Parameters:

Name Type Description Default
auth_context AuthContext

Authenticated user context.

required

Raises:

Type Description
PermissionError

If creation is denied.

PollEventsResult

Bases: TaskBridgeModel

Result of a long-polling request for task events.

ReadinessProbe

Bases: ABC

Abstract base class for readiness checks.

check_readiness() abstractmethod async

Perform a readiness check.

Returns:

Type Description
tuple[bool, dict[str, Any]]

A tuple of (is_ready, details).

ReadinessResponse

Bases: TaskBridgeModel

Detailed readiness probe response.

RedisEventStoreSettings dataclass

Settings for Redis Stream event store.

RedisReadinessProbe

Bases: ReadinessProbe

Probe for checking Redis connectivity.

__init__(redis_client)

Initialize Redis probe.

Parameters:

Name Type Description Default
redis_client Any

Redis client instance.

required

check_readiness() async

Ping Redis and return readiness status.

Returns:

Type Description
tuple[bool, dict[str, Any]]

True if Redis responded to ping, False otherwise.

RedisStreamEventStore

Bases: EventStore

Event store implementation using Redis Streams.

__init__(redis_client, settings=None)

Initialize Redis Stream event store.

Parameters:

Name Type Description Default
redis_client Any

Redis client instance.

required
settings RedisEventStoreSettings | None

Optional storage settings.

None

append_event(event) async

Append an event to the Redis Stream.

Parameters:

Name Type Description Default
event TaskEvent

The event to append.

required

Returns:

Type Description
TaskEvent

The event with assigned Redis Stream ID.

read_events_after(task_id, after_event_id, limit) async

Read events from Redis Stream after a checkpoint.

Parameters:

Name Type Description Default
task_id str

The ID of the task.

required
after_event_id str | None

The stream ID to start after.

required
limit int

Max entries to read.

required

Returns:

Type Description
list[TaskEvent]

Deserialized task events.

stream_key(task_id)

Get the Redis key for a task's event stream.

Parameters:

Name Type Description Default
task_id str

The ID of the task.

required

Returns:

Type Description
str

The Redis key string.

wait_for_events(task_id, after_event_id, limit, timeout_ms) async

Perform a blocking read on Redis Stream for new events.

Parameters:

Name Type Description Default
task_id str

The ID of the task.

required
after_event_id str | None

The stream ID to start after.

required
limit int

Max entries to return.

required
timeout_ms int

Block timeout in milliseconds.

required

Returns:

Type Description
list[TaskEvent]

Deserialized task events.

ResumeHandoffStatus

Bases: str, Enum

Status of handing off a resumed task back to the executor.

SseStreamSettings dataclass

Settings for the Server-Sent Events (SSE) stream runtime.

StreamRuntimeSettings dataclass

Aggregated settings for all supported stream runtimes.

SubmitActionCommand

Bases: TaskBridgeModel

Internal command to submit a task action.

SubmitActionResult

Bases: TaskBridgeModel

Result of a task action submission.

SubmitActionResultStatus

Bases: str, Enum

Status of a task action submission.

SuspensionStore

Bases: ABC

Interface for managing the persistent lifecycle of task suspensions.

Suspensions represent points in task execution where the system is waiting for client input.

accept_action_if_open(task_id, suspend_id, client_action_id) abstractmethod async

Atomically transition a suspension to RESOLVED if it is currently OPEN.

Parameters:

Name Type Description Default
task_id str

Unique task identifier.

required
suspend_id str

Unique suspension identifier.

required
client_action_id str

The ID of the action that resolved the suspension.

required

Returns:

Type Description
TaskSuspensionRecord | None

The updated record if transition was successful, None otherwise.

expire_open_suspensions(expires_before) abstractmethod async

Transition OPEN suspensions to EXPIRED if they have passed their deadline.

Parameters:

Name Type Description Default
expires_before datetime

Threshold timestamp for expiration.

required

Returns:

Type Description
int

Number of suspensions that were expired.

get_suspension(task_id, suspend_id) abstractmethod async

Retrieve a specific suspension record.

Parameters:

Name Type Description Default
task_id str

Unique task identifier.

required
suspend_id str

Unique suspension identifier.

required

Returns:

Type Description
TaskSuspensionRecord

The suspension record.

Raises:

Type Description
KeyError

If the suspension is not found.

list_pending_resume_handoffs() abstractmethod async

List all resolved suspensions that are waiting for handoff.

Used by reconciliation services to recover from crashes after resolution but before handoff.

Returns:

Type Description
list[TaskSuspensionRecord]

List of suspensions with PENDING handoff status.

mark_resume_dispatched(task_id, suspend_id) abstractmethod async

Mark a resolved suspension as successfully handed off back to the executor.

Parameters:

Name Type Description Default
task_id str

Unique task identifier.

required
suspend_id str

Unique suspension identifier.

required

Returns:

Type Description
TaskSuspensionRecord

The updated record.

TaskActionReceipt

Bases: TaskBridgeModel

Idempotency record for a submitted task action.

TaskActionReceiptStatus

Bases: str, Enum

Processing status of an action receipt.

TaskActionService

Service for handling client interactions with suspended tasks.

__init__(registry, suspension_store, receipt_store, event_store, ownership_policy, resume_service=None, metrics_sink=None)

Initialize the action service.

submit_action(command) async

Validate and apply a client action to a suspended task.

Parameters:

Name Type Description Default
command SubmitActionCommand

The action submission request.

required

Returns:

Type Description
SubmitActionResult

Outcome of the submission.

Raises:

Type Description
TaskNotFoundError

If task or suspension is missing.

IdempotencyConflictError

If request conflicts with existing receipt.

TaskAlreadyTerminalError

Bases: TaskBridgeError

Raised when attempting to modify a task that has already finished.

TaskAttachment

Bases: TaskBridgeModel

Metadata and content for an attached file in task creation.

TaskBridgeError

Bases: Exception

Base class for all TaskBridge-related exceptions.

TaskCancellationService

Service for requesting task cancellation.

__init__(registry, executor, ownership_policy, metrics_sink=None)

Initialize the cancellation service.

cancel_task(command) async

Request cancellation of an active task.

Parameters:

Name Type Description Default
command CancelTaskCommand

The cancellation request.

required

Returns:

Type Description
CancelTaskResult

Result of the cancellation request.

TaskConflictError

Bases: TaskBridgeError

Raised when a task operation conflicts with the current state.

TaskCreateCommand

Bases: TaskBridgeModel

Internal command to create a new task.

TaskCreatedResult

Bases: TaskBridgeModel

Result of a task creation request.

TaskCreationService

Service for orchestrating new task creation and submission.

__init__(registry, executor, ownership_policy, metrics_sink=None)

Initialize the creation service.

create_task(command) async

Validate, persist, and submit a new task.

Parameters:

Name Type Description Default
command TaskCreateCommand

The task creation request.

required

Returns:

Type Description
TaskCreatedResult

Assigned task ID and initial status.

Raises:

Type Description
TaskOwnershipError

If user is not allowed to create tasks.

TaskSubmissionError

If submission to executor fails.

TaskEvent

Bases: TaskBridgeModel

A discrete event emitted during the lifecycle of a task.

failure property

Return typed failure data if this is a failure event.

progress property

Return typed progress data if this is a progress event.

TaskEventType

Bases: str, Enum

Types of events that can occur during task execution.

TaskExecutor

Bases: ABC

Host-provided execution adapter between TaskBridge services and a runtime.

TaskBridge core stays ignorant of Temporal, Celery, queues, or process pools. The host implements this contract and wires it into FastAPI dependencies.

Lifecycle:

  1. TaskCreationService.create_task persists the task via TaskRegistry, then calls submit_task once for that TaskRecord.
  2. The executor is responsible for driving work and emitting domain events by appending TaskEvent instances through the host's EventStore (typically progress, completion, failure, cancellation outcomes).
  3. The executor should transition TaskRecord.status via TaskRegistry in line with those events (e.g. RUNNING after start, COMPLETED / FAILED / CANCELLED on terminals).
  4. request_cancellation is invoked after TaskRegistry.request_cancellation; implementations should cooperate asynchronously (signal a worker, cancel a workflow, set a flag checked by the task loop, etc.).

Hooks (conceptual mapping for adapters):

  • Submission: submit_task
  • Progress: emit TASK_PROGRESS (and optionally TASK_MESSAGE) via EventStore
  • Completion: registry terminal status + TASK_COMPLETED event
  • Failure: registry FAILED + TASK_FAILED event
  • Cancel: request_cancellation then terminal CANCELLED + TASK_CANCELLED

Implementations must not block the HTTP stack indefinitely; offload long work to background tasks, workers, or workflows owned by the host.

request_cancellation(task) abstractmethod async

Request cooperative cancellation for a task already marked for cancel in the registry.

submit_task(task) abstractmethod async

Accept a newly created task and begin asynchronous execution.

TaskNotFoundError

Bases: TaskBridgeError

Raised when a requested task does not exist or is inaccessible.

TaskOwnershipError

Bases: TaskBridgeError

Raised when a user attempts to create a task they are not allowed to own.

Note: Read/cancel/subscribe operations use TaskNotFoundError instead of this exception to avoid leaking task existence via enumeration.

TaskPollingService

Service for long-polling task events.

__init__(registry, event_store, ownership_policy, metrics_sink=None)

Initialize the polling service.

poll_events(task_id, auth_context, after_event_id, limit, wait_timeout_ms=None) async

Poll for new events for a specific task.

Parameters:

Name Type Description Default
task_id str

Unique task identifier.

required
auth_context AuthContext

Authenticated user context.

required
after_event_id str | None

Resume marker for events.

required
limit int

Maximum number of events to return.

required
wait_timeout_ms int | None

Optional blocking wait time for new events.

None

Returns:

Type Description
PollEventsResult

Batch of events and the next resume marker.

TaskRecord

Bases: TaskBridgeModel

Persistent state of a task in the registry.

from_command(task_id, command) classmethod

Create a new record instance from a creation command.

Parameters:

Name Type Description Default
task_id str

Assigned unique identifier for the task.

required
command TaskCreateCommand

The command containing task input and metadata.

required

Returns:

Type Description
'TaskRecord'

A new TaskRecord initialized with status ACCEPTED.

TaskRegistry

Bases: ABC

Interface for managing the persistent state of tasks.

The registry is the source of truth for task existence, status, and ownership.

create_task(task) abstractmethod async

Persist a new task record.

Parameters:

Name Type Description Default
task TaskRecord

Initial task state to persist.

required

Returns:

Type Description
TaskRecord

The persisted task record.

get_task(task_id) abstractmethod async

Retrieve a task by its unique ID.

Parameters:

Name Type Description Default
task_id str

Unique task identifier.

required

Returns:

Type Description
TaskRecord

The task record.

Raises:

Type Description
KeyError

If task does not exist.

get_task_by_client_request_id(owner_id, client_request_id) abstractmethod async

Lookup task by user ID and their client-provided request ID.

Used for task creation idempotency.

Parameters:

Name Type Description Default
owner_id str

Subject ID of the task owner.

required
client_request_id str

Idempotency key provided by the client.

required

Returns:

Type Description
TaskRecord | None

Matching task record or None.

request_cancellation(task_id) abstractmethod async

Mark a task as cancellation requested.

Parameters:

Name Type Description Default
task_id str

Unique task identifier.

required

Returns:

Type Description
TaskRecord

The updated task record.

same_idempotency_payload(task, command) abstractmethod async

Compare a stored task with a new creation command for idempotency.

Parameters:

Name Type Description Default
task TaskRecord

Existing task record.

required
command TaskCreateCommand

New task creation request.

required

Returns:

Type Description
bool

True if payloads match sufficiently to be considered identical.

update_task_status(task_id, status) abstractmethod async

Update the current status of a task.

Parameters:

Name Type Description Default
task_id str

Unique task identifier.

required
status TaskStatus

New status to apply.

required

Returns:

Type Description
TaskRecord

The updated task record.

TaskResumeReconciliationService

Service to retry pending task resumes that failed to dispatch.

__init__(receipt_store, suspension_store, resume_service=None)

Initialize the reconciliation service.

reconcile_pending_resumes() async

Find and retry all task suspensions in PENDING handoff state.

Returns:

Type Description
int

Number of successfully reconciled resumes.

TaskResumeService

Service for resuming tasks after client interaction.

resume_task(command, suspension) async

Host hook for resuming a suspended task after a durable action accept.

Parameters:

Name Type Description Default
command SubmitActionCommand

The validated action submission command.

required
suspension Any

The suspension record that was resolved.

required

TaskRetentionService

Service for pruning expired task state and receipts.

__init__(receipt_store, suspension_store)

Initialize the retention service.

prune_expired_state(*, now=None) async

Delete expired action receipts and expire open suspensions.

Parameters:

Name Type Description Default
now datetime | None

Optional override for current time.

None

Returns:

Type Description
dict[str, int]

Dictionary with counts of pruned records.

TaskStatus

Bases: str, Enum

Current state of a task in the system lifecycle.

is_terminal property

Return True if the status represents a final, unchangeable state.

TaskSubmissionError

Bases: TaskBridgeError

Raised when submitting a task to the executor fails.

TaskSuspensionRecord

Bases: TaskBridgeModel

Persistent record of a task suspension waiting for client interaction.

TaskSuspensionStatus

Bases: str, Enum

Lifecycle status of a task suspension.

TransportDiagnosticsSink

Sink for low-level transport events (heartbeats, delivery, etc.).

on_delivery(transport, task_id, event_id)

Record successful event delivery.

on_disconnect(transport, task_id)

Record client disconnection.

on_heartbeat(transport, task_id)

Record a heartbeat sent to client.

on_replay_start(transport, task_id, after_event_id)

Record start of an event replay.

UploadPolicy

Bases: ABC

Interface for validating file uploads for tasks.

assert_upload_allowed(auth_context, attachments) abstractmethod async

Verify that the requested file uploads are allowed.

Parameters:

Name Type Description Default
auth_context AuthContext

Authenticated user context.

required
attachments list[TaskAttachment]

List of attachments to validate.

required

Raises:

Type Description
PermissionError

If upload is denied.

UploadValidationError

Bases: TaskBridgeError

Raised when an uploaded file fails validation.

WebSocketHeartbeat

Bases: TaskBridgeModel

Periodic heartbeat message sent over WebSocket.

WebSocketRouteSettings dataclass

Settings for WebSocket routes.

WebSocketStreamSettings dataclass

Settings for the WebSocket stream runtime.

WebSocketSubscribeRequest

Bases: TaskBridgeModel

Client request to subscribe to task events via WebSocket.

WebSocketSubscriptionConfirmed

Bases: TaskBridgeModel

Server confirmation of a WebSocket subscription.

WebSocketSubscriptionService

Service for managing WebSocket-based event streaming.

__init__(registry, event_store, ownership_policy, metrics_sink=None)

Initialize the WebSocket service.

prepare_subscription(task_id, auth_context, last_event_id, limit) async

Validate ownership and read replay events for a new subscription.

Parameters:

Name Type Description Default
task_id str

Unique task identifier.

required
auth_context AuthContext

Authenticated user context.

required
last_event_id str | None

Client's last seen event ID for replay.

required
limit int

Maximum replay batch size.

required

Returns:

Type Description
PollEventsResult

Initial set of replay events.

wait_for_live_events(task_id, after_event_id, limit, timeout_ms) async

Block until new live events are available for delivery.

Parameters:

Name Type Description Default
task_id str

Unique task identifier.

required
after_event_id str | None

Marker after which to read new events.

required
limit int

Maximum number of events to return.

required
timeout_ms int

Maximum blocking time.

required

Returns:

Type Description
list[TaskEvent]

Batch of new live events.

build_http_router(settings=None)

Build and return an APIRouter with TaskBridge HTTP routes.

Parameters:

Name Type Description Default
settings HttpRouteSettings | None

Optional HTTP route settings.

None

Returns:

Type Description
APIRouter

APIRouter configured with TaskBridge routes.

build_sse_stream_response(event_iterator, settings)

Wrap an event generator in a FastAPI StreamingResponse.

Parameters:

Name Type Description Default
event_iterator AsyncIterator[str]

The generator yielding SSE messages.

required
settings SseStreamSettings

Configuration providing necessary HTTP headers.

required

Returns:

Type Description
StreamingResponse

A StreamingResponse configured for text/event-stream.

build_ws_router(settings=None)

Build and return an APIRouter with TaskBridge WebSocket routes.

Parameters:

Name Type Description Default
settings WebSocketRouteSettings | None

Optional WebSocket route settings.

None

Returns:

Type Description
APIRouter

APIRouter configured with TaskBridge WebSocket routes.

ensure_transition(current, new)

Validate that a task can transition from one status to another.

Parameters:

Name Type Description Default
current TaskStatus

The existing status of the task.

required
new TaskStatus

The requested new status.

required

Raises:

Type Description
InvalidTaskStateError

If the transition is not allowed by lifecycle rules.

get_auth_context_resolver()

Get the auth context resolver.

get_metrics_sink()

Get the metrics sink.

get_ownership_policy()

Provide ownership policy for host apps that prefer DI over manual wiring.

get_readiness_probe()

Get the readiness probe.

get_task_action_service()

Get the task action service.

get_task_cancellation_service()

Get the task cancellation service.

get_task_creation_service()

Get the task creation service.

get_task_polling_service()

Get the task polling service.

get_upload_policy()

Get the upload policy.

get_websocket_subscription_service()

Get the websocket subscription service.

install_http_exception_handlers(app)

Register exception handlers for TaskBridge errors on a FastAPI app.

Parameters:

Name Type Description Default
app FastAPI

The FastAPI application instance.

required

resolve_http_auth_context(request, resolver=AUTH_CONTEXT_RESOLVER) async

Resolve authentication context for HTTP request.

resolve_ws_auth_context(websocket, resolver=AUTH_CONTEXT_RESOLVER) async

Resolve authentication context for WebSocket connection.

same_idempotency_payload(task, command)

Check if a new creation request matches an existing task's payload.

Used to detect conflicting idempotency keys.

Parameters:

Name Type Description Default
task TaskRecord

Existing task record.

required
command TaskCreateCommand

New creation request.

required

Returns:

Type Description
bool

True if payloads are considered identical for idempotency purposes.

sse_event_generator(*, request, task_id, last_event_id, auth_context, service, settings, prepared_replay=None, diagnostics=None) async

Generate a sequence of formatted SSE messages.

Handles initial replay of missed events followed by a live subscription loop.

Parameters:

Name Type Description Default
request Request

The incoming FastAPI request (used to detect disconnection).

required
task_id str

Unique task identifier.

required
last_event_id str | None

Resume marker provided by the client.

required
auth_context AuthContext

Authenticated user context.

required
service WebSocketSubscriptionService

Service instance providing event retrieval logic.

required
settings SseStreamSettings

Configuration for the SSE stream.

required
prepared_replay PollEventsResult | None

Pre-fetched events to avoid redundant DB reads.

None
diagnostics TransportDiagnosticsSink | None

Sink for transport-level metrics and tracing.

None

Yields:

Type Description
AsyncIterator[str]

Formatted SSE messages as strings.