diff --git a/docs/api/cli.md b/docs/api/cli.md index 66e722f..fef3fcb 100644 --- a/docs/api/cli.md +++ b/docs/api/cli.md @@ -151,7 +151,7 @@ The server exposes three routes: taskito scaler --app myapp:queue --port 9091 --target-queue-depth 5 ``` -See the [KEDA Integration guide](../guide/keda.md) for Kubernetes deploy templates. +See the [KEDA Integration guide](../guide/operations/keda.md) for Kubernetes deploy templates. ## Error Messages diff --git a/docs/api/index.md b/docs/api/index.md new file mode 100644 index 0000000..c78a4ab --- /dev/null +++ b/docs/api/index.md @@ -0,0 +1,13 @@ +# API Reference + +Complete Python API reference for all public classes and methods. + +| Class | Description | +|-------|-------------| +| [Queue](queue/index.md) | Central orchestrator — task registration, enqueue, workers, and all queue operations | +| [TaskWrapper](task.md) | Handle returned by `@queue.task()` — `delay()`, `apply_async()`, `map()`, signatures | +| [JobResult](result.md) | Handle for an enqueued job — status polling, result retrieval, dependencies | +| [JobContext](context.md) | Runtime context inside a running task — job ID, retry count, progress updates | +| [Canvas](canvas.md) | Workflow primitives — `Signature`, `chain`, `group`, `chord` | +| [Testing](testing.md) | Test mode, `TestResult`, `MockResource` for unit testing tasks | +| [CLI](cli.md) | `taskito` command-line interface — `worker`, `info`, `scaler` | diff --git a/docs/api/queue.md b/docs/api/queue.md deleted file mode 100644 index aea2f2e..0000000 --- a/docs/api/queue.md +++ /dev/null @@ -1,820 +0,0 @@ -# Queue - -::: taskito.app.Queue - -The central class for creating and managing a task queue. 
- -## Constructor - -```python -Queue( - db_path: str = ".taskito/taskito.db", - workers: int = 0, - default_retry: int = 3, - default_timeout: int = 300, - default_priority: int = 0, - result_ttl: int | None = None, - middleware: list[TaskMiddleware] | None = None, - drain_timeout: int = 30, - interception: str = "off", - max_intercept_depth: int = 10, - recipe_signing_key: str | None = None, - max_reconstruction_timeout: int = 10, - file_path_allowlist: list[str] | None = None, - disabled_proxies: list[str] | None = None, - async_concurrency: int = 100, - event_workers: int = 4, - scheduler_poll_interval_ms: int = 50, - scheduler_reap_interval: int = 100, - scheduler_cleanup_interval: int = 1200, - namespace: str | None = None, -) -``` - -| Parameter | Type | Default | Description | -|---|---|---|---| -| `db_path` | `str` | `".taskito/taskito.db"` | Path to SQLite database file. Parent directories are created automatically. | -| `workers` | `int` | `0` | Number of worker threads (`0` = auto-detect CPU count) | -| `default_retry` | `int` | `3` | Default max retry attempts for tasks | -| `default_timeout` | `int` | `300` | Default task timeout in seconds | -| `default_priority` | `int` | `0` | Default task priority (higher = more urgent) | -| `result_ttl` | `int \| None` | `None` | Auto-cleanup completed/dead jobs older than this many seconds. `None` disables. | -| `middleware` | `list[TaskMiddleware] \| None` | `None` | Queue-level middleware applied to all tasks. | -| `drain_timeout` | `int` | `30` | Seconds to wait for in-flight tasks during graceful shutdown. | -| `interception` | `str` | `"off"` | Argument interception mode: `"strict"`, `"lenient"`, or `"off"`. See [Resource System](../guide/resources.md#layer-1-argument-interception). | -| `max_intercept_depth` | `int` | `10` | Max recursion depth for argument walking. | -| `recipe_signing_key` | `str \| None` | `None` | HMAC-SHA256 key for proxy recipe integrity. 
Falls back to `TASKITO_RECIPE_SECRET` env var. | -| `max_reconstruction_timeout` | `int` | `10` | Max seconds allowed for proxy reconstruction. | -| `file_path_allowlist` | `list[str] \| None` | `None` | Allowed file path prefixes for the file proxy handler. | -| `disabled_proxies` | `list[str] \| None` | `None` | Handler names to skip when registering built-in proxy handlers. | -| `async_concurrency` | `int` | `100` | Maximum number of `async def` tasks running concurrently on the native async executor. | -| `event_workers` | `int` | `4` | Thread pool size for the event bus. Increase for high event volume. | -| `scheduler_poll_interval_ms` | `int` | `50` | Milliseconds between scheduler poll cycles. Lower values improve scheduling precision at the cost of CPU. | -| `scheduler_reap_interval` | `int` | `100` | Reap stale/timed-out jobs every N poll cycles. | -| `scheduler_cleanup_interval` | `int` | `1200` | Clean up old completed jobs every N poll cycles. | -| `namespace` | `str \| None` | `None` | Namespace for multi-tenant isolation. Jobs enqueued on this queue carry this namespace; workers only dequeue matching jobs. `None` means no namespace (default). | - -## Task Registration - -### `@queue.task()` - -```python -@queue.task( - name: str | None = None, - max_retries: int = 3, - retry_backoff: float = 1.0, - retry_delays: list[float] | None = None, - max_retry_delay: int | None = None, - timeout: int = 300, - soft_timeout: float | None = None, - priority: int = 0, - rate_limit: str | None = None, - queue: str = "default", - circuit_breaker: dict | None = None, - middleware: list[TaskMiddleware] | None = None, - expires: float | None = None, - inject: list[str] | None = None, - serializer: Serializer | None = None, - max_concurrent: int | None = None, -) -> TaskWrapper -``` - -Register a function as a background task. Returns a [`TaskWrapper`](task.md). 
- -| Parameter | Type | Default | Description | -|---|---|---|---| -| `name` | `str \| None` | Auto-generated | Explicit task name. Defaults to `module.qualname`. | -| `max_retries` | `int` | `3` | Max retry attempts before moving to DLQ. | -| `retry_backoff` | `float` | `1.0` | Base delay in seconds for exponential backoff. | -| `retry_delays` | `list[float] \| None` | `None` | Per-attempt delays in seconds, overrides backoff. e.g. `[1, 5, 30]`. | -| `max_retry_delay` | `int \| None` | `None` | Cap on backoff delay in seconds. Defaults to 300 s. | -| `timeout` | `int` | `300` | Hard execution time limit in seconds. | -| `soft_timeout` | `float \| None` | `None` | Cooperative time limit checked via `current_job.check_timeout()`. | -| `priority` | `int` | `0` | Default priority (higher = more urgent). | -| `rate_limit` | `str \| None` | `None` | Rate limit string, e.g. `"100/m"`. | -| `queue` | `str` | `"default"` | Named queue to submit to. | -| `circuit_breaker` | `dict \| None` | `None` | Circuit breaker config: `{"threshold": 5, "window": 60, "cooldown": 120}`. | -| `middleware` | `list[TaskMiddleware] \| None` | `None` | Per-task middleware, applied in addition to queue-level middleware. | -| `expires` | `float \| None` | `None` | Seconds until the job expires if not started. | -| `inject` | `list[str] \| None` | `None` | Resource names to inject as keyword arguments. See [Resource System](../guide/resources.md). | -| `serializer` | `Serializer \| None` | `None` | Per-task serializer override. Falls back to queue-level serializer. | -| `max_concurrent` | `int \| None` | `None` | Max concurrent running instances. `None` = no limit. | - -### `@queue.periodic()` - -```python -@queue.periodic( - cron: str, - name: str | None = None, - args: tuple = (), - kwargs: dict | None = None, - queue: str = "default", - timezone: str | None = None, -) -> TaskWrapper -``` - -Register a periodic (cron-scheduled) task. Uses 6-field cron expressions with seconds. 
- -| Parameter | Type | Default | Description | -|---|---|---|---| -| `cron` | `str` | — | 6-field cron expression (seconds precision). | -| `name` | `str \| None` | Auto-generated | Explicit task name. | -| `args` | `tuple` | `()` | Positional arguments passed to the task on each run. | -| `kwargs` | `dict \| None` | `None` | Keyword arguments passed to the task on each run. | -| `queue` | `str` | `"default"` | Named queue to submit to. | -| `timezone` | `str \| None` | `None` | IANA timezone name (e.g. `"America/New_York"`). Defaults to UTC. | - -## Enqueue Methods - -### `queue.enqueue()` - -```python -queue.enqueue( - task_name: str, - args: tuple = (), - kwargs: dict | None = None, - priority: int | None = None, - delay: float | None = None, - queue: str | None = None, - max_retries: int | None = None, - timeout: int | None = None, - unique_key: str | None = None, - metadata: str | None = None, - depends_on: str | list[str] | None = None, -) -> JobResult -``` - -Enqueue a task for execution. Returns a [`JobResult`](result.md) handle. - -| Parameter | Type | Default | Description | -|---|---|---|---| -| `depends_on` | `str \| list[str] \| None` | `None` | Job ID(s) this job depends on. See [Dependencies](../guide/dependencies.md). | - -### `queue.enqueue_many()` - -```python -queue.enqueue_many( - task_name: str, - args_list: list[tuple], - kwargs_list: list[dict] | None = None, - priority: int | None = None, - queue: str | None = None, - max_retries: int | None = None, - timeout: int | None = None, - delay: float | None = None, - delay_list: list[float | None] | None = None, - unique_keys: list[str | None] | None = None, - metadata: str | None = None, - metadata_list: list[str | None] | None = None, - expires: float | None = None, - expires_list: list[float | None] | None = None, - result_ttl: int | None = None, - result_ttl_list: list[int | None] | None = None, -) -> list[JobResult] -``` - -Enqueue multiple jobs in a single transaction for high throughput. 
Supports both uniform parameters (applied to all jobs) and per-job lists. - -| Parameter | Type | Default | Description | -|---|---|---|---| -| `delay` | `float \| None` | `None` | Uniform delay in seconds for all jobs | -| `delay_list` | `list[float \| None] \| None` | `None` | Per-job delays in seconds | -| `unique_keys` | `list[str \| None] \| None` | `None` | Per-job deduplication keys | -| `metadata` | `str \| None` | `None` | Uniform metadata JSON for all jobs | -| `metadata_list` | `list[str \| None] \| None` | `None` | Per-job metadata JSON | -| `expires` | `float \| None` | `None` | Uniform expiry in seconds for all jobs | -| `expires_list` | `list[float \| None] \| None` | `None` | Per-job expiry in seconds | -| `result_ttl` | `int \| None` | `None` | Uniform result TTL in seconds | -| `result_ttl_list` | `list[int \| None] \| None` | `None` | Per-job result TTL in seconds | - -Per-job lists (`*_list`) take precedence over uniform values when both are provided. - -## Job Management - -### `queue.get_job()` - -```python -queue.get_job(job_id: str) -> JobResult | None -``` - -Retrieve a job by ID. Returns `None` if not found. - -### `queue.list_jobs()` - -```python -queue.list_jobs( - status: str | None = None, - queue: str | None = None, - task_name: str | None = None, - limit: int = 50, - offset: int = 0, - namespace: str | None = _UNSET, -) -> list[JobResult] -``` - -List jobs with optional filters. Returns newest first. Defaults to the queue's namespace — pass `namespace=None` to see all namespaces. 
- -| Parameter | Type | Default | Description | -|---|---|---|---| -| `status` | `str \| None` | `None` | Filter by status: `pending`, `running`, `completed`, `failed`, `dead`, `cancelled` | -| `queue` | `str \| None` | `None` | Filter by queue name | -| `task_name` | `str \| None` | `None` | Filter by task name | -| `limit` | `int` | `50` | Maximum results to return | -| `offset` | `int` | `0` | Pagination offset | - -### `queue.list_jobs_filtered()` - -```python -queue.list_jobs_filtered( - status: str | None = None, - queue: str | None = None, - task_name: str | None = None, - metadata_like: str | None = None, - error_like: str | None = None, - created_after: float | None = None, - created_before: float | None = None, - limit: int = 50, - offset: int = 0, - namespace: str | None = _UNSET, -) -> list[JobResult] -``` - -Extended filtering with metadata and error pattern matching and time range constraints. Defaults to the queue's namespace — pass `namespace=None` to see all namespaces. - -### `queue.cancel_job()` - -```python -queue.cancel_job(job_id: str) -> bool -``` - -Cancel a pending job. Returns `True` if cancelled, `False` if not pending. Cascade-cancels dependents. - -### `queue.update_progress()` - -```python -queue.update_progress(job_id: str, progress: int) -> None -``` - -Update progress for a running job (0–100). - -### `queue.job_errors()` - -```python -queue.job_errors(job_id: str) -> list[dict] -``` - -Get error history for a job. Returns a list of dicts with `id`, `job_id`, `attempt`, `error`, `failed_at`. - -### `queue.job_dag()` - -```python -queue.job_dag(job_id: str) -> dict -``` - -Return a dependency graph for a job, including all ancestors and descendants. Useful for visualizing workflow chains. - -## Queue Management - -### `queue.set_queue_rate_limit()` - -```python -queue.set_queue_rate_limit(queue_name: str, rate_limit: str) -> None -``` - -Set a rate limit for all jobs in a queue. Checked by the scheduler before per-task rate limits. 
- -| Parameter | Type | Description | -|---|---|---| -| `queue_name` | `str` | Queue name (e.g. `"default"`). | -| `rate_limit` | `str` | Rate limit string: `"N/s"`, `"N/m"`, or `"N/h"`. | - -### `queue.set_queue_concurrency()` - -```python -queue.set_queue_concurrency(queue_name: str, max_concurrent: int) -> None -``` - -Set a maximum number of concurrently running jobs for a queue across all workers. Checked by the scheduler before per-task `max_concurrent` limits. - -| Parameter | Type | Description | -|---|---|---| -| `queue_name` | `str` | Queue name (e.g. `"default"`). | -| `max_concurrent` | `int` | Maximum simultaneous running jobs from this queue. | - -### `queue.pause()` - -```python -queue.pause(queue_name: str) -> None -``` - -Pause a named queue. Workers continue running but skip jobs in this queue until it is resumed. - -### `queue.resume()` - -```python -queue.resume(queue_name: str) -> None -``` - -Resume a previously paused queue. - -### `queue.paused_queues()` - -```python -queue.paused_queues() -> list[str] -``` - -Return the names of all currently paused queues. - -### `queue.purge()` - -```python -queue.purge( - queue: str | None = None, - task_name: str | None = None, - status: str | None = None, -) -> int -``` - -Delete jobs matching the given filters. Returns the count deleted. - -### `queue.revoke_task()` - -```python -queue.revoke_task(task_name: str) -> None -``` - -Prevent all future enqueues of the given task name. Existing pending jobs are not affected. - -## Archival - -### `queue.archive()` - -```python -queue.archive(job_id: str) -> None -``` - -Move a completed or failed job to the archive for long-term retention. - -### `queue.list_archived()` - -```python -queue.list_archived( - task_name: str | None = None, - limit: int = 50, - offset: int = 0, -) -> list[dict] -``` - -List archived jobs with optional task name filter. 
- -## Replay - -### `queue.replay()` - -```python -queue.replay(job_id: str) -> JobResult -``` - -Re-enqueue a completed or failed job with its original arguments. Returns the new job handle. - -### `queue.replay_history()` - -```python -queue.replay_history(job_id: str) -> list[dict] -``` - -Return the replay log for a job — every time it has been replayed and the resulting new job IDs. - -## Statistics - -### `queue.stats()` - -```python -queue.stats() -> dict[str, int] -``` - -Returns `{"pending": N, "running": N, "completed": N, "failed": N, "dead": N, "cancelled": N}`. - -### `queue.stats_by_queue()` - -```python -queue.stats_by_queue() -> dict[str, dict[str, int]] -``` - -Returns per-queue status counts: `{queue_name: {"pending": N, ...}}`. - -### `queue.stats_all_queues()` - -```python -queue.stats_all_queues() -> dict[str, dict[str, int]] -``` - -Returns stats for all queues including those with zero jobs. - -### `queue.metrics()` - -```python -queue.metrics() -> dict -``` - -Returns current throughput and latency snapshot. - -### `queue.metrics_timeseries()` - -```python -queue.metrics_timeseries( - window: int = 3600, - bucket: int = 60, -) -> list[dict] -``` - -Returns historical metrics bucketed by time. `window` is the lookback period in seconds; `bucket` is the bucket size in seconds. - -## Dead Letter Queue - -### `queue.dead_letters()` - -```python -queue.dead_letters(limit: int = 10, offset: int = 0) -> list[dict] -``` - -List dead letter entries. Each dict contains: `id`, `original_job_id`, `queue`, `task_name`, `error`, `retry_count`, `failed_at`, `metadata`. - -### `queue.retry_dead()` - -```python -queue.retry_dead(dead_id: str) -> str -``` - -Re-enqueue a dead letter job. Returns the new job ID. - -### `queue.purge_dead()` - -```python -queue.purge_dead(older_than: int = 86400) -> int -``` - -Purge dead letter entries older than `older_than` seconds. Returns count deleted. 
- -## Cleanup - -### `queue.purge_completed()` - -```python -queue.purge_completed(older_than: int = 86400) -> int -``` - -Purge completed jobs older than `older_than` seconds. Returns count deleted. - -## Distributed Locking - -### `queue.lock()` - -```python -queue.lock( - name: str, - ttl: int = 30, - auto_extend: bool = True, - owner_id: str | None = None, - timeout: float | None = None, - retry_interval: float = 0.1, -) -> contextlib.AbstractContextManager -``` - -Acquire a distributed lock. Use as a context manager: - -```python -with queue.lock("my-resource", ttl=60): - # exclusive section - ... -``` - -Raises `LockNotAcquired` if acquisition fails (when `timeout` is `None` or expires). - -### `queue.alock()` - -```python -queue.alock( - name: str, - ttl: float = 30.0, - auto_extend: bool = True, - owner_id: str | None = None, - timeout: float | None = None, - retry_interval: float = 0.1, -) -> AsyncDistributedLock -``` - -Async context manager version of `lock()`. Returns an `AsyncDistributedLock` directly — use `async with`, not `await`: - -```python -async with queue.alock("my-resource"): - ... -``` - -## Workers - -### `queue.run_worker()` - -```python -queue.run_worker( - queues: Sequence[str] | None = None, - tags: list[str] | None = None, - pool: str = "thread", - app: str | None = None, -) -> None -``` - -Start the worker loop. **Blocks** until interrupted. - -| Parameter | Type | Default | Description | -|---|---|---|---| -| `queues` | `Sequence[str] \| None` | `None` | Queue names to consume from. `None` = all. | -| `tags` | `list[str] \| None` | `None` | Tags for worker specialization / routing. | -| `pool` | `str` | `"thread"` | Worker pool type: `"thread"` or `"prefork"`. | -| `app` | `str \| None` | `None` | Import path to Queue (e.g. `"myapp:queue"`). Required when `pool="prefork"`. | - -### `queue.workers()` - -```python -queue.workers() -> list[dict] -``` - -Return live state of all registered workers. 
Each dict contains: - -| Key | Type | Description | -|-----|------|-------------| -| `worker_id` | `str` | Unique worker ID | -| `hostname` | `str` | OS hostname | -| `pid` | `int` | Process ID | -| `status` | `str` | `"active"` or `"draining"` | -| `pool_type` | `str` | `"thread"`, `"prefork"`, or `"native-async"` | -| `started_at` | `int` | Registration timestamp (ms since epoch) | -| `last_heartbeat` | `int` | Last heartbeat timestamp (ms) | -| `queues` | `str` | Comma-separated queue names | -| `threads` | `int` | Worker thread/process count | -| `tags` | `str \| None` | Worker specialization tags | -| `resources` | `str \| None` | Registered resource names (JSON) | -| `resource_health` | `str \| None` | Per-resource health status (JSON) | - -### `await queue.aworkers()` - -```python -await queue.aworkers() -> list[dict] -``` - -Async version of `workers()`. - -## Circuit Breakers - -### `queue.circuit_breakers()` - -```python -queue.circuit_breakers() -> list[dict] -``` - -Return current state of all circuit breakers: task name, state (`closed`/`open`/`half-open`), failure count, last failure time. - -## Resource System - -### `@queue.worker_resource()` - -```python -@queue.worker_resource( - name: str, - depends_on: list[str] | None = None, - teardown: Callable | None = None, - health_check: Callable | None = None, - health_check_interval: float = 0.0, - max_recreation_attempts: int = 3, - scope: str = "worker", - pool_size: int | None = None, - pool_min: int = 0, - acquire_timeout: float = 10.0, - max_lifetime: float = 3600.0, - idle_timeout: float = 300.0, - reloadable: bool = False, - frozen: bool = False, -) -> Callable -``` - -Decorator to register a resource factory initialized at worker startup. - -| Parameter | Type | Default | Description | -|---|---|---|---| -| `name` | `str` | — | Resource name used in `inject=["name"]` or `Inject["name"]`. | -| `depends_on` | `list[str] \| None` | `None` | Names of resources this one depends on. 
| -| `teardown` | `Callable \| None` | `None` | Called with the resource instance on shutdown. | -| `health_check` | `Callable \| None` | `None` | Called periodically; returns truthy if healthy. | -| `health_check_interval` | `float` | `0.0` | Seconds between health checks (0 = disabled). | -| `max_recreation_attempts` | `int` | `3` | Max times to recreate on health failure. | -| `scope` | `str` | `"worker"` | Lifetime scope: `"worker"`, `"task"`, `"thread"`, or `"request"`. | -| `pool_size` | `int \| None` | `None` | Pool capacity for task-scoped resources (default = worker thread count). | -| `pool_min` | `int` | `0` | Pre-warmed instances for task-scoped resources. | -| `acquire_timeout` | `float` | `10.0` | Max seconds to wait for a pool instance. | -| `max_lifetime` | `float` | `3600.0` | Max seconds a pooled instance can live. | -| `idle_timeout` | `float` | `300.0` | Max idle seconds before pool eviction. | -| `reloadable` | `bool` | `False` | Allow hot-reload via SIGHUP. | -| `frozen` | `bool` | `False` | Wrap in a read-only proxy that blocks attribute writes. | - -### `queue.register_resource()` - -```python -queue.register_resource(definition: ResourceDefinition) -> None -``` - -Programmatically register a `ResourceDefinition`. Equivalent to `@queue.worker_resource()` but accepts a pre-built definition object. - -### `queue.load_resources()` - -```python -queue.load_resources(toml_path: str) -> None -``` - -Load resource definitions from a TOML file. Must be called before `run_worker()`. See [TOML Configuration](../guide/resources.md#toml-configuration). - -### `queue.health_check()` - -```python -queue.health_check(name: str) -> bool -``` - -Run a resource's health check immediately. Returns `True` if healthy, `False` otherwise or if the runtime is not initialized. - -### `queue.resource_status()` - -```python -queue.resource_status() -> list[dict] -``` - -Return per-resource status. 
Each entry contains: `name`, `scope`, `health`, `init_duration_ms`, `recreations`, `depends_on`. Task-scoped entries also include `pool` stats. - -### `queue.register_type()` - -```python -queue.register_type( - python_type: type, - strategy: str, - *, - resource: str | None = None, - message: str | None = None, - converter: Callable | None = None, - type_key: str | None = None, - proxy_handler: str | None = None, -) -> None -``` - -Register a custom type with the interception system. Requires `interception="strict"` or `"lenient"`. - -| Parameter | Type | Description | -|---|---|---| -| `python_type` | `type` | The type to register. | -| `strategy` | `str` | `"pass"`, `"convert"`, `"redirect"`, `"reject"`, or `"proxy"`. | -| `resource` | `str \| None` | Resource name for `"redirect"` strategy. | -| `message` | `str \| None` | Rejection reason for `"reject"` strategy. | -| `converter` | `Callable \| None` | Converter callable for `"convert"` strategy. | -| `type_key` | `str \| None` | Dispatch key for the converter reconstructor. | -| `proxy_handler` | `str \| None` | Handler name for `"proxy"` strategy. | - -### `queue.interception_stats()` - -```python -queue.interception_stats() -> dict -``` - -Return interception metrics: total call count, per-strategy counts, average duration in ms, max depth reached. Returns an empty dict if interception is disabled. - -### `queue.proxy_stats()` - -```python -queue.proxy_stats() -> list[dict] -``` - -Return per-handler proxy metrics: handler name, deconstruction count, reconstruction count, error count, average reconstruction time in ms. - -## Logs - -### `queue.task_logs()` - -```python -queue.task_logs(job_id: str, limit: int = 100) -> list[dict] -``` - -Return structured log entries emitted by `current_job.log()` during the given job's execution. 
- -### `queue.query_logs()` - -```python -queue.query_logs( - task_name: str | None = None, - level: str | None = None, - message_like: str | None = None, - since: float | None = None, - limit: int = 100, - offset: int = 0, -) -> list[dict] -``` - -Query task logs across all jobs with optional filters. - -## Events & Webhooks - -### `queue.on_event()` - -```python -queue.on_event(event: str) -> Callable -``` - -Register a callback for a queue event. Supported events: `job.completed`, `job.failed`, `job.retried`, `job.dead`. - -```python -@queue.on_event("job.failed") -def handle_failure(job_id: str, task_name: str, error: str) -> None: - ... -``` - -### `queue.add_webhook()` - -```python -queue.add_webhook( - url: str, - events: list[EventType] | None = None, - headers: dict[str, str] | None = None, - secret: str | None = None, - max_retries: int = 3, - timeout: float = 10.0, - retry_backoff: float = 2.0, -) -> None -``` - -Register a webhook URL for one or more events. 4xx responses are not retried; 5xx responses are retried with exponential backoff. - -| Parameter | Type | Default | Description | -|---|---|---|---| -| `url` | `str` | — | URL to POST to. Must be `http://` or `https://`. | -| `events` | `list[EventType] \| None` | `None` | Event types to subscribe to. `None` means all events. | -| `headers` | `dict[str, str] \| None` | `None` | Extra HTTP headers to include. | -| `secret` | `str \| None` | `None` | HMAC-SHA256 signing secret for `X-Taskito-Signature`. | -| `max_retries` | `int` | `3` | Maximum delivery attempts. | -| `timeout` | `float` | `10.0` | HTTP request timeout in seconds. | -| `retry_backoff` | `float` | `2.0` | Base for exponential backoff between retries. | - -## Worker - -### `queue.arun_worker()` - -```python -await queue.arun_worker( - queues: Sequence[str] | None = None, - tags: list[str] | None = None, -) -> None -``` - -Async version of `run_worker()`. Runs the worker in a thread pool without blocking the event loop. 
- -## Hooks - -### `@queue.before_task` - -```python -@queue.before_task -def hook(task_name: str, args: tuple, kwargs: dict) -> None: ... -``` - -### `@queue.after_task` - -```python -@queue.after_task -def hook(task_name: str, args: tuple, kwargs: dict, result: Any, error: Exception | None) -> None: ... -``` - -### `@queue.on_success` - -```python -@queue.on_success -def hook(task_name: str, args: tuple, kwargs: dict, result: Any) -> None: ... -``` - -### `@queue.on_failure` - -```python -@queue.on_failure -def hook(task_name: str, args: tuple, kwargs: dict, error: Exception) -> None: ... -``` - -## Async Methods - -| Sync | Async | -|---|---| -| `queue.stats()` | `await queue.astats()` | -| `queue.stats_by_queue()` | `await queue.astats_by_queue()` | -| `queue.stats_all_queues()` | `await queue.astats_all_queues()` | -| `queue.dead_letters()` | `await queue.adead_letters()` | -| `queue.retry_dead()` | `await queue.aretry_dead()` | -| `queue.cancel_job()` | `await queue.acancel_job()` | -| `queue.run_worker()` | `await queue.arun_worker()` | -| `queue.metrics()` | `await queue.ametrics()` | -| `queue.workers()` | `await queue.aworkers()` | -| `queue.circuit_breakers()` | `await queue.acircuit_breakers()` | -| `queue.replay()` | `await queue.areplay()` | -| `queue.lock()` | `queue.alock()` (async context manager, not a coroutine) | -| `queue.resource_status()` | `await queue.aresource_status()` | diff --git a/docs/api/queue/events.md b/docs/api/queue/events.md new file mode 100644 index 0000000..67bf0b3 --- /dev/null +++ b/docs/api/queue/events.md @@ -0,0 +1,70 @@ +# Queue — Events & Logs + +Methods for event callbacks, webhook registration, and structured task logging. + +## Events & Webhooks + +### `queue.on_event()` + +```python +queue.on_event(event: str) -> Callable +``` + +Register a callback for a queue event. Supported events: `job.completed`, `job.failed`, `job.retried`, `job.dead`. 
```python
@queue.on_event("job.failed")
def handle_failure(job_id: str, task_name: str, error: str) -> None:
    ...
```

### `queue.add_webhook()`

```python
queue.add_webhook(
    url: str,
    events: list[EventType] | None = None,
    headers: dict[str, str] | None = None,
    secret: str | None = None,
    max_retries: int = 3,
    timeout: float = 10.0,
    retry_backoff: float = 2.0,
) -> None
```

Register a webhook URL for one or more events. 4xx responses are not retried; 5xx responses are retried with exponential backoff.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `url` | `str` | — | URL to POST to. Must be `http://` or `https://`. |
| `events` | `list[EventType] \| None` | `None` | Event types to subscribe to. `None` means all events. |
| `headers` | `dict[str, str] \| None` | `None` | Extra HTTP headers to include. |
| `secret` | `str \| None` | `None` | HMAC-SHA256 signing secret for `X-Taskito-Signature`. |
| `max_retries` | `int` | `3` | Maximum delivery attempts. |
| `timeout` | `float` | `10.0` | HTTP request timeout in seconds. |
| `retry_backoff` | `float` | `2.0` | Base for exponential backoff between retries. |

## Logs

### `queue.task_logs()`

```python
queue.task_logs(job_id: str, limit: int = 100) -> list[dict]
```

Return structured log entries emitted by `current_job.log()` during the given job's execution.

### `queue.query_logs()`

```python
queue.query_logs(
    task_name: str | None = None,
    level: str | None = None,
    message_like: str | None = None,
    since: float | None = None,
    limit: int = 100,
    offset: int = 0,
) -> list[dict]
```

Query task logs across all jobs with optional filters. diff --git a/docs/api/queue/index.md b/docs/api/queue/index.md new file mode 100644 index 0000000..f6779a4 --- /dev/null +++ b/docs/api/queue/index.md @@ -0,0 +1,199 @@ +# Queue + +::: taskito.app.Queue + +The central class for creating and managing a task queue. + +!!! 
tip "Sub-pages"
    The Queue API is split across several pages for readability:

    - **[Job Management](jobs.md)** — get, list, cancel, archive, replay jobs
    - **[Queue & Stats](queues.md)** — rate limits, concurrency, pause/resume, statistics, dead letters
    - **[Workers & Hooks](workers.md)** — run workers, lifecycle hooks, circuit breakers, async methods
    - **[Resources & Locking](resources.md)** — resource system, distributed locks
    - **[Events & Logs](events.md)** — event callbacks, webhooks, structured logs

## Constructor

```python
Queue(
    db_path: str = ".taskito/taskito.db",
    workers: int = 0,
    default_retry: int = 3,
    default_timeout: int = 300,
    default_priority: int = 0,
    result_ttl: int | None = None,
    middleware: list[TaskMiddleware] | None = None,
    drain_timeout: int = 30,
    interception: str = "off",
    max_intercept_depth: int = 10,
    recipe_signing_key: str | None = None,
    max_reconstruction_timeout: int = 10,
    file_path_allowlist: list[str] | None = None,
    disabled_proxies: list[str] | None = None,
    async_concurrency: int = 100,
    event_workers: int = 4,
    scheduler_poll_interval_ms: int = 50,
    scheduler_reap_interval: int = 100,
    scheduler_cleanup_interval: int = 1200,
    namespace: str | None = None,
)
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `db_path` | `str` | `".taskito/taskito.db"` | Path to SQLite database file. Parent directories are created automatically. |
| `workers` | `int` | `0` | Number of worker threads (`0` = auto-detect CPU count) |
| `default_retry` | `int` | `3` | Default max retry attempts for tasks |
| `default_timeout` | `int` | `300` | Default task timeout in seconds |
| `default_priority` | `int` | `0` | Default task priority (higher = more urgent) |
| `result_ttl` | `int \| None` | `None` | Auto-cleanup completed/dead jobs older than this many seconds. `None` disables. 
| +| `middleware` | `list[TaskMiddleware] | None` | `None` | Queue-level middleware applied to all tasks. | +| `drain_timeout` | `int` | `30` | Seconds to wait for in-flight tasks during graceful shutdown. | +| `interception` | `str` | `"off"` | Argument interception mode: `"strict"`, `"lenient"`, or `"off"`. See [Resource System](../../resources/interception.md). | +| `max_intercept_depth` | `int` | `10` | Max recursion depth for argument walking. | +| `recipe_signing_key` | `str | None` | `None` | HMAC-SHA256 key for proxy recipe integrity. Falls back to `TASKITO_RECIPE_SECRET` env var. | +| `max_reconstruction_timeout` | `int` | `10` | Max seconds allowed for proxy reconstruction. | +| `file_path_allowlist` | `list[str] | None` | `None` | Allowed file path prefixes for the file proxy handler. | +| `disabled_proxies` | `list[str] | None` | `None` | Handler names to skip when registering built-in proxy handlers. | +| `async_concurrency` | `int` | `100` | Maximum number of `async def` tasks running concurrently on the native async executor. | +| `event_workers` | `int` | `4` | Thread pool size for the event bus. Increase for high event volume. | +| `scheduler_poll_interval_ms` | `int` | `50` | Milliseconds between scheduler poll cycles. Lower values improve scheduling precision at the cost of CPU. | +| `scheduler_reap_interval` | `int` | `100` | Reap stale/timed-out jobs every N poll cycles. | +| `scheduler_cleanup_interval` | `int` | `1200` | Clean up old completed jobs every N poll cycles. | +| `namespace` | `str | None` | `None` | Namespace for multi-tenant isolation. Jobs enqueued on this queue carry this namespace; workers only dequeue matching jobs. `None` means no namespace (default). 
| +| `expires` | `float | None` | `None` | Seconds until the job expires if not started. | +| `inject` | `list[str] | None` | `None` | Resource names to inject as keyword arguments. See [Resource System](../../resources/index.md). | +| `serializer` | `Serializer | None` | `None` | Per-task serializer override. Falls back to queue-level serializer. | +| `max_concurrent` | `int | None` | `None` | Max concurrent running instances. `None` = no limit. | + +### `@queue.periodic()` + +```python +@queue.periodic( + cron: str, + name: str | None = None, + args: tuple = (), + kwargs: dict | None = None, + queue: str = "default", + timezone: str | None = None, +) -> TaskWrapper +``` + +Register a periodic (cron-scheduled) task. Uses 6-field cron expressions with seconds. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `cron` | `str` | — | 6-field cron expression (seconds precision). | +| `name` | `str | None` | Auto-generated | Explicit task name. | +| `args` | `tuple` | `()` | Positional arguments passed to the task on each run. | +| `kwargs` | `dict | None` | `None` | Keyword arguments passed to the task on each run. | +| `queue` | `str` | `"default"` | Named queue to submit to. | +| `timezone` | `str | None` | `None` | IANA timezone name (e.g. `"America/New_York"`). Defaults to UTC. | + +## Enqueue Methods + +### `queue.enqueue()` + +```python +queue.enqueue( + task_name: str, + args: tuple = (), + kwargs: dict | None = None, + priority: int | None = None, + delay: float | None = None, + queue: str | None = None, + max_retries: int | None = None, + timeout: int | None = None, + unique_key: str | None = None, + metadata: str | None = None, + depends_on: str | list[str] | None = None, +) -> JobResult +``` + +Enqueue a task for execution. Returns a [`JobResult`](../result.md) handle. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `depends_on` | `str | list[str] | None` | `None` | Job ID(s) this job depends on. 
See [Dependencies](../../guide/execution/dependencies.md). | + +### `queue.enqueue_many()` + +```python +queue.enqueue_many( + task_name: str, + args_list: list[tuple], + kwargs_list: list[dict] | None = None, + priority: int | None = None, + queue: str | None = None, + max_retries: int | None = None, + timeout: int | None = None, + delay: float | None = None, + delay_list: list[float | None] | None = None, + unique_keys: list[str | None] | None = None, + metadata: str | None = None, + metadata_list: list[str | None] | None = None, + expires: float | None = None, + expires_list: list[float | None] | None = None, + result_ttl: int | None = None, + result_ttl_list: list[int | None] | None = None, +) -> list[JobResult] +``` + +Enqueue multiple jobs in a single transaction for high throughput. Supports both uniform parameters (applied to all jobs) and per-job lists. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `delay` | `float | None` | `None` | Uniform delay in seconds for all jobs | +| `delay_list` | `list[float | None] | None` | `None` | Per-job delays in seconds | +| `unique_keys` | `list[str | None] | None` | `None` | Per-job deduplication keys | +| `metadata` | `str | None` | `None` | Uniform metadata JSON for all jobs | +| `metadata_list` | `list[str | None] | None` | `None` | Per-job metadata JSON | +| `expires` | `float | None` | `None` | Uniform expiry in seconds for all jobs | +| `expires_list` | `list[float | None] | None` | `None` | Per-job expiry in seconds | +| `result_ttl` | `int | None` | `None` | Uniform result TTL in seconds | +| `result_ttl_list` | `list[int | None] | None` | `None` | Per-job result TTL in seconds | + +Per-job lists (`*_list`) take precedence over uniform values when both are provided. 
diff --git a/docs/api/queue/jobs.md b/docs/api/queue/jobs.md new file mode 100644 index 0000000..3649642 --- /dev/null +++ b/docs/api/queue/jobs.md @@ -0,0 +1,129 @@ +# Queue — Job Management + +Methods for retrieving, filtering, cancelling, archiving, and replaying jobs. + +## Job Retrieval + +### `queue.get_job()` + +```python +queue.get_job(job_id: str) -> JobResult | None +``` + +Retrieve a job by ID. Returns `None` if not found. + +### `queue.list_jobs()` + +```python +queue.list_jobs( + status: str | None = None, + queue: str | None = None, + task_name: str | None = None, + limit: int = 50, + offset: int = 0, + namespace: str | None = _UNSET, +) -> list[JobResult] +``` + +List jobs with optional filters. Returns newest first. Defaults to the queue's namespace — pass `namespace=None` to see all namespaces. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `status` | `str | None` | `None` | Filter by status: `pending`, `running`, `completed`, `failed`, `dead`, `cancelled` | +| `queue` | `str | None` | `None` | Filter by queue name | +| `task_name` | `str | None` | `None` | Filter by task name | +| `limit` | `int` | `50` | Maximum results to return | +| `offset` | `int` | `0` | Pagination offset | + +### `queue.list_jobs_filtered()` + +```python +queue.list_jobs_filtered( + status: str | None = None, + queue: str | None = None, + task_name: str | None = None, + metadata_like: str | None = None, + error_like: str | None = None, + created_after: float | None = None, + created_before: float | None = None, + limit: int = 50, + offset: int = 0, + namespace: str | None = _UNSET, +) -> list[JobResult] +``` + +Extended filtering with metadata and error pattern matching and time range constraints. Defaults to the queue's namespace — pass `namespace=None` to see all namespaces. + +## Job Operations + +### `queue.cancel_job()` + +```python +queue.cancel_job(job_id: str) -> bool +``` + +Cancel a pending job. 
Returns `True` if cancelled, `False` if not pending. Cascade-cancels dependents. + +### `queue.update_progress()` + +```python +queue.update_progress(job_id: str, progress: int) -> None +``` + +Update progress for a running job (0–100). + +### `queue.job_errors()` + +```python +queue.job_errors(job_id: str) -> list[dict] +``` + +Get error history for a job. Returns a list of dicts with `id`, `job_id`, `attempt`, `error`, `failed_at`. + +### `queue.job_dag()` + +```python +queue.job_dag(job_id: str) -> dict +``` + +Return a dependency graph for a job, including all ancestors and descendants. Useful for visualizing workflow chains. + +## Archival + +### `queue.archive()` + +```python +queue.archive(job_id: str) -> None +``` + +Move a completed or failed job to the archive for long-term retention. + +### `queue.list_archived()` + +```python +queue.list_archived( + task_name: str | None = None, + limit: int = 50, + offset: int = 0, +) -> list[dict] +``` + +List archived jobs with optional task name filter. + +## Replay + +### `queue.replay()` + +```python +queue.replay(job_id: str) -> JobResult +``` + +Re-enqueue a completed or failed job with its original arguments. Returns the new job handle. + +### `queue.replay_history()` + +```python +queue.replay_history(job_id: str) -> list[dict] +``` + +Return the replay log for a job — every time it has been replayed and the resulting new job IDs. diff --git a/docs/api/queue/queues.md b/docs/api/queue/queues.md new file mode 100644 index 0000000..ece6397 --- /dev/null +++ b/docs/api/queue/queues.md @@ -0,0 +1,156 @@ +# Queue — Queue & Stats + +Methods for managing queues, collecting statistics, and handling dead letters. + +## Queue Management + +### `queue.set_queue_rate_limit()` + +```python +queue.set_queue_rate_limit(queue_name: str, rate_limit: str) -> None +``` + +Set a rate limit for all jobs in a queue. Checked by the scheduler before per-task rate limits. 
+ +| Parameter | Type | Description | +|---|---|---| +| `queue_name` | `str` | Queue name (e.g. `"default"`). | +| `rate_limit` | `str` | Rate limit string: `"N/s"`, `"N/m"`, or `"N/h"`. | + +### `queue.set_queue_concurrency()` + +```python +queue.set_queue_concurrency(queue_name: str, max_concurrent: int) -> None +``` + +Set a maximum number of concurrently running jobs for a queue across all workers. Checked by the scheduler before per-task `max_concurrent` limits. + +| Parameter | Type | Description | +|---|---|---| +| `queue_name` | `str` | Queue name (e.g. `"default"`). | +| `max_concurrent` | `int` | Maximum simultaneous running jobs from this queue. | + +### `queue.pause()` + +```python +queue.pause(queue_name: str) -> None +``` + +Pause a named queue. Workers continue running but skip jobs in this queue until it is resumed. + +### `queue.resume()` + +```python +queue.resume(queue_name: str) -> None +``` + +Resume a previously paused queue. + +### `queue.paused_queues()` + +```python +queue.paused_queues() -> list[str] +``` + +Return the names of all currently paused queues. + +### `queue.purge()` + +```python +queue.purge( + queue: str | None = None, + task_name: str | None = None, + status: str | None = None, +) -> int +``` + +Delete jobs matching the given filters. Returns the count deleted. + +### `queue.revoke_task()` + +```python +queue.revoke_task(task_name: str) -> None +``` + +Prevent all future enqueues of the given task name. Existing pending jobs are not affected. + +## Statistics + +### `queue.stats()` + +```python +queue.stats() -> dict[str, int] +``` + +Returns `{"pending": N, "running": N, "completed": N, "failed": N, "dead": N, "cancelled": N}`. + +### `queue.stats_by_queue()` + +```python +queue.stats_by_queue() -> dict[str, dict[str, int]] +``` + +Returns per-queue status counts: `{queue_name: {"pending": N, ...}}`. 
+ +### `queue.stats_all_queues()` + +```python +queue.stats_all_queues() -> dict[str, dict[str, int]] +``` + +Returns stats for all queues including those with zero jobs. + +### `queue.metrics()` + +```python +queue.metrics() -> dict +``` + +Returns current throughput and latency snapshot. + +### `queue.metrics_timeseries()` + +```python +queue.metrics_timeseries( + window: int = 3600, + bucket: int = 60, +) -> list[dict] +``` + +Returns historical metrics bucketed by time. `window` is the lookback period in seconds; `bucket` is the bucket size in seconds. + +## Dead Letter Queue + +### `queue.dead_letters()` + +```python +queue.dead_letters(limit: int = 10, offset: int = 0) -> list[dict] +``` + +List dead letter entries. Each dict contains: `id`, `original_job_id`, `queue`, `task_name`, `error`, `retry_count`, `failed_at`, `metadata`. + +### `queue.retry_dead()` + +```python +queue.retry_dead(dead_id: str) -> str +``` + +Re-enqueue a dead letter job. Returns the new job ID. + +### `queue.purge_dead()` + +```python +queue.purge_dead(older_than: int = 86400) -> int +``` + +Purge dead letter entries older than `older_than` seconds. Returns count deleted. + +## Cleanup + +### `queue.purge_completed()` + +```python +queue.purge_completed(older_than: int = 86400) -> int +``` + +Purge completed jobs older than `older_than` seconds. Returns count deleted. diff --git a/docs/api/queue/resources.md b/docs/api/queue/resources.md new file mode 100644 index 0000000..f308603 --- /dev/null +++ b/docs/api/queue/resources.md @@ -0,0 +1,165 @@ +# Queue — Resources & Locking + +Methods for the worker resource system and distributed locking. 
+ +## Resource System + +### `@queue.worker_resource()` + +```python +@queue.worker_resource( + name: str, + depends_on: list[str] | None = None, + teardown: Callable | None = None, + health_check: Callable | None = None, + health_check_interval: float = 0.0, + max_recreation_attempts: int = 3, + scope: str = "worker", + pool_size: int | None = None, + pool_min: int = 0, + acquire_timeout: float = 10.0, + max_lifetime: float = 3600.0, + idle_timeout: float = 300.0, + reloadable: bool = False, + frozen: bool = False, +) -> Callable +``` + +Decorator to register a resource factory initialized at worker startup. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `name` | `str` | — | Resource name used in `inject=["name"]` or `Inject["name"]`. | +| `depends_on` | `list[str] | None` | `None` | Names of resources this one depends on. | +| `teardown` | `Callable | None` | `None` | Called with the resource instance on shutdown. | +| `health_check` | `Callable | None` | `None` | Called periodically; returns truthy if healthy. | +| `health_check_interval` | `float` | `0.0` | Seconds between health checks (0 = disabled). | +| `max_recreation_attempts` | `int` | `3` | Max times to recreate on health failure. | +| `scope` | `str` | `"worker"` | Lifetime scope: `"worker"`, `"task"`, `"thread"`, or `"request"`. | +| `pool_size` | `int | None` | `None` | Pool capacity for task-scoped resources (default = worker thread count). | +| `pool_min` | `int` | `0` | Pre-warmed instances for task-scoped resources. | +| `acquire_timeout` | `float` | `10.0` | Max seconds to wait for a pool instance. | +| `max_lifetime` | `float` | `3600.0` | Max seconds a pooled instance can live. | +| `idle_timeout` | `float` | `300.0` | Max idle seconds before pool eviction. | +| `reloadable` | `bool` | `False` | Allow hot-reload via SIGHUP. | +| `frozen` | `bool` | `False` | Wrap in a read-only proxy that blocks attribute writes. 
| + +### `queue.register_resource()` + +```python +queue.register_resource(definition: ResourceDefinition) -> None +``` + +Programmatically register a `ResourceDefinition`. Equivalent to `@queue.worker_resource()` but accepts a pre-built definition object. + +### `queue.load_resources()` + +```python +queue.load_resources(toml_path: str) -> None +``` + +Load resource definitions from a TOML file. Must be called before `run_worker()`. See [TOML Configuration](../../resources/configuration.md). + +### `queue.health_check()` + +```python +queue.health_check(name: str) -> bool +``` + +Run a resource's health check immediately. Returns `True` if healthy, `False` otherwise or if the runtime is not initialized. + +### `queue.resource_status()` + +```python +queue.resource_status() -> list[dict] +``` + +Return per-resource status. Each entry contains: `name`, `scope`, `health`, `init_duration_ms`, `recreations`, `depends_on`. Task-scoped entries also include `pool` stats. + +### `queue.register_type()` + +```python +queue.register_type( + python_type: type, + strategy: str, + *, + resource: str | None = None, + message: str | None = None, + converter: Callable | None = None, + type_key: str | None = None, + proxy_handler: str | None = None, +) -> None +``` + +Register a custom type with the interception system. Requires `interception="strict"` or `"lenient"`. + +| Parameter | Type | Description | +|---|---|---| +| `python_type` | `type` | The type to register. | +| `strategy` | `str` | `"pass"`, `"convert"`, `"redirect"`, `"reject"`, or `"proxy"`. | +| `resource` | `str | None` | Resource name for `"redirect"` strategy. | +| `message` | `str | None` | Rejection reason for `"reject"` strategy. | +| `converter` | `Callable | None` | Converter callable for `"convert"` strategy. | +| `type_key` | `str | None` | Dispatch key for the converter reconstructor. | +| `proxy_handler` | `str | None` | Handler name for `"proxy"` strategy. 
| + +### `queue.interception_stats()` + +```python +queue.interception_stats() -> dict +``` + +Return interception metrics: total call count, per-strategy counts, average duration in ms, max depth reached. Returns an empty dict if interception is disabled. + +### `queue.proxy_stats()` + +```python +queue.proxy_stats() -> list[dict] +``` + +Return per-handler proxy metrics: handler name, deconstruction count, reconstruction count, error count, average reconstruction time in ms. + +## Distributed Locking + +### `queue.lock()` + +```python +queue.lock( + name: str, + ttl: int = 30, + auto_extend: bool = True, + owner_id: str | None = None, + timeout: float | None = None, + retry_interval: float = 0.1, +) -> contextlib.AbstractContextManager +``` + +Acquire a distributed lock. Use as a context manager: + +```python +with queue.lock("my-resource", ttl=60): + # exclusive section + ... +``` + +Raises `LockNotAcquired` if acquisition fails (when `timeout` is `None` or expires). + +### `queue.alock()` + +```python +queue.alock( + name: str, + ttl: float = 30.0, + auto_extend: bool = True, + owner_id: str | None = None, + timeout: float | None = None, + retry_interval: float = 0.1, +) -> AsyncDistributedLock +``` + +Async context manager version of `lock()`. Returns an `AsyncDistributedLock` directly — use `async with`, not `await`: + +```python +async with queue.alock("my-resource"): + ... +``` diff --git a/docs/api/queue/workers.md b/docs/api/queue/workers.md new file mode 100644 index 0000000..a518113 --- /dev/null +++ b/docs/api/queue/workers.md @@ -0,0 +1,125 @@ +# Queue — Workers & Hooks + +Methods for running workers, lifecycle hooks, circuit breakers, and the sync/async method mapping. + +## Workers + +### `queue.run_worker()` + +```python +queue.run_worker( + queues: Sequence[str] | None = None, + tags: list[str] | None = None, + pool: str = "thread", + app: str | None = None, +) -> None +``` + +Start the worker loop. **Blocks** until interrupted. 
+ +| Parameter | Type | Default | Description | +|---|---|---|---| +| `queues` | `Sequence[str] | None` | `None` | Queue names to consume from. `None` = all. | +| `tags` | `list[str] | None` | `None` | Tags for worker specialization / routing. | +| `pool` | `str` | `"thread"` | Worker pool type: `"thread"` or `"prefork"`. | +| `app` | `str | None` | `None` | Import path to Queue (e.g. `"myapp:queue"`). Required when `pool="prefork"`. | + +### `queue.workers()` + +```python +queue.workers() -> list[dict] +``` + +Return live state of all registered workers. Each dict contains: + +| Key | Type | Description | +|-----|------|-------------| +| `worker_id` | `str` | Unique worker ID | +| `hostname` | `str` | OS hostname | +| `pid` | `int` | Process ID | +| `status` | `str` | `"active"` or `"draining"` | +| `pool_type` | `str` | `"thread"`, `"prefork"`, or `"native-async"` | +| `started_at` | `int` | Registration timestamp (ms since epoch) | +| `last_heartbeat` | `int` | Last heartbeat timestamp (ms) | +| `queues` | `str` | Comma-separated queue names | +| `threads` | `int` | Worker thread/process count | +| `tags` | `str | None` | Worker specialization tags | +| `resources` | `str | None` | Registered resource names (JSON) | +| `resource_health` | `str | None` | Per-resource health status (JSON) | + +### `await queue.aworkers()` + +```python +await queue.aworkers() -> list[dict] +``` + +Async version of `workers()`. + +### `queue.arun_worker()` + +```python +await queue.arun_worker( + queues: Sequence[str] | None = None, + tags: list[str] | None = None, +) -> None +``` + +Async version of `run_worker()`. Runs the worker in a thread pool without blocking the event loop. + +## Circuit Breakers + +### `queue.circuit_breakers()` + +```python +queue.circuit_breakers() -> list[dict] +``` + +Return current state of all circuit breakers: task name, state (`closed`/`open`/`half-open`), failure count, last failure time. 
+ +## Hooks + +### `@queue.before_task` + +```python +@queue.before_task +def hook(task_name: str, args: tuple, kwargs: dict) -> None: ... +``` + +### `@queue.after_task` + +```python +@queue.after_task +def hook(task_name: str, args: tuple, kwargs: dict, result: Any, error: Exception | None) -> None: ... +``` + +### `@queue.on_success` + +```python +@queue.on_success +def hook(task_name: str, args: tuple, kwargs: dict, result: Any) -> None: ... +``` + +### `@queue.on_failure` + +```python +@queue.on_failure +def hook(task_name: str, args: tuple, kwargs: dict, error: Exception) -> None: ... +``` + +## Async Methods + +| Sync | Async | +|---|---| +| `queue.stats()` | `await queue.astats()` | +| `queue.stats_by_queue()` | `await queue.astats_by_queue()` | +| `queue.stats_all_queues()` | `await queue.astats_all_queues()` | +| `queue.dead_letters()` | `await queue.adead_letters()` | +| `queue.retry_dead()` | `await queue.aretry_dead()` | +| `queue.cancel_job()` | `await queue.acancel_job()` | +| `queue.run_worker()` | `await queue.arun_worker()` | +| `queue.metrics()` | `await queue.ametrics()` | +| `queue.workers()` | `await queue.aworkers()` | +| `queue.circuit_breakers()` | `await queue.acircuit_breakers()` | +| `queue.replay()` | `await queue.areplay()` | +| `queue.lock()` | `queue.alock()` (async context manager, not a coroutine) | +| `queue.resource_status()` | `await queue.aresource_status()` | diff --git a/docs/api/result.md b/docs/api/result.md index b27b02c..617c307 100644 --- a/docs/api/result.md +++ b/docs/api/result.md @@ -78,7 +78,7 @@ for err in job.errors: job.dependencies -> list[str] ``` -List of job IDs this job depends on. Returns an empty list if the job has no dependencies. See [Dependencies](../guide/dependencies.md). +List of job IDs this job depends on. Returns an empty list if the job has no dependencies. See [Dependencies](../guide/execution/dependencies.md). ### `job.dependents` @@ -96,7 +96,7 @@ List of job IDs that depend on this job. 
Returns an empty list if no other jobs job.to_dict() -> dict ``` -Return all job fields as a plain dictionary. Useful for JSON serialization (e.g. in the [dashboard](../guide/dashboard.md) or [FastAPI integration](../guide/advanced.md#fastapi-integration)). +Return all job fields as a plain dictionary. Useful for JSON serialization (e.g. in the [dashboard](../guide/observability/dashboard.md) or [FastAPI integration](../integrations/fastapi.md)). ### `job.result()` diff --git a/docs/api/task.md b/docs/api/task.md index 2d76ed9..791d746 100644 --- a/docs/api/task.md +++ b/docs/api/task.md @@ -55,15 +55,15 @@ Enqueue with full control over submission options. Any parameter not provided fa | Parameter | Type | Default | Description | |---|---|---|---| | `args` | `tuple` | `()` | Positional arguments for the task | -| `kwargs` | `dict \| None` | `None` | Keyword arguments for the task | -| `priority` | `int \| None` | `None` | Override priority (higher = more urgent) | -| `delay` | `float \| None` | `None` | Delay in seconds before the task is eligible | -| `queue` | `str \| None` | `None` | Override queue name | -| `max_retries` | `int \| None` | `None` | Override max retry count | -| `timeout` | `int \| None` | `None` | Override timeout in seconds | -| `unique_key` | `str \| None` | `None` | Deduplicate active jobs with same key | -| `metadata` | `str \| None` | `None` | Arbitrary JSON metadata to attach | -| `depends_on` | `str \| list[str] \| None` | `None` | Job ID(s) this job depends on. See [Dependencies](../guide/dependencies.md). 
| +| `kwargs` | `dict | None` | `None` | Keyword arguments for the task | +| `priority` | `int | None` | `None` | Override priority (higher = more urgent) | +| `delay` | `float | None` | `None` | Delay in seconds before the task is eligible | +| `queue` | `str | None` | `None` | Override queue name | +| `max_retries` | `int | None` | `None` | Override max retry count | +| `timeout` | `int | None` | `None` | Override timeout in seconds | +| `unique_key` | `str | None` | `None` | Deduplicate active jobs with same key | +| `metadata` | `str | None` | `None` | Arbitrary JSON metadata to attach | +| `depends_on` | `str | list[str] | None` | `None` | Job ID(s) this job depends on. See [Dependencies](../guide/execution/dependencies.md). | ```python job = send_email.apply_async( diff --git a/docs/api/testing.md b/docs/api/testing.md index 6a2b8b7..2d32f15 100644 --- a/docs/api/testing.md +++ b/docs/api/testing.md @@ -22,7 +22,7 @@ TestMode(queue: Queue, propagate_errors: bool = False, resources: dict[str, Any] |---|---|---|---| | `queue` | `Queue` | *required* | The Queue instance to put into test mode | | `propagate_errors` | `bool` | `False` | Re-raise task exceptions immediately instead of capturing them | -| `resources` | `dict[str, Any] \| None` | `None` | Resource name → mock instance map injected during test mode. `MockResource` values are unwrapped automatically. | +| `resources` | `dict[str, Any] | None` | `None` | Resource name → mock instance map injected during test mode. `MockResource` values are unwrapped automatically. | ### Usage @@ -59,8 +59,8 @@ Dataclass capturing the result of a single task execution in test mode. 
| `args` | `tuple` | Positional arguments the task was called with | | `kwargs` | `dict` | Keyword arguments the task was called with | | `return_value` | `Any` | Return value of the task (or `None` if it failed) | -| `error` | `Exception \| None` | Exception instance if the task raised | -| `traceback` | `str \| None` | Formatted traceback string if the task raised | +| `error` | `Exception | None` | Exception instance if the task raised | +| `traceback` | `str | None` | Formatted traceback string if the task raised | ### Properties @@ -98,8 +98,8 @@ Filter results by task name and/or outcome. | Parameter | Type | Default | Description | |---|---|---|---| -| `task_name` | `str \| None` | `None` | Exact match on task name | -| `succeeded` | `bool \| None` | `None` | `True` = successes only, `False` = failures only | +| `task_name` | `str | None` | `None` | Exact match on task name | +| `succeeded` | `bool | None` | `None` | `True` = successes only, `False` = failures only | Returns a new `TestResults` containing only matching items. diff --git a/docs/architecture.md b/docs/architecture.md index 019db0e..bd24919 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -175,7 +175,7 @@ Diesel's `r2d2` connection pool with up to 8 connections (SQLite) or 10 connecti ### Postgres Configuration -taskito also supports PostgreSQL as an alternative storage backend. See the [Postgres Backend guide](guide/postgres.md) for full details. +taskito also supports PostgreSQL as an alternative storage backend. See the [Postgres Backend guide](guide/operations/postgres.md) for full details. Key differences from the SQLite storage layer: @@ -269,7 +269,7 @@ Same behavior as database unavailable. The scheduler retries on the next poll cy ### Duplicate execution -`claim_execution` prevents two workers from picking up the same job simultaneously. But if a worker crashes *after* starting execution, the job will be retried — potentially executing the same task twice. 
Design tasks to be [idempotent](guide/guarantees.md) to handle this safely. +`claim_execution` prevents two workers from picking up the same job simultaneously. But if a worker crashes *after* starting execution, the job will be retried — potentially executing the same task twice. Design tasks to be [idempotent](guide/reliability/guarantees.md) to handle this safely. ### Recovery timeline @@ -295,7 +295,7 @@ sequenceDiagram ### Partial writes -If a task completes successfully but the result write to the database fails (e.g., database full, connection lost), the job stays in `running` status. The stale reaper eventually marks it failed and retries it. The task will execute again — make sure it's [idempotent](guide/guarantees.md). +If a task completes successfully but the result write to the database fails (e.g., database full, connection lost), the job stays in `running` status. The stale reaper eventually marks it failed and retries it. The task will execute again — make sure it's [idempotent](guide/reliability/guarantees.md). ### Jobs without timeouts diff --git a/docs/comparison.md b/docs/comparison.md index 0307c26..a553f55 100644 --- a/docs/comparison.md +++ b/docs/comparison.md @@ -71,7 +71,7 @@ Celery is the most popular Python task queue — battle-tested, feature-rich, an **Choose taskito** if you want zero-infrastructure simplicity on a single machine. **Choose Celery** if you need distributed workers, complex routing, or enterprise features. -Looking to switch? See the [Migrating from Celery](guide/migration.md) guide for a step-by-step walkthrough with side-by-side code examples. +Looking to switch? See the [Migrating from Celery](guide/operations/migration.md) guide for a step-by-step walkthrough with side-by-side code examples. 
### vs RQ (Redis Queue) diff --git a/docs/examples/index.md b/docs/examples/index.md new file mode 100644 index 0000000..5c964b0 --- /dev/null +++ b/docs/examples/index.md @@ -0,0 +1,11 @@ +# Examples + +End-to-end examples demonstrating common taskito patterns. + +| Example | Description | +|---------|-------------| +| [FastAPI Service](fastapi-service.md) | REST API that enqueues tasks and streams progress via SSE | +| [Notification Service](notifications.md) | Multi-channel notifications with retries and rate limiting | +| [Web Scraper Pipeline](web-scraper.md) | Distributed scraping with chains and error handling | +| [Data Pipeline](data-pipeline.md) | ETL pipeline with dependencies, groups, and chords | +| [Benchmark](benchmark.md) | Performance benchmarks comparing taskito to alternatives | diff --git a/docs/faq.md b/docs/faq.md index 7c16536..bf27e84 100644 --- a/docs/faq.md +++ b/docs/faq.md @@ -81,13 +81,13 @@ Use the **Postgres backend** (`pip install taskito[postgres]`) when you need: - **Higher write throughput** — Postgres handles concurrent writers better than SQLite - **Existing Postgres infrastructure** — leverage your existing database and backups -For single-machine workloads, SQLite is simpler and requires zero setup. See the [Postgres Backend guide](guide/postgres.md). +For single-machine workloads, SQLite is simpler and requires zero setup. See the [Postgres Backend guide](guide/operations/postgres.md). ## Is taskito production-ready? taskito is suitable for production workloads — background job processing, periodic tasks, data pipelines, and similar use cases. -For single-machine deployments, use the default SQLite backend. For multi-server setups, use the [Postgres backend](guide/postgres.md). +For single-machine deployments, use the default SQLite backend. For multi-server setups, use the [Postgres backend](guide/operations/postgres.md). ## What observability options does taskito support? 
@@ -131,7 +131,7 @@ result = await job.aresult(timeout=30) stats = await queue.astats() ``` -Sync and async tasks can coexist in the same queue. The worker automatically routes each job to the correct pool based on the task type. See the [Native Async Tasks](guide/async-tasks.md) guide for details including `async_concurrency` tuning and `current_job` context in async tasks. +Sync and async tasks can coexist in the same queue. The worker automatically routes each job to the correct pool based on the task type. See the [Native Async Tasks](guide/execution/async-tasks.md) guide for details including `async_concurrency` tuning and `current_job` context in async tasks. ## What serialization format does taskito use? diff --git a/docs/getting-started/index.md b/docs/getting-started/index.md new file mode 100644 index 0000000..28f0a66 --- /dev/null +++ b/docs/getting-started/index.md @@ -0,0 +1,23 @@ +# Getting Started + +Get taskito installed and running in under 5 minutes. + +
+ +- **Installation** + + --- + + Install taskito from PyPI with optional extras for your framework. + + [:octicons-arrow-right-24: Install](installation.md) + +- **Quickstart** + + --- + + Define a task, enqueue it, and start a worker — step by step. + + [:octicons-arrow-right-24: Quickstart](quickstart.md) + +
diff --git a/docs/getting-started/installation.md b/docs/getting-started/installation.md index a3d9726..92ae484 100644 --- a/docs/getting-started/installation.md +++ b/docs/getting-started/installation.md @@ -19,7 +19,7 @@ To use PostgreSQL as the storage backend instead of SQLite: pip install taskito[postgres] ``` -See the [Postgres Backend guide](../guide/postgres.md) for configuration details. +See the [Postgres Backend guide](../guide/operations/postgres.md) for configuration details. ## From Source diff --git a/docs/getting-started/quickstart.md b/docs/getting-started/quickstart.md index 3ee1969..ae1b7a8 100644 --- a/docs/getting-started/quickstart.md +++ b/docs/getting-started/quickstart.md @@ -109,13 +109,13 @@ taskito dashboard --app tasks:queue Open `http://localhost:8080` in your browser. The dashboard includes 11 pages covering every aspect of your task queue — no extra dependencies needed. -[:octicons-arrow-right-24: Dashboard guide](../guide/dashboard.md) +[:octicons-arrow-right-24: Dashboard guide](../guide/observability/dashboard.md) ## Next Steps -- [Tasks](../guide/tasks.md) — decorator options, `.delay()` vs `.apply_async()` -- [Workers](../guide/workers.md) — CLI flags, graceful shutdown, worker count -- [Retries](../guide/retries.md) — exponential backoff, dead letter queue -- [Workflows](../guide/workflows.md) — chain, group, chord -- [Testing](../guide/testing.md) — run tasks synchronously in tests with `queue.test_mode()` -- [Migrating from Celery](../guide/migration.md) — concept mapping and side-by-side examples +- [Tasks](../guide/core/tasks.md) — decorator options, `.delay()` vs `.apply_async()` +- [Workers](../guide/core/workers.md) — CLI flags, graceful shutdown, worker count +- [Retries](../guide/reliability/retries.md) — exponential backoff, dead letter queue +- [Workflows](../guide/core/workflows.md) — chain, group, chord +- [Testing](../guide/operations/testing.md) — run tasks synchronously in tests with `queue.test_mode()` +- 
[Migrating from Celery](../guide/operations/migration.md) — concept mapping and side-by-side examples diff --git a/docs/guide/advanced.md b/docs/guide/advanced.md deleted file mode 100644 index da2195d..0000000 --- a/docs/guide/advanced.md +++ /dev/null @@ -1,450 +0,0 @@ -# Advanced - -## Unique Tasks - -Deduplicate active jobs by key — if a job with the same `unique_key` is already pending or running, the existing job is returned instead of creating a new one: - -```python -job1 = process.apply_async(args=("report",), unique_key="daily-report") -job2 = process.apply_async(args=("report",), unique_key="daily-report") -assert job1.id == job2.id # Same job, not duplicated -``` - -Once the original job completes (or fails to DLQ), the key is released and a new job can be created with the same key. - -!!! info "Implementation" - Deduplication uses a partial unique index: `CREATE UNIQUE INDEX ... ON jobs(unique_key) WHERE unique_key IS NOT NULL AND status IN (0, 1)`. Only pending and running jobs participate. The check-and-insert is atomic (transaction-protected), so concurrent calls with the same `unique_key` are handled gracefully without race conditions. 
- -## Job Cancellation - -Cancel a pending job before it starts: - -```python -job = send_email.delay("user@example.com", "Hello", "World") -cancelled = queue.cancel_job(job.id) # True if was pending -``` - -- Returns `True` if the job was pending and is now cancelled -- Returns `False` if the job was already running, completed, or in another non-pending state -- Cancelled jobs cannot be un-cancelled - -## Result TTL & Auto-Cleanup - -### Manual Cleanup - -```python -# Purge completed jobs older than 1 hour -deleted = queue.purge_completed(older_than=3600) - -# Purge dead letters older than 24 hours -deleted = queue.purge_dead(older_than=86400) -``` - -### Automatic Cleanup - -Set `result_ttl` on the Queue to automatically purge old jobs while the worker runs: - -```python -queue = Queue( - db_path="myapp.db", - result_ttl=3600, # Auto-purge completed/dead jobs older than 1 hour -) -``` - -The scheduler checks every ~60 seconds and purges: - -- Completed jobs older than `result_ttl` -- Dead letter entries older than `result_ttl` -- Error history records older than `result_ttl` - -Set to `None` (default) to disable auto-cleanup. - -### Cascade Cleanup - -When jobs are purged — either manually via `purge_completed()` or automatically via `result_ttl` — their related child records are also deleted: - -- Error history (`job_errors`) -- Task logs (`task_logs`) -- Task metrics (`task_metrics`) -- Job dependencies (`job_dependencies`) -- Replay history (`replay_history`) - -This prevents orphaned records from accumulating when parent jobs are removed. 
- -```python -# Manual purge — child records are cleaned up automatically -deleted = queue.purge_completed(older_than=3600) -print(f"Purged {deleted} jobs and their related records") - -# With per-job TTL — cascade cleanup still applies -job = resize_image.apply_async( - args=("photo.jpg",), - result_ttl=600, # This job's results expire after 10 minutes -) -# When this job is purged (after 10 min), its errors, logs, -# metrics, dependencies, and replay history are also removed. -``` - -!!! note - Dead letter entries are **not** cascade-deleted — they have their own lifecycle managed by `purge_dead()`. Timestamp-based cleanup (`result_ttl`) of error history, logs, and metrics also continues to run independently, catching old records regardless of whether the parent job still exists. - -## Async Support - -All inspection methods have async variants that run in a thread pool: - -```python -# Sync -stats = queue.stats() -dead = queue.dead_letters() -new_id = queue.retry_dead(dead_id) -cancelled = queue.cancel_job(job_id) -result = job.result(timeout=30) - -# Async equivalents -stats = await queue.astats() -dead = await queue.adead_letters() -new_id = await queue.aretry_dead(dead_id) -cancelled = await queue.acancel_job(job_id) -result = await job.aresult(timeout=30) -``` - -### Async Worker - -```python -async def main(): - await queue.arun_worker(queues=["default"]) - -asyncio.run(main()) -``` - -## Batch Enqueue - -Insert many jobs in a single SQLite transaction for high throughput: - -### `task.map()` - -```python -@queue.task() -def process(item_id): - return fetch_and_process(item_id) - -# Enqueue 1000 jobs in one transaction -jobs = process.map([(i,) for i in range(1000)]) -``` - -### `queue.enqueue_many()` - -```python -# Basic batch — same options for all jobs -jobs = queue.enqueue_many( - task_name="myapp.process", - args_list=[(i,) for i in range(1000)], - priority=5, - queue="processing", -) - -# Full parity with enqueue() — per-job overrides -jobs = 
queue.enqueue_many( - task_name="myapp.process", - args_list=[(i,) for i in range(100)], - delay=5.0, # uniform 5s delay for all - unique_keys=[f"item-{i}" for i in range(100)], # per-job dedup - metadata='{"source": "batch"}', # uniform metadata - expires=3600.0, # expire after 1 hour - result_ttl=600, # keep results for 10 minutes -) -``` - -Per-job lists (`delay_list`, `metadata_list`, `expires_list`, `result_ttl_list`) override uniform values when both are provided. See the [API reference](../api/queue.md#queueenqueue_many) for the full parameter list. - -## Queue Pause/Resume - -Temporarily pause job processing on a queue without stopping the worker: - -```python -# Pause the "emails" queue -queue.pause("emails") - -# Check which queues are paused -print(queue.paused_queues()) # ["emails"] - -# Resume processing -queue.resume("emails") -``` - -Paused queues still accept new jobs — they just won't be dequeued until resumed. - -## Job Archival - -Move old completed jobs to an archive table to keep the main jobs table lean: - -```python -# Archive completed jobs older than 24 hours -archived_count = queue.archive(older_than=86400) -print(f"Archived {archived_count} jobs") - -# Browse archived jobs -archived = queue.list_archived(limit=50, offset=0) -for job in archived: - print(f"{job.id}: {job.task_name} ({job.status})") -``` - -Archived jobs are no longer returned by `queue.stats()` or `queue.list_jobs()`, but remain queryable via `queue.list_archived()`. - -### Example: Maintenance Window - -```python -# Before maintenance: pause all queues -for q in ["default", "emails", "reports"]: - queue.pause(q) -print(f"Paused: {queue.paused_queues()}") - -# ... perform maintenance ... 
- -# After maintenance: resume all queues -for q in ["default", "emails", "reports"]: - queue.resume(q) -``` - -### Example: Scheduled Archival - -```python -@queue.periodic(cron="0 0 2 * * *") # Daily at 2 AM -def nightly_archival(): - archived = queue.archive(older_than=7 * 86400) # Archive jobs older than 7 days - current_job.log(f"Archived {archived} jobs") -``` - -## Task Revocation - -Cancel all pending jobs for a specific task: - -```python -# Revoke all pending "send_newsletter" jobs -cancelled = queue.revoke_task("myapp.tasks.send_newsletter") -print(f"Revoked {cancelled} jobs") -``` - -## Queue Purge - -Remove all pending jobs from a specific queue: - -```python -purged = queue.purge("emails") -print(f"Purged {purged} jobs from the emails queue") -``` - -## Job Replay - -Replay a completed or dead job with the same arguments: - -```python -new_job = queue.replay(job_id) -print(f"Replayed as {new_job.id}") - -# Check replay history -history = queue.replay_history(job_id) -``` - -### Example: Retry from Dead Letter with Replay - -```python -# List dead letters and replay them -dead = queue.dead_letters() -for entry in dead: - print(f"Replaying dead job {entry['original_job_id']}: {entry['task_name']}") - new_id = queue.retry_dead(entry["id"]) - print(f" -> New job: {new_id}") -``` - -## SQLite Configuration - -taskito configures SQLite for optimal performance: - -| Pragma | Value | Purpose | -|---|---|---| -| `journal_mode` | WAL | Concurrent reads during writes | -| `busy_timeout` | 5000ms | Wait instead of failing on lock contention | -| `synchronous` | NORMAL | Balance between safety and speed | -| `journal_size_limit` | 64MB | Prevent unbounded WAL growth | - -The connection pool uses up to 8 connections via `r2d2`. - -## FastAPI Integration - -taskito provides a first-class FastAPI integration via `taskito.contrib.fastapi`. It gives you a pre-built `APIRouter` with endpoints for job status, progress streaming via SSE, and dead letter queue management. 
- -### Installation - -```bash -pip install taskito[fastapi] -``` - -This installs `fastapi` and `pydantic` as extras. - -### Quick Setup - -```python -from fastapi import FastAPI -from taskito import Queue -from taskito.contrib.fastapi import TaskitoRouter - -queue = Queue(db_path="myapp.db") - -@queue.task() -def process_data(payload: dict) -> str: - return "done" - -app = FastAPI() -app.include_router(TaskitoRouter(queue), prefix="/tasks") -``` - -Run with: - -```bash -uvicorn myapp:app --reload -``` - -All taskito endpoints are now available under `/tasks/`. - -### Endpoints - -The `TaskitoRouter` exposes the following endpoints: - -| Method | Path | Description | -|---|---|---| -| `GET` | `/stats` | Queue statistics (pending, running, completed, etc.) | -| `GET` | `/jobs/{job_id}` | Job status, progress, and metadata | -| `GET` | `/jobs/{job_id}/errors` | Error history for a job | -| `GET` | `/jobs/{job_id}/result` | Job result (optional blocking with `?timeout=N`) | -| `GET` | `/jobs/{job_id}/progress` | SSE stream of progress updates | -| `POST` | `/jobs/{job_id}/cancel` | Cancel a pending job | -| `GET` | `/dead-letters` | List dead letter entries (paginated) | -| `POST` | `/dead-letters/{dead_id}/retry` | Re-enqueue a dead letter | - -### Blocking Result Fetch - -The `/jobs/{job_id}/result` endpoint supports an optional `timeout` query parameter (0–300 seconds). 
When `timeout > 0`, the request blocks until the job completes or the timeout elapses: - -```bash -# Non-blocking (default) -curl http://localhost:8000/tasks/jobs/01H5K6X.../result - -# Block up to 30 seconds for the result -curl http://localhost:8000/tasks/jobs/01H5K6X.../result?timeout=30 -``` - -### SSE Progress Streaming - -Stream real-time progress for a running job using Server-Sent Events: - -```python -import httpx - -with httpx.stream("GET", "http://localhost:8000/tasks/jobs/01H5K6X.../progress") as r: - for line in r.iter_lines(): - print(line) - # data: {"progress": 25, "status": "running"} - # data: {"progress": 50, "status": "running"} - # data: {"progress": 100, "status": "completed"} -``` - -From the browser: - -```javascript -const source = new EventSource("/tasks/jobs/01H5K6X.../progress"); -source.onmessage = (event) => { - const data = JSON.parse(event.data); - console.log(`Progress: ${data.progress}%`); - if (data.status === "completed" || data.status === "failed") { - source.close(); - } -}; -``` - -The stream sends a JSON event every 0.5 seconds while the job is active, then a final event when the job reaches a terminal state. - -### Pydantic Response Models - -All endpoints return validated Pydantic models with clean OpenAPI docs. 
You can import them for type-safe client code: - -```python -from taskito.contrib.fastapi import ( - StatsResponse, - JobResponse, - JobErrorResponse, - JobResultResponse, - CancelResponse, - DeadLetterResponse, - RetryResponse, -) -``` - -### Custom Tags and Dependencies - -Customize the router with FastAPI tags and dependency injection: - -```python -from fastapi import Depends, FastAPI, Header, HTTPException -from taskito.contrib.fastapi import TaskitoRouter - -async def verify_api_key(x_api_key: str = Header(...)): - if x_api_key != "secret": - raise HTTPException(status_code=401, detail="Invalid API key") - -app = FastAPI() - -router = TaskitoRouter( - queue, - tags=["task-queue"], # OpenAPI tags - dependencies=[Depends(verify_api_key)], # Applied to all endpoints -) - -app.include_router(router, prefix="/tasks") -``` - -### Full Example - -```python -from fastapi import FastAPI, Header, HTTPException, Depends -from taskito import Queue, current_job -from taskito.contrib.fastapi import TaskitoRouter - -queue = Queue(db_path="myapp.db") - -@queue.task() -def resize_image(image_url: str, sizes: list[int]) -> dict: - results = {} - for i, size in enumerate(sizes): - results[size] = do_resize(image_url, size) - current_job.update_progress(int((i + 1) / len(sizes) * 100)) - return results - -async def require_auth(authorization: str = Header(...)): - if not authorization.startswith("Bearer "): - raise HTTPException(401) - -app = FastAPI(title="Image Service") -app.include_router( - TaskitoRouter(queue, dependencies=[Depends(require_auth)]), - prefix="/tasks", - tags=["tasks"], -) - -# Start worker in a separate process: -# taskito worker --app myapp:queue -``` - -```bash -# Check job status -curl http://localhost:8000/tasks/jobs/01H5K6X... 
\ - -H "Authorization: Bearer mytoken" - -# Stream progress -curl -N http://localhost:8000/tasks/jobs/01H5K6X.../progress \ - -H "Authorization: Bearer mytoken" - -# Block for result (up to 60s) -curl http://localhost:8000/tasks/jobs/01H5K6X.../result?timeout=60 \ - -H "Authorization: Bearer mytoken" -``` diff --git a/docs/guide/execution-model.md b/docs/guide/core/execution-model.md similarity index 95% rename from docs/guide/execution-model.md rename to docs/guide/core/execution-model.md index 7d014c3..99f6a30 100644 --- a/docs/guide/execution-model.md +++ b/docs/guide/core/execution-model.md @@ -52,7 +52,7 @@ taskito worker --app myapp:queue --pool prefork The `app` parameter tells each child process where to import your `Queue` instance. It must be a module-level name (`"module:attribute"` format) — tasks defined inside functions or closures cannot be imported by child processes. -For more details, see the [Prefork Pool guide](prefork.md). +For more details, see the [Prefork Pool guide](../execution/prefork.md). ## Native Async @@ -75,7 +75,7 @@ queue = Queue( ) ``` -For more details, see the [Native Async Tasks guide](async-tasks.md). +For more details, see the [Native Async Tasks guide](../execution/async-tasks.md). ## Mixing Sync and Async diff --git a/docs/guide/core/index.md b/docs/guide/core/index.md new file mode 100644 index 0000000..1c7b4f0 --- /dev/null +++ b/docs/guide/core/index.md @@ -0,0 +1,12 @@ +# Core Concepts + +The building blocks of every taskito application. 
+
+| Guide | Description |
+|-------|-------------|
+| [Tasks](tasks.md) | Define tasks with `@queue.task()`, configure retries, timeouts, and options |
+| [Workers](workers.md) | Start workers, control concurrency, graceful shutdown |
+| [Execution Models](execution-model.md) | How tasks move from enqueue to completion |
+| [Queues & Priority](queues.md) | Named queues, priority levels, and routing |
+| [Scheduling](scheduling.md) | Periodic tasks with cron expressions |
+| [Workflows](workflows.md) | Chains, groups, and chords for multi-step pipelines |
diff --git a/docs/guide/queues.md b/docs/guide/core/queues.md
similarity index 100%
rename from docs/guide/queues.md
rename to docs/guide/core/queues.md
diff --git a/docs/guide/scheduling.md b/docs/guide/core/scheduling.md
similarity index 100%
rename from docs/guide/scheduling.md
rename to docs/guide/core/scheduling.md
diff --git a/docs/guide/tasks.md b/docs/guide/core/tasks.md
similarity index 81%
rename from docs/guide/tasks.md
rename to docs/guide/core/tasks.md
index 371ef46..87634c6 100644
--- a/docs/guide/tasks.md
+++ b/docs/guide/core/tasks.md
@@ -19,22 +19,22 @@ def process_data(data: dict) -> str:
 
 | Parameter | Type | Default | Description |
 |---|---|---|---|
-| `name` | `str \| None` | Auto-generated | Explicit task name. Defaults to `module.qualname`. |
+| `name` | `str \| None` | Auto-generated | Explicit task name. Defaults to `module.qualname`. |
 | `max_retries` | `int` | `3` | Max retry attempts before moving to DLQ. |
 | `retry_backoff` | `float` | `1.0` | Base delay in seconds for exponential backoff. |
-| `retry_delays` | `list[float] \| None` | `None` | Per-attempt delays in seconds, overrides backoff. e.g. `[1, 5, 30]`. |
-| `max_retry_delay` | `int \| None` | `None` | Cap on backoff delay in seconds (default 300 s). |
+| `retry_delays` | `list[float] \| None` | `None` | Per-attempt delays in seconds, overrides backoff. e.g. `[1, 5, 30]`. |
+| `max_retry_delay` | `int \| None` | `None` | Cap on backoff delay in seconds (default 300 s). |
 | `timeout` | `int` | `300` | Max execution time in seconds (hard timeout). |
-| `soft_timeout` | `float \| None` | `None` | Cooperative time limit in seconds; checked via `current_job.check_timeout()`. |
+| `soft_timeout` | `float \| None` | `None` | Cooperative time limit in seconds; checked via `current_job.check_timeout()`. |
 | `priority` | `int` | `0` | Default priority (higher = more urgent). |
-| `rate_limit` | `str \| None` | `None` | Rate limit string, e.g. `"100/m"`. |
+| `rate_limit` | `str \| None` | `None` | Rate limit string, e.g. `"100/m"`. |
 | `queue` | `str` | `"default"` | Named queue to submit to. |
-| `circuit_breaker` | `dict \| None` | `None` | Circuit breaker config: `{"threshold": 5, "window": 60, "cooldown": 120}`. |
-| `middleware` | `list[TaskMiddleware] \| None` | `None` | Per-task middleware, applied in addition to queue-level middleware. |
-| `expires` | `float \| None` | `None` | Seconds until the job expires if not started. |
-| `inject` | `list[str] \| None` | `None` | Worker resource names to inject as keyword arguments. See [Resource System](resources.md). |
-| `serializer` | `Serializer \| None` | `None` | Per-task serializer override. Falls back to the queue-level serializer. |
-| `max_concurrent` | `int \| None` | `None` | Max concurrent running instances of this task. `None` means no limit. |
+| `circuit_breaker` | `dict \| None` | `None` | Circuit breaker config: `{"threshold": 5, "window": 60, "cooldown": 120}`. |
+| `middleware` | `list[TaskMiddleware] \| None` | `None` | Per-task middleware, applied in addition to queue-level middleware. |
+| `expires` | `float \| None` | `None` | Seconds until the job expires if not started. |
+| `inject` | `list[str] \| None` | `None` | Worker resource names to inject as keyword arguments. See [Resource System](../../resources/index.md). |
+| `serializer` | `Serializer \| None` | `None` | Per-task serializer override. Falls back to the queue-level serializer. |
+| `max_concurrent` | `int \| None` | `None` | Max concurrent running instances of this task. `None` means no limit. |
 
 ```python
 @queue.task(
diff --git a/docs/guide/workers.md b/docs/guide/core/workers.md
similarity index 99%
rename from docs/guide/workers.md
rename to docs/guide/core/workers.md
index df203c9..3706207 100644
--- a/docs/guide/workers.md
+++ b/docs/guide/core/workers.md
@@ -245,7 +245,7 @@ queue = Queue(
 )
 ```
 
-See [Native Async Tasks](async-tasks.md) for the full guide.
+See [Native Async Tasks](../execution/async-tasks.md) for the full guide.
 
 ## How Workers Work
 
diff --git a/docs/guide/workflows.md b/docs/guide/core/workflows.md
similarity index 100%
rename from docs/guide/workflows.md
rename to docs/guide/core/workflows.md
diff --git a/docs/guide/async-tasks.md b/docs/guide/execution/async-tasks.md
similarity index 88%
rename from docs/guide/async-tasks.md
rename to docs/guide/execution/async-tasks.md
index 85e648f..096f8f0 100644
--- a/docs/guide/async-tasks.md
+++ b/docs/guide/execution/async-tasks.md
@@ -149,3 +149,32 @@ maturin develop --features native-async
 ```
 
 Without the feature, async tasks are still enqueued and processed — they fall back to running via `asyncio.run()` on a worker thread. 
+ +## Async Queue Methods + +All inspection methods have async variants that run in a thread pool: + +```python +# Sync +stats = queue.stats() +dead = queue.dead_letters() +new_id = queue.retry_dead(dead_id) +cancelled = queue.cancel_job(job_id) +result = job.result(timeout=30) + +# Async equivalents +stats = await queue.astats() +dead = await queue.adead_letters() +new_id = await queue.aretry_dead(dead_id) +cancelled = await queue.acancel_job(job_id) +result = await job.aresult(timeout=30) +``` + +### Async Worker + +```python +async def main(): + await queue.arun_worker(queues=["default"]) + +asyncio.run(main()) +``` diff --git a/docs/guide/execution/batch-enqueue.md b/docs/guide/execution/batch-enqueue.md new file mode 100644 index 0000000..8d1da5b --- /dev/null +++ b/docs/guide/execution/batch-enqueue.md @@ -0,0 +1,39 @@ +# Batch Enqueue + +Insert many jobs in a single SQLite transaction for high throughput. + +## `task.map()` + +```python +@queue.task() +def process(item_id): + return fetch_and_process(item_id) + +# Enqueue 1000 jobs in one transaction +jobs = process.map([(i,) for i in range(1000)]) +``` + +## `queue.enqueue_many()` + +```python +# Basic batch — same options for all jobs +jobs = queue.enqueue_many( + task_name="myapp.process", + args_list=[(i,) for i in range(1000)], + priority=5, + queue="processing", +) + +# Full parity with enqueue() — per-job overrides +jobs = queue.enqueue_many( + task_name="myapp.process", + args_list=[(i,) for i in range(100)], + delay=5.0, # uniform 5s delay for all + unique_keys=[f"item-{i}" for i in range(100)], # per-job dedup + metadata='{"source": "batch"}', # uniform metadata + expires=3600.0, # expire after 1 hour + result_ttl=600, # keep results for 10 minutes +) +``` + +Per-job lists (`delay_list`, `metadata_list`, `expires_list`, `result_ttl_list`) override uniform values when both are provided. See the [API reference](../../api/queue/index.md#queueenqueue_many) for the full parameter list. 
diff --git a/docs/guide/dependencies.md b/docs/guide/execution/dependencies.md similarity index 97% rename from docs/guide/dependencies.md rename to docs/guide/execution/dependencies.md index b49d6d3..69fb8bb 100644 --- a/docs/guide/dependencies.md +++ b/docs/guide/execution/dependencies.md @@ -223,7 +223,7 @@ job_success = on_valid.apply_async( ``` !!! note "Dependencies vs. Workflows" - `depends_on` is a lower-level primitive than [chains, groups, and chords](workflows.md). Use `depends_on` when you need fine-grained control over a custom DAG. Use the workflow primitives when your pipeline fits a standard pattern. + `depends_on` is a lower-level primitive than [chains, groups, and chords](../core/workflows.md). Use `depends_on` when you need fine-grained control over a custom DAG. Use the workflow primitives when your pipeline fits a standard pattern. ## Combining with Other Features diff --git a/docs/guide/execution/index.md b/docs/guide/execution/index.md new file mode 100644 index 0000000..05de40e --- /dev/null +++ b/docs/guide/execution/index.md @@ -0,0 +1,12 @@ +# Advanced Execution + +Patterns for scaling and optimizing task execution. 
+ +| Guide | Description | +|-------|-------------| +| [Prefork Pool](prefork.md) | Process-based isolation for CPU-bound or memory-leaking tasks | +| [Native Async Tasks](async-tasks.md) | `async def` tasks with native event loop integration | +| [Result Streaming](streaming.md) | Stream partial results and progress updates in real time | +| [Dependencies](dependencies.md) | DAG-based job dependencies — run tasks in order | +| [Batch Enqueue](batch-enqueue.md) | Enqueue many jobs efficiently with `task.map()` and `enqueue_many()` | +| [Unique Tasks](unique-tasks.md) | Deduplicate active jobs with unique keys | diff --git a/docs/guide/prefork.md b/docs/guide/execution/prefork.md similarity index 100% rename from docs/guide/prefork.md rename to docs/guide/execution/prefork.md diff --git a/docs/guide/streaming.md b/docs/guide/execution/streaming.md similarity index 100% rename from docs/guide/streaming.md rename to docs/guide/execution/streaming.md diff --git a/docs/guide/execution/unique-tasks.md b/docs/guide/execution/unique-tasks.md new file mode 100644 index 0000000..a0c9ea2 --- /dev/null +++ b/docs/guide/execution/unique-tasks.md @@ -0,0 +1,14 @@ +# Unique Tasks + +Deduplicate active jobs by key — if a job with the same `unique_key` is already pending or running, the existing job is returned instead of creating a new one: + +```python +job1 = process.apply_async(args=("report",), unique_key="daily-report") +job2 = process.apply_async(args=("report",), unique_key="daily-report") +assert job1.id == job2.id # Same job, not duplicated +``` + +Once the original job completes (or fails to DLQ), the key is released and a new job can be created with the same key. + +!!! info "Implementation" + Deduplication uses a partial unique index: `CREATE UNIQUE INDEX ... ON jobs(unique_key) WHERE unique_key IS NOT NULL AND status IN (0, 1)`. Only pending and running jobs participate. 
The check-and-insert is atomic (transaction-protected), so concurrent calls with the same `unique_key` are handled gracefully without race conditions.
diff --git a/docs/guide/events-webhooks.md b/docs/guide/extensibility/events-webhooks.md
similarity index 97%
rename from docs/guide/events-webhooks.md
rename to docs/guide/extensibility/events-webhooks.md
index dd405a1..a68225b 100644
--- a/docs/guide/events-webhooks.md
+++ b/docs/guide/extensibility/events-webhooks.md
@@ -75,9 +75,9 @@ queue.add_webhook(
 | Parameter | Type | Default | Description |
 |-----------|------|---------|-------------|
 | `url` | `str` | — | URL to POST event payloads to (must be `http://` or `https://`) |
-| `events` | `list[EventType] \| None` | `None` | Event types to subscribe to. `None` means all events |
-| `headers` | `dict[str, str] \| None` | `None` | Extra HTTP headers to include in requests |
-| `secret` | `str \| None` | `None` | HMAC-SHA256 signing secret |
+| `events` | `list[EventType] \| None` | `None` | Event types to subscribe to. `None` means all events |
+| `headers` | `dict[str, str] \| None` | `None` | Extra HTTP headers to include in requests |
+| `secret` | `str \| None` | `None` | HMAC-SHA256 signing secret |
 | `max_retries` | `int` | `3` | Maximum delivery attempts |
 | `timeout` | `float` | `10.0` | HTTP request timeout in seconds |
 | `retry_backoff` | `float` | `2.0` | Base for exponential backoff between retries |
diff --git a/docs/guide/extensibility/index.md b/docs/guide/extensibility/index.md
new file mode 100644
index 0000000..b231646
--- /dev/null
+++ b/docs/guide/extensibility/index.md
@@ -0,0 +1,9 @@
+# Extensibility
+
+Extend taskito with custom behavior at every stage of the task lifecycle. 
+ +| Guide | Description | +|-------|-------------| +| [Middleware](middleware.md) | Hook into task execution with `before`, `after`, `on_retry`, and more | +| [Serializers](serializers.md) | Custom payload serialization — msgpack, orjson, encryption | +| [Events & Webhooks](events-webhooks.md) | React to queue events and push notifications to external services | diff --git a/docs/guide/middleware.md b/docs/guide/extensibility/middleware.md similarity index 98% rename from docs/guide/middleware.md rename to docs/guide/extensibility/middleware.md index 17fc99f..dd7a42c 100644 --- a/docs/guide/middleware.md +++ b/docs/guide/extensibility/middleware.md @@ -192,4 +192,4 @@ queue = Queue(middleware=[ ]) ``` -See the [OpenTelemetry guide](otel.md) for setup details. +See the [OpenTelemetry guide](../../integrations/otel.md) for setup details. diff --git a/docs/guide/serializers.md b/docs/guide/extensibility/serializers.md similarity index 100% rename from docs/guide/serializers.md rename to docs/guide/extensibility/serializers.md diff --git a/docs/guide/index.md b/docs/guide/index.md new file mode 100644 index 0000000..37acf78 --- /dev/null +++ b/docs/guide/index.md @@ -0,0 +1,55 @@ +# User Guide + +Everything you need to build, run, and operate taskito in production. + +
+ +- **Core** + + --- + + Tasks, workers, queues, scheduling, and workflows — the fundamentals. + + [:octicons-arrow-right-24: Core concepts](core/index.md) + +- **Reliability** + + --- + + Retries, error handling, rate limiting, circuit breakers, and distributed locking. + + [:octicons-arrow-right-24: Reliability](reliability/index.md) + +- **Advanced Execution** + + --- + + Prefork pool, native async, result streaming, dependencies, and batching. + + [:octicons-arrow-right-24: Execution](execution/index.md) + +- **Extensibility** + + --- + + Middleware, custom serializers, and event hooks. + + [:octicons-arrow-right-24: Extensibility](extensibility/index.md) + +- **Observability** + + --- + + Monitoring, structured logging, and the web dashboard. + + [:octicons-arrow-right-24: Observability](observability/index.md) + +- **Operations** + + --- + + Testing, deployment, troubleshooting, and migration. + + [:octicons-arrow-right-24: Operations](operations/index.md) + +
diff --git a/docs/guide/observability/dashboard-api.md b/docs/guide/observability/dashboard-api.md new file mode 100644 index 0000000..2891728 --- /dev/null +++ b/docs/guide/observability/dashboard-api.md @@ -0,0 +1,228 @@ +# Dashboard REST API + +The dashboard exposes a JSON API you can use independently of the UI. All endpoints return `application/json` with `Access-Control-Allow-Origin: *`. + +## Stats + +### `GET /api/stats` + +Queue statistics snapshot. + +```json +{ + "pending": 12, + "running": 3, + "completed": 450, + "failed": 2, + "dead": 1, + "cancelled": 0 +} +``` + +### `GET /api/stats/queues` + +Per-queue statistics. Pass `?queue=name` for a single queue, or omit for all queues. + +```bash +curl http://localhost:8080/api/stats/queues +curl http://localhost:8080/api/stats/queues?queue=emails +``` + +## Jobs + +### `GET /api/jobs` + +Paginated list of jobs with filtering. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `status` | `string` | all | Filter by status | +| `queue` | `string` | all | Filter by queue name | +| `task` | `string` | all | Filter by task name | +| `metadata` | `string` | — | Search metadata (LIKE) | +| `error` | `string` | — | Search error text (LIKE) | +| `created_after` | `int` | — | Unix ms timestamp | +| `created_before` | `int` | — | Unix ms timestamp | +| `limit` | `int` | `20` | Page size | +| `offset` | `int` | `0` | Pagination offset | + +```bash +curl http://localhost:8080/api/jobs?status=running&limit=10 +``` + +### `GET /api/jobs/{id}` + +Full detail for a single job. + +### `GET /api/jobs/{id}/errors` + +Error history for a job (one entry per failed attempt). + +### `GET /api/jobs/{id}/logs` + +Task execution logs for a specific job. + +### `GET /api/jobs/{id}/replay-history` + +Replay history for a job that has been replayed. + +### `GET /api/jobs/{id}/dag` + +Dependency graph for a job (nodes and edges). + +### `POST /api/jobs/{id}/cancel` + +Cancel a pending job. 
+ +```json +{ "cancelled": true } +``` + +### `POST /api/jobs/{id}/replay` + +Replay a completed or failed job with the same payload. + +```json +{ "replay_job_id": "01H5K7Y..." } +``` + +## Dead Letters + +### `GET /api/dead-letters` + +Paginated list of dead letter entries. Supports `limit` and `offset` parameters. + +### `POST /api/dead-letters/{id}/retry` + +Re-enqueue a dead letter job. + +```json +{ "new_job_id": "01H5K7Y..." } +``` + +### `POST /api/dead-letters/purge` + +Purge all dead letters. + +```json +{ "purged": 42 } +``` + +## Metrics + +### `GET /api/metrics` + +Per-task execution metrics. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `task` | `string` | all | Filter by task name | +| `since` | `int` | `3600` | Lookback window in seconds | + +### `GET /api/metrics/timeseries` + +Time-bucketed metrics for charts. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `task` | `string` | all | Filter by task name | +| `since` | `int` | `3600` | Lookback window in seconds | +| `bucket` | `int` | `60` | Bucket size in seconds | + +## Logs + +### `GET /api/logs` + +Query task execution logs across all jobs. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `task` | `string` | all | Filter by task name | +| `level` | `string` | all | Filter by log level | +| `since` | `int` | `3600` | Lookback window in seconds | +| `limit` | `int` | `100` | Max entries | + +## Infrastructure + +### `GET /api/workers` + +List registered workers with heartbeat status. + +### `GET /api/circuit-breakers` + +Current state of all circuit breakers. + +### `GET /api/resources` + +Worker resource health and pool status. + +### `GET /api/queues/paused` + +List paused queue names. + +### `POST /api/queues/{name}/pause` + +Pause a queue (jobs stop being dequeued). + +### `POST /api/queues/{name}/resume` + +Resume a paused queue. + +## Observability + +### `GET /api/proxy-stats` + +Per-handler proxy reconstruction metrics. 
+ +### `GET /api/interception-stats` + +Interception strategy performance metrics. + +### `GET /api/scaler` + +KEDA-compatible autoscaler payload. Pass `?queue=name` for a specific queue. + +### `GET /health` + +Liveness check. Always returns `{"status": "ok"}`. + +### `GET /readiness` + +Readiness check with storage, worker, and resource health. + +### `GET /metrics` + +Prometheus metrics endpoint (requires `prometheus-client` package). + +## Using the API Programmatically + +```python +import requests + +# Health check script +stats = requests.get("http://localhost:8080/api/stats").json() + +if stats["dead"] > 0: + print(f"WARNING: {stats['dead']} dead letter(s)") + +if stats["running"] > 100: + print(f"WARNING: {stats['running']} jobs running, possible backlog") +``` + +```python +# Pause a queue during deployment +requests.post("http://localhost:8080/api/queues/default/pause") + +# ... deploy ... + +# Resume after deployment +requests.post("http://localhost:8080/api/queues/default/resume") +``` + +```python +# Retry all dead letters +dead = requests.get("http://localhost:8080/api/dead-letters?limit=100").json() +for entry in dead: + requests.post(f"http://localhost:8080/api/dead-letters/{entry['id']}/retry") + print(f"Retried {entry['task_name']}") +``` diff --git a/docs/guide/dashboard.md b/docs/guide/observability/dashboard.md similarity index 74% rename from docs/guide/dashboard.md rename to docs/guide/observability/dashboard.md index 325b207..fde9855 100644 --- a/docs/guide/dashboard.md +++ b/docs/guide/observability/dashboard.md @@ -247,234 +247,8 @@ Use the **Refresh** dropdown in the header to change how often all data refreshe - **10s** -- Low-frequency polling. - **Off** -- Manual only (reload the page to refresh). -## REST API - -The dashboard exposes a JSON API you can use independently of the UI. All endpoints return `application/json` with `Access-Control-Allow-Origin: *`. - -### Stats - -#### `GET /api/stats` - -Queue statistics snapshot. 
- -```json -{ - "pending": 12, - "running": 3, - "completed": 450, - "failed": 2, - "dead": 1, - "cancelled": 0 -} -``` - -#### `GET /api/stats/queues` - -Per-queue statistics. Pass `?queue=name` for a single queue, or omit for all queues. - -```bash -curl http://localhost:8080/api/stats/queues -curl http://localhost:8080/api/stats/queues?queue=emails -``` - -### Jobs - -#### `GET /api/jobs` - -Paginated list of jobs with filtering. - -| Parameter | Type | Default | Description | -|---|---|---|---| -| `status` | `string` | all | Filter by status | -| `queue` | `string` | all | Filter by queue name | -| `task` | `string` | all | Filter by task name | -| `metadata` | `string` | — | Search metadata (LIKE) | -| `error` | `string` | — | Search error text (LIKE) | -| `created_after` | `int` | — | Unix ms timestamp | -| `created_before` | `int` | — | Unix ms timestamp | -| `limit` | `int` | `20` | Page size | -| `offset` | `int` | `0` | Pagination offset | - -```bash -curl http://localhost:8080/api/jobs?status=running&limit=10 -``` - -#### `GET /api/jobs/{id}` - -Full detail for a single job. - -#### `GET /api/jobs/{id}/errors` - -Error history for a job (one entry per failed attempt). - -#### `GET /api/jobs/{id}/logs` - -Task execution logs for a specific job. - -#### `GET /api/jobs/{id}/replay-history` - -Replay history for a job that has been replayed. - -#### `GET /api/jobs/{id}/dag` - -Dependency graph for a job (nodes and edges). - -#### `POST /api/jobs/{id}/cancel` - -Cancel a pending job. - -```json -{ "cancelled": true } -``` - -#### `POST /api/jobs/{id}/replay` - -Replay a completed or failed job with the same payload. - -```json -{ "replay_job_id": "01H5K7Y..." } -``` - -### Dead Letters - -#### `GET /api/dead-letters` - -Paginated list of dead letter entries. Supports `limit` and `offset` parameters. - -#### `POST /api/dead-letters/{id}/retry` - -Re-enqueue a dead letter job. - -```json -{ "new_job_id": "01H5K7Y..." 
} -``` - -#### `POST /api/dead-letters/purge` - -Purge all dead letters. - -```json -{ "purged": 42 } -``` - -### Metrics - -#### `GET /api/metrics` - -Per-task execution metrics. - -| Parameter | Type | Default | Description | -|---|---|---|---| -| `task` | `string` | all | Filter by task name | -| `since` | `int` | `3600` | Lookback window in seconds | - -#### `GET /api/metrics/timeseries` - -Time-bucketed metrics for charts. - -| Parameter | Type | Default | Description | -|---|---|---|---| -| `task` | `string` | all | Filter by task name | -| `since` | `int` | `3600` | Lookback window in seconds | -| `bucket` | `int` | `60` | Bucket size in seconds | - -### Logs - -#### `GET /api/logs` - -Query task execution logs across all jobs. - -| Parameter | Type | Default | Description | -|---|---|---|---| -| `task` | `string` | all | Filter by task name | -| `level` | `string` | all | Filter by log level | -| `since` | `int` | `3600` | Lookback window in seconds | -| `limit` | `int` | `100` | Max entries | - -### Infrastructure - -#### `GET /api/workers` - -List registered workers with heartbeat status. - -#### `GET /api/circuit-breakers` - -Current state of all circuit breakers. - -#### `GET /api/resources` - -Worker resource health and pool status. - -#### `GET /api/queues/paused` - -List paused queue names. - -#### `POST /api/queues/{name}/pause` - -Pause a queue (jobs stop being dequeued). - -#### `POST /api/queues/{name}/resume` - -Resume a paused queue. - -### Observability - -#### `GET /api/proxy-stats` - -Per-handler proxy reconstruction metrics. - -#### `GET /api/interception-stats` - -Interception strategy performance metrics. - -#### `GET /api/scaler` - -KEDA-compatible autoscaler payload. Pass `?queue=name` for a specific queue. - -#### `GET /health` - -Liveness check. Always returns `{"status": "ok"}`. - -#### `GET /readiness` - -Readiness check with storage, worker, and resource health. 
- -#### `GET /metrics` - -Prometheus metrics endpoint (requires `prometheus-client` package). - -## Using the API Programmatically - -```python -import requests - -# Health check script -stats = requests.get("http://localhost:8080/api/stats").json() - -if stats["dead"] > 0: - print(f"WARNING: {stats['dead']} dead letter(s)") - -if stats["running"] > 100: - print(f"WARNING: {stats['running']} jobs running, possible backlog") -``` - -```python -# Pause a queue during deployment -requests.post("http://localhost:8080/api/queues/default/pause") - -# ... deploy ... - -# Resume after deployment -requests.post("http://localhost:8080/api/queues/default/resume") -``` - -```python -# Retry all dead letters -dead = requests.get("http://localhost:8080/api/dead-letters?limit=100").json() -for entry in dead: - requests.post(f"http://localhost:8080/api/dead-letters/{entry['id']}/retry") - print(f"Retried {entry['task_name']}") -``` +!!! tip "REST API" + The dashboard also exposes a full JSON API. See the [Dashboard REST API](dashboard-api.md) reference for all endpoints. ## Development diff --git a/docs/guide/observability/index.md b/docs/guide/observability/index.md new file mode 100644 index 0000000..83a13ed --- /dev/null +++ b/docs/guide/observability/index.md @@ -0,0 +1,10 @@ +# Observability + +Monitor, log, and inspect your task queue in real time. 
+ +| Guide | Description | +|-------|-------------| +| [Monitoring & Hooks](monitoring.md) | Queue stats, progress tracking, worker heartbeat, and alerting hooks | +| [Structured Logging](logging.md) | Per-task structured logs with automatic context | +| [Web Dashboard](dashboard.md) | Built-in web UI for browsing jobs, metrics, and worker status | +| [Dashboard REST API](dashboard-api.md) | Programmatic access to all dashboard data via REST endpoints | diff --git a/docs/guide/logging.md b/docs/guide/observability/logging.md similarity index 100% rename from docs/guide/logging.md rename to docs/guide/observability/logging.md diff --git a/docs/guide/monitoring.md b/docs/guide/observability/monitoring.md similarity index 96% rename from docs/guide/monitoring.md rename to docs/guide/observability/monitoring.md index 5620a75..f144e3b 100644 --- a/docs/guide/monitoring.md +++ b/docs/guide/observability/monitoring.md @@ -117,7 +117,7 @@ The worker heartbeat is also available via the dashboard REST API at `GET /api/w taskito includes an in-process event bus for reacting to job lifecycle events (`JOB_ENQUEUED`, `JOB_COMPLETED`, `JOB_FAILED`, `JOB_RETRYING`, `JOB_DEAD`, `JOB_CANCELLED`). Events can also be delivered as signed HTTP webhooks to external systems. 
-[:octicons-arrow-right-24: Events & Webhooks guide](events-webhooks.md) +[:octicons-arrow-right-24: Events & Webhooks guide](../extensibility/events-webhooks.md) ## Prometheus Metrics @@ -127,7 +127,7 @@ For production monitoring, the optional Prometheus integration provides counters pip install taskito[prometheus] ``` -[:octicons-arrow-right-24: Prometheus integration](../integrations/prometheus.md) +[:octicons-arrow-right-24: Prometheus integration](../../integrations/prometheus.md) ## Hooks diff --git a/docs/guide/deployment.md b/docs/guide/operations/deployment.md similarity index 98% rename from docs/guide/deployment.md rename to docs/guide/operations/deployment.md index 4829a9b..6156a8e 100644 --- a/docs/guide/deployment.md +++ b/docs/guide/operations/deployment.md @@ -299,7 +299,7 @@ queue = Queue( ) ``` -If you need distributed workers across multiple machines, use the [Postgres backend](postgres.md) which removes the single-writer constraint and supports multi-machine deployments. Alternatively, consider [Celery or Dramatiq](../comparison.md). +If you need distributed workers across multiple machines, use the [Postgres backend](postgres.md) which removes the single-writer constraint and supports multi-machine deployments. Alternatively, consider [Celery or Dramatiq](../../comparison.md). ## SQLite Scaling Limits @@ -343,7 +343,7 @@ Increasing the pool beyond ~16 typically doesn't help, since SQLite write serial | > 50K jobs/s | — | — | — | Consider Celery + RabbitMQ for this scale | !!! note - These are rough guidelines for noop tasks. Real throughput depends on task duration, payload size, and I/O patterns. Run the [benchmark](../examples/benchmark.md) on your hardware to get accurate numbers. + These are rough guidelines for noop tasks. Real throughput depends on task duration, payload size, and I/O patterns. Run the [benchmark](../../examples/benchmark.md) on your hardware to get accurate numbers. 
## Checklist diff --git a/docs/guide/operations/index.md b/docs/guide/operations/index.md new file mode 100644 index 0000000..d580852 --- /dev/null +++ b/docs/guide/operations/index.md @@ -0,0 +1,13 @@ +# Operations + +Run taskito reliably in production. + +| Guide | Description | +|-------|-------------| +| [Testing](testing.md) | Test mode, fixtures, mocking resources, and workflow testing | +| [Job Management](job-management.md) | Cancel, pause, archive, revoke, replay, and clean up jobs | +| [Troubleshooting](troubleshooting.md) | Diagnose stuck jobs, lock contention, and worker issues | +| [Deployment](deployment.md) | systemd, Docker, WAL mode, Postgres, and production checklists | +| [KEDA Autoscaling](keda.md) | Kubernetes event-driven autoscaling for workers | +| [Postgres Backend](postgres.md) | Set up and run taskito with PostgreSQL | +| [Migrating from Celery](migration.md) | Side-by-side comparison and step-by-step migration guide | diff --git a/docs/guide/operations/job-management.md b/docs/guide/operations/job-management.md new file mode 100644 index 0000000..834591e --- /dev/null +++ b/docs/guide/operations/job-management.md @@ -0,0 +1,189 @@ +# Job Management + +Manage running jobs — cancel, pause queues, archive, revoke, replay, and clean up. 
+ +## Job Cancellation + +Cancel a pending job before it starts: + +```python +job = send_email.delay("user@example.com", "Hello", "World") +cancelled = queue.cancel_job(job.id) # True if was pending +``` + +- Returns `True` if the job was pending and is now cancelled +- Returns `False` if the job was already running, completed, or in another non-pending state +- Cancelled jobs cannot be un-cancelled + +## Result TTL & Auto-Cleanup + +### Manual Cleanup + +```python +# Purge completed jobs older than 1 hour +deleted = queue.purge_completed(older_than=3600) + +# Purge dead letters older than 24 hours +deleted = queue.purge_dead(older_than=86400) +``` + +### Automatic Cleanup + +Set `result_ttl` on the Queue to automatically purge old jobs while the worker runs: + +```python +queue = Queue( + db_path="myapp.db", + result_ttl=3600, # Auto-purge completed/dead jobs older than 1 hour +) +``` + +The scheduler checks every ~60 seconds and purges: + +- Completed jobs older than `result_ttl` +- Dead letter entries older than `result_ttl` +- Error history records older than `result_ttl` + +Set to `None` (default) to disable auto-cleanup. + +### Cascade Cleanup + +When jobs are purged — either manually via `purge_completed()` or automatically via `result_ttl` — their related child records are also deleted: + +- Error history (`job_errors`) +- Task logs (`task_logs`) +- Task metrics (`task_metrics`) +- Job dependencies (`job_dependencies`) +- Replay history (`replay_history`) + +This prevents orphaned records from accumulating when parent jobs are removed. 
+ +```python +# Manual purge — child records are cleaned up automatically +deleted = queue.purge_completed(older_than=3600) +print(f"Purged {deleted} jobs and their related records") + +# With per-job TTL — cascade cleanup still applies +job = resize_image.apply_async( + args=("photo.jpg",), + result_ttl=600, # This job's results expire after 10 minutes +) +# When this job is purged (after 10 min), its errors, logs, +# metrics, dependencies, and replay history are also removed. +``` + +!!! note + Dead letter entries are **not** cascade-deleted — they have their own lifecycle managed by `purge_dead()`. Timestamp-based cleanup (`result_ttl`) of error history, logs, and metrics also continues to run independently, catching old records regardless of whether the parent job still exists. + +## Queue Pause/Resume + +Temporarily pause job processing on a queue without stopping the worker: + +```python +# Pause the "emails" queue +queue.pause("emails") + +# Check which queues are paused +print(queue.paused_queues()) # ["emails"] + +# Resume processing +queue.resume("emails") +``` + +Paused queues still accept new jobs — they just won't be dequeued until resumed. + +### Maintenance Window Example + +```python +# Before maintenance: pause all queues +for q in ["default", "emails", "reports"]: + queue.pause(q) +print(f"Paused: {queue.paused_queues()}") + +# ... perform maintenance ... 
+ +# After maintenance: resume all queues +for q in ["default", "emails", "reports"]: + queue.resume(q) +``` + +## Job Archival + +Move old completed jobs to an archive table to keep the main jobs table lean: + +```python +# Archive completed jobs older than 24 hours +archived_count = queue.archive(older_than=86400) +print(f"Archived {archived_count} jobs") + +# Browse archived jobs +archived = queue.list_archived(limit=50, offset=0) +for job in archived: + print(f"{job.id}: {job.task_name} ({job.status})") +``` + +Archived jobs are no longer returned by `queue.stats()` or `queue.list_jobs()`, but remain queryable via `queue.list_archived()`. + +### Scheduled Archival + +```python +@queue.periodic(cron="0 0 2 * * *") # Daily at 2 AM +def nightly_archival(): + archived = queue.archive(older_than=7 * 86400) # Archive jobs older than 7 days + current_job.log(f"Archived {archived} jobs") +``` + +## Task Revocation + +Cancel all pending jobs for a specific task: + +```python +# Revoke all pending "send_newsletter" jobs +cancelled = queue.revoke_task("myapp.tasks.send_newsletter") +print(f"Revoked {cancelled} jobs") +``` + +## Queue Purge + +Remove all pending jobs from a specific queue: + +```python +purged = queue.purge("emails") +print(f"Purged {purged} jobs from the emails queue") +``` + +## Job Replay + +Replay a completed or dead job with the same arguments: + +```python +new_job = queue.replay(job_id) +print(f"Replayed as {new_job.id}") + +# Check replay history +history = queue.replay_history(job_id) +``` + +### Retry from Dead Letter with Replay + +```python +# List dead letters and replay them +dead = queue.dead_letters() +for entry in dead: + print(f"Replaying dead job {entry['original_job_id']}: {entry['task_name']}") + new_id = queue.retry_dead(entry["id"]) + print(f" -> New job: {new_id}") +``` + +## SQLite Configuration + +taskito configures SQLite for optimal performance: + +| Pragma | Value | Purpose | +|---|---|---| +| `journal_mode` | WAL | Concurrent 
reads during writes | +| `busy_timeout` | 5000ms | Wait instead of failing on lock contention | +| `synchronous` | NORMAL | Balance between safety and speed | +| `journal_size_limit` | 64MB | Prevent unbounded WAL growth | + +The connection pool uses up to 8 connections via `r2d2`. diff --git a/docs/guide/keda.md b/docs/guide/operations/keda.md similarity index 97% rename from docs/guide/keda.md rename to docs/guide/operations/keda.md index d7445fb..180549a 100644 --- a/docs/guide/keda.md +++ b/docs/guide/operations/keda.md @@ -183,7 +183,7 @@ spec: threshold: "0.8" ``` -See the [Prometheus integration](../integrations/prometheus.md) for setting up the metrics collector. +See the [Prometheus integration](../../integrations/prometheus.md) for setting up the metrics collector. ## Deploy Templates diff --git a/docs/guide/migration.md b/docs/guide/operations/migration.md similarity index 100% rename from docs/guide/migration.md rename to docs/guide/operations/migration.md diff --git a/docs/guide/postgres.md b/docs/guide/operations/postgres.md similarity index 100% rename from docs/guide/postgres.md rename to docs/guide/operations/postgres.md diff --git a/docs/guide/testing.md b/docs/guide/operations/testing.md similarity index 94% rename from docs/guide/testing.md rename to docs/guide/operations/testing.md index 55dd671..a6f4ffb 100644 --- a/docs/guide/testing.md +++ b/docs/guide/operations/testing.md @@ -44,7 +44,7 @@ with queue.test_mode(propagate_errors=False, resources=None) as results: | Parameter | Type | Default | Description | |---|---|---|---| | `propagate_errors` | `bool` | `False` | If `True`, task exceptions are re-raised immediately instead of being captured in `TestResult.error` | -| `resources` | `dict[str, Any] \| None` | `None` | Map of resource name → mock instance or `MockResource` for injection. See [Resource System](resources.md#testing-with-resources). 
| +| `resources` | `dict[str, Any] \| None` | `None` | Map of resource name → mock instance or `MockResource` for injection. See [Resource System](../../resources/index.md#testing-with-resources). | The context manager yields a `TestResults` list that accumulates results as tasks execute. @@ -75,8 +75,8 @@ with queue.test_mode() as results: | `args` | `tuple` | Positional arguments passed to the task | | `kwargs` | `dict` | Keyword arguments passed to the task | | `return_value` | `Any` | Return value on success, `None` on failure | -| `error` | `Exception \| None` | The exception if the task failed | -| `traceback` | `str \| None` | Formatted traceback if the task failed | +| `error` | `Exception \| None` | The exception if the task failed | +| `traceback` | `str \| None` | Formatted traceback if the task failed | | `succeeded` | `bool` | `True` if no error | | `failed` | `bool` | `True` if an error occurred | @@ -109,8 +109,8 @@ results.filter(task_name=None, succeeded=None) -> TestResults | Parameter | Type | Description | |---|---|---| -| `task_name` | `str \| None` | Filter by exact task name | -| `succeeded` | `bool \| None` | `True` for successes, `False` for failures | +| `task_name` | `str \| None` | Filter by exact task name | +| `succeeded` | `bool \| None` | `True` for successes, `False` for failures | ## Testing Failures @@ -259,7 +259,7 @@ async def test_async_enqueue(task_results): ## Testing with Worker Resources -If your tasks use [worker resources](resources.md) (injected via `inject=` or `Inject["name"]`), pass mock instances through `resources=`: +If your tasks use [worker resources](../../resources/index.md) (injected via `inject=` or `Inject["name"]`), pass mock instances through `resources=`: ```python from unittest.mock import MagicMock diff --git 
a/docs/guide/otel.md b/docs/guide/otel.md deleted file mode 100644 index 29634d6..0000000 --- a/docs/guide/otel.md +++ /dev/null @@ -1,83 +0,0 @@ -# OpenTelemetry Integration - -taskito provides optional OpenTelemetry support for distributed tracing of task execution. - -## Installation - -Install with the `otel` extra: - -```bash -pip install taskito[otel] -``` - -This installs `opentelemetry-api` as a dependency. - -## Setup - -Add `OpenTelemetryMiddleware` to your queue: - -```python -from taskito import Queue -from taskito.contrib.otel import OpenTelemetryMiddleware - -queue = Queue(middleware=[OpenTelemetryMiddleware()]) -``` - -## What Gets Traced - -Each task execution produces a span with: - -- **Span name**: `taskito.execute.` -- **Attributes**: - - `taskito.job_id` — the job ID - - `taskito.task_name` — the registered task name - - `taskito.queue` — the queue name - - `taskito.retry_count` — current retry attempt -- **Status**: `OK` on success, `ERROR` on failure (with exception recorded) -- **Events**: A `retry` event is added when a task is about to be retried - -## Configuration with Exporters - -`OpenTelemetryMiddleware` uses the standard OpenTelemetry API, so configure exporters as you normally would: - -```python -from opentelemetry import trace -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter - -# Set up the tracer provider with an OTLP exporter -provider = TracerProvider() -provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) -trace.set_tracer_provider(provider) - -# Now create your queue — spans will be exported automatically -from taskito import Queue -from taskito.contrib.otel import OpenTelemetryMiddleware - -queue = Queue(middleware=[OpenTelemetryMiddleware()]) -``` - -### Custom Tracer Name - -By default, spans are created under the `"taskito"` tracer. 
Override with: - -```python -OpenTelemetryMiddleware(tracer_name="my-service") -``` - -## Combining with Other Middleware - -`OpenTelemetryMiddleware` is a standard `TaskMiddleware`, so it composes with other middleware: - -```python -queue = Queue(middleware=[ - OpenTelemetryMiddleware(), - MyLoggingMiddleware(), -]) -``` - -!!! note "Thread safety" - `OpenTelemetryMiddleware` is thread-safe and can be used with multi-worker configurations. Internal span tracking is protected by a lock. - -See the [Middleware guide](middleware.md) for more on combining middleware. diff --git a/docs/guide/circuit-breakers.md b/docs/guide/reliability/circuit-breakers.md similarity index 100% rename from docs/guide/circuit-breakers.md rename to docs/guide/reliability/circuit-breakers.md diff --git a/docs/guide/error-handling.md b/docs/guide/reliability/error-handling.md similarity index 98% rename from docs/guide/error-handling.md rename to docs/guide/reliability/error-handling.md index c60afd2..cb89742 100644 --- a/docs/guide/error-handling.md +++ b/docs/guide/reliability/error-handling.md @@ -220,7 +220,7 @@ def on_task_failure(task_name, args, kwargs, error): ### Test Mode for Isolation -Use [test mode](testing.md) to run tasks synchronously and inspect errors without a worker: +Use [test mode](../operations/testing.md) to run tasks synchronously and inspect errors without a worker: ```python with queue.test_mode() as results: diff --git a/docs/guide/guarantees.md b/docs/guide/reliability/guarantees.md similarity index 97% rename from docs/guide/guarantees.md rename to docs/guide/reliability/guarantees.md index 0d506cd..38ae4ab 100644 --- a/docs/guide/guarantees.md +++ b/docs/guide/reliability/guarantees.md @@ -73,7 +73,7 @@ job = send_report.apply_async( ) ``` -If a job with the same `unique_key` is already pending or running, the duplicate is silently dropped. See [Advanced > Unique Tasks](advanced.md) for details. 
+If a job with the same `unique_key` is already pending or running, the duplicate is silently dropped. See [Unique Tasks](../execution/unique-tasks.md) for details. ### Avoid side effects that can't be undone diff --git a/docs/guide/reliability/index.md b/docs/guide/reliability/index.md new file mode 100644 index 0000000..9f7a898 --- /dev/null +++ b/docs/guide/reliability/index.md @@ -0,0 +1,12 @@ +# Reliability + +Harden your task queue for production workloads. + +| Guide | Description | +|-------|-------------| +| [Retries & Dead Letters](retries.md) | Automatic retries with exponential backoff, dead letter queue | +| [Error Handling](error-handling.md) | Task failure lifecycle, error inspection, debugging patterns | +| [Delivery Guarantees](guarantees.md) | At-least-once delivery, idempotency, and exactly-once patterns | +| [Rate Limiting](rate-limiting.md) | Throttle task execution with token bucket rate limits | +| [Circuit Breakers](circuit-breakers.md) | Protect downstream services from cascading failures | +| [Distributed Locking](locking.md) | Mutual exclusion across workers with database-backed locks | diff --git a/docs/guide/locking.md b/docs/guide/reliability/locking.md similarity index 94% rename from docs/guide/locking.md rename to docs/guide/reliability/locking.md index d5f93c5..3297587 100644 --- a/docs/guide/locking.md +++ b/docs/guide/reliability/locking.md @@ -33,8 +33,8 @@ queue.lock( | `name` | `str` | — | Lock name. All processes using the same name compete for the same lock. | | `ttl` | `int` | `30` | Lock TTL in seconds. Auto-extended if `auto_extend=True`. | | `auto_extend` | `bool` | `True` | Automatically extend the lock before it expires (background thread). | -| `owner_id` | `str \| None` | `None` | Custom owner identifier. Defaults to a random UUID per acquisition. | -| `timeout` | `float \| None` | `None` | Max seconds to wait for the lock. `None` raises immediately if unavailable. 
| +| `owner_id` | `str \| None` | `None` | Custom owner identifier. Defaults to a random UUID per acquisition. | +| `timeout` | `float \| None` | `None` | Max seconds to wait for the lock. `None` raises immediately if unavailable. | | `retry_interval` | `float` | `0.1` | Seconds between retry attempts when waiting for the lock. | ## Async Context Manager diff --git a/docs/guide/rate-limiting.md b/docs/guide/reliability/rate-limiting.md similarity index 100% rename from docs/guide/rate-limiting.md rename to docs/guide/reliability/rate-limiting.md diff --git a/docs/guide/retries.md b/docs/guide/reliability/retries.md similarity index 100% rename from docs/guide/retries.md rename to docs/guide/reliability/retries.md diff --git a/docs/integrations/fastapi.md b/docs/integrations/fastapi.md index bffa52e..ab08939 100644 --- a/docs/integrations/fastapi.md +++ b/docs/integrations/fastapi.md @@ -79,15 +79,117 @@ app.include_router( | Parameter | Type | Default | Description | |---|---|---|---| -| `include_routes` | `set[str] \| None` | `None` | If set, only register these route names. Cannot be combined with `exclude_routes`. | -| `exclude_routes` | `set[str] \| None` | `None` | If set, skip these route names. Cannot be combined with `include_routes`. | -| `dependencies` | `Sequence[Depends] \| None` | `None` | FastAPI dependencies applied to every route (e.g. auth). | +| `include_routes` | `set[str] \| None` | `None` | If set, only register these route names. Cannot be combined with `exclude_routes`. | +| `exclude_routes` | `set[str] \| None` | `None` | If set, skip these route names. Cannot be combined with `include_routes`. | +| `dependencies` | `Sequence[Depends] \| None` | `None` | FastAPI dependencies applied to every route (e.g. auth). | | `sse_poll_interval` | `float` | `0.5` | Seconds between SSE progress polls. | | `result_timeout` | `float` | `1.0` | Default timeout for non-blocking result fetch. 
| | `default_page_size` | `int` | `20` | Default page size for paginated endpoints. | | `max_page_size` | `int` | `100` | Maximum allowed page size. | -| `result_serializer` | `Callable[[Any], Any] \| None` | `None` | Custom result serializer. Receives any value, must return a JSON-serializable value. | +| `result_serializer` | `Callable[[Any], Any] \| None` | `None` | Custom result serializer. Receives any value, must return a JSON-serializable value. | Valid route names: `stats`, `jobs`, `job-errors`, `job-result`, `job-progress`, `cancel`, `dead-letters`, `retry-dead`, `health`, `readiness`, `resources`, `queue-stats`. -For full details on SSE streaming, blocking result fetch, Pydantic response models, and authentication, see the [Advanced guide](../guide/advanced.md#fastapi-integration). +## Blocking Result Fetch + +The `/jobs/{job_id}/result` endpoint supports an optional `timeout` query parameter (0–300 seconds). When `timeout > 0`, the request blocks until the job completes or the timeout elapses: + +```bash +# Non-blocking (default) +curl http://localhost:8000/tasks/jobs/01H5K6X.../result + +# Block up to 30 seconds for the result +curl http://localhost:8000/tasks/jobs/01H5K6X.../result?timeout=30 +``` + +## SSE Progress Streaming + +Stream real-time progress for a running job using Server-Sent Events: + +```python +import httpx + +with httpx.stream("GET", "http://localhost:8000/tasks/jobs/01H5K6X.../progress") as r: + for line in r.iter_lines(): + print(line) + # data: {"progress": 25, "status": "running"} + # data: {"progress": 50, "status": "running"} + # data: {"progress": 100, "status": "completed"} +``` + +From the browser: + +```javascript +const source = new EventSource("/tasks/jobs/01H5K6X.../progress"); +source.onmessage = (event) => { + const data = JSON.parse(event.data); + console.log(`Progress: ${data.progress}%`); + if (data.status === "completed" || data.status === "failed") { + source.close(); + } +}; +``` + +The stream sends a JSON event 
every 0.5 seconds while the job is active, then a final event when the job reaches a terminal state. + +## Pydantic Response Models + +All endpoints return validated Pydantic models with clean OpenAPI docs. You can import them for type-safe client code: + +```python +from taskito.contrib.fastapi import ( + StatsResponse, + JobResponse, + JobErrorResponse, + JobResultResponse, + CancelResponse, + DeadLetterResponse, + RetryResponse, +) +``` + +## Full Example + +```python +from fastapi import FastAPI, Header, HTTPException, Depends +from taskito import Queue, current_job +from taskito.contrib.fastapi import TaskitoRouter + +queue = Queue(db_path="myapp.db") + +@queue.task() +def resize_image(image_url: str, sizes: list[int]) -> dict: + results = {} + for i, size in enumerate(sizes): + results[size] = do_resize(image_url, size) + current_job.update_progress(int((i + 1) / len(sizes) * 100)) + return results + +async def require_auth(authorization: str = Header(...)): + if not authorization.startswith("Bearer "): + raise HTTPException(401) + +app = FastAPI(title="Image Service") +app.include_router( + TaskitoRouter(queue, dependencies=[Depends(require_auth)]), + prefix="/tasks", + tags=["tasks"], +) + +# Start worker in a separate process: +# taskito worker --app myapp:queue +``` + +```bash +# Check job status +curl http://localhost:8000/tasks/jobs/01H5K6X... 
\ + -H "Authorization: Bearer mytoken" + +# Stream progress +curl -N http://localhost:8000/tasks/jobs/01H5K6X.../progress \ + -H "Authorization: Bearer mytoken" + +# Block for result (up to 60s) +curl http://localhost:8000/tasks/jobs/01H5K6X.../result?timeout=60 \ + -H "Authorization: Bearer mytoken" +``` diff --git a/docs/integrations/otel.md b/docs/integrations/otel.md index 3a56ec0..2d89819 100644 --- a/docs/integrations/otel.md +++ b/docs/integrations/otel.md @@ -75,10 +75,10 @@ OpenTelemetryMiddleware( | Parameter | Type | Default | Description | |---|---|---|---| | `tracer_name` | `str` | `"taskito"` | OpenTelemetry tracer name. | -| `span_name_fn` | `Callable[[JobContext], str] \| None` | `None` | Custom span name builder. Receives `JobContext`, returns a string. Defaults to `.execute.`. | +| `span_name_fn` | `Callable[[JobContext], str] \| None` | `None` | Custom span name builder. Receives `JobContext`, returns a string. Defaults to `.execute.`. | | `attribute_prefix` | `str` | `"taskito"` | Prefix for all span attribute keys. | -| `extra_attributes_fn` | `Callable[[JobContext], dict] \| None` | `None` | Returns extra attributes to add to each span. Receives `JobContext`. | -| `task_filter` | `Callable[[str], bool] \| None` | `None` | Predicate that receives a task name. Return `True` to trace, `False` to skip. `None` traces all tasks. | +| `extra_attributes_fn` | `Callable[[JobContext], dict] \| None` | `None` | Returns extra attributes to add to each span. Receives `JobContext`. | +| `task_filter` | `Callable[[str], bool] \| None` | `None` | Predicate that receives a task name. Return `True` to trace, `False` to skip. `None` traces all tasks. | ## Combining with Other Middleware @@ -94,4 +94,4 @@ queue = Queue(middleware=[ !!! note "Thread safety" `OpenTelemetryMiddleware` is thread-safe and can be used with multi-worker configurations. Internal span tracking is protected by a lock. 
-See the [Middleware guide](../guide/middleware.md) for more on combining middleware. +See the [Middleware guide](../guide/extensibility/middleware.md) for more on combining middleware. diff --git a/docs/integrations/prometheus.md b/docs/integrations/prometheus.md index 860e7b8..625ae70 100644 --- a/docs/integrations/prometheus.md +++ b/docs/integrations/prometheus.md @@ -34,8 +34,8 @@ PrometheusMiddleware( | Parameter | Type | Default | Description | |---|---|---|---| | `namespace` | `str` | `"taskito"` | Prefix for all metric names. | -| `extra_labels_fn` | `Callable[[JobContext], dict[str, str]] \| None` | `None` | Returns extra labels to add to job metrics. Receives `JobContext`. | -| `disabled_metrics` | `set[str] \| None` | `None` | Metric groups or individual names to skip. Groups: `"jobs"`, `"queue"`, `"resource"`, `"proxy"`, `"intercept"`. | +| `extra_labels_fn` | `Callable[[JobContext], dict[str, str]] \| None` | `None` | Returns extra labels to add to job metrics. Receives `JobContext`. | +| `disabled_metrics` | `set[str] \| None` | `None` | Metric groups or individual names to skip. Groups: `"jobs"`, `"queue"`, `"resource"`, `"proxy"`, `"intercept"`. | ### Metrics Tracked @@ -73,7 +73,7 @@ PrometheusStatsCollector( | `queue` | `Queue` | — | The Queue instance to poll. | | `interval` | `float` | `10.0` | Seconds between polls. | | `namespace` | `str` | `"taskito"` | Prefix for metric names. Must match `PrometheusMiddleware` namespace to share metric objects. | -| `disabled_metrics` | `set[str] \| None` | `None` | Metric groups or names to skip. Same groups as `PrometheusMiddleware`. | +| `disabled_metrics` | `set[str] \| None` | `None` | Metric groups or names to skip. Same groups as `PrometheusMiddleware`. | ### Metrics Tracked @@ -152,7 +152,7 @@ queue = Queue( ) ``` -See the [Middleware guide](../guide/middleware.md) for more on combining middleware. +See the [Middleware guide](../guide/extensibility/middleware.md) for more on combining middleware. 
## Example: Alert on High DLQ Size diff --git a/docs/integrations/sentry.md b/docs/integrations/sentry.md index fdcc8c7..effa922 100644 --- a/docs/integrations/sentry.md +++ b/docs/integrations/sentry.md @@ -69,9 +69,9 @@ SentryMiddleware( | Parameter | Type | Default | Description | |---|---|---|---| | `tag_prefix` | `str` | `"taskito"` | Prefix for Sentry tag keys and breadcrumb category. | -| `transaction_name_fn` | `Callable[[JobContext], str] \| None` | `None` | Custom transaction name builder. Receives `JobContext`. Defaults to `:`. | -| `task_filter` | `Callable[[str], bool] \| None` | `None` | Predicate on task name. Return `True` to report, `False` to skip. `None` reports all tasks. | -| `extra_tags_fn` | `Callable[[JobContext], dict[str, str]] \| None` | `None` | Returns extra Sentry tags to set. Receives `JobContext`. | +| `transaction_name_fn` | `Callable[[JobContext], str] \| None` | `None` | Custom transaction name builder. Receives `JobContext`. Defaults to `:`. | +| `task_filter` | `Callable[[str], bool] \| None` | `None` | Predicate on task name. Return `True` to report, `False` to skip. `None` reports all tasks. | +| `extra_tags_fn` | `Callable[[JobContext], dict[str, str]] \| None` | `None` | Returns extra Sentry tags to set. Receives `JobContext`. | ## Combining with Other Middleware @@ -91,7 +91,7 @@ queue = Queue( ) ``` -See the [Middleware guide](../guide/middleware.md) for more on combining middleware. +See the [Middleware guide](../guide/extensibility/middleware.md) for more on combining middleware. ## Full Example diff --git a/docs/resources/observability.md b/docs/resources/observability.md index b7aec39..0cd63ab 100644 --- a/docs/resources/observability.md +++ b/docs/resources/observability.md @@ -114,7 +114,7 @@ Start the dashboard: taskito dashboard --app myapp.tasks:queue ``` -See the [Web Dashboard](../guide/dashboard.md) guide for full dashboard documentation. 
+See the [Web Dashboard](../guide/observability/dashboard.md) guide for full dashboard documentation. ## CLI commands diff --git a/zensical.toml b/zensical.toml index e010da1..48b0600 100644 --- a/zensical.toml +++ b/zensical.toml @@ -11,45 +11,62 @@ extra_css = ["assets/css/custom.css"] nav = [ { "Home" = "index.md" }, { "Getting Started" = [ + { "Overview" = "getting-started/index.md" }, { "Installation" = "getting-started/installation.md" }, { "Quickstart" = "getting-started/quickstart.md" }, ] }, { "User Guide" = [ - # Core — everyone reads these - { "Tasks" = "guide/tasks.md" }, - { "Workers" = "guide/workers.md" }, - { "Execution Models" = "guide/execution-model.md" }, - { "Queues & Priority" = "guide/queues.md" }, - { "Scheduling" = "guide/scheduling.md" }, - { "Workflows" = "guide/workflows.md" }, - # Reliability — production users - { "Retries & Dead Letters" = "guide/retries.md" }, - { "Error Handling" = "guide/error-handling.md" }, - { "Delivery Guarantees" = "guide/guarantees.md" }, - { "Rate Limiting" = "guide/rate-limiting.md" }, - { "Circuit Breakers" = "guide/circuit-breakers.md" }, - { "Distributed Locking" = "guide/locking.md" }, - # Advanced execution - { "Prefork Pool" = "guide/prefork.md" }, - { "Native Async Tasks" = "guide/async-tasks.md" }, - { "Result Streaming" = "guide/streaming.md" }, - { "Dependencies" = "guide/dependencies.md" }, - # Extensibility - { "Middleware" = "guide/middleware.md" }, - { "Serializers" = "guide/serializers.md" }, - { "Events & Webhooks" = "guide/events-webhooks.md" }, - # Observability - { "Monitoring & Hooks" = "guide/monitoring.md" }, - { "Structured Logging" = "guide/logging.md" }, - { "Web Dashboard" = "guide/dashboard.md" }, - # Operations - { "Testing" = "guide/testing.md" }, - { "Troubleshooting" = "guide/troubleshooting.md" }, - { "Deployment" = "guide/deployment.md" }, - { "KEDA Autoscaling" = "guide/keda.md" }, - { "Postgres Backend" = "guide/postgres.md" }, - { "Migrating from Celery" = 
"guide/migration.md" }, - { "Advanced" = "guide/advanced.md" }, + { "Overview" = "guide/index.md" }, + { "Core" = [ + { "Overview" = "guide/core/index.md" }, + { "Tasks" = "guide/core/tasks.md" }, + { "Workers" = "guide/core/workers.md" }, + { "Execution Models" = "guide/core/execution-model.md" }, + { "Queues & Priority" = "guide/core/queues.md" }, + { "Scheduling" = "guide/core/scheduling.md" }, + { "Workflows" = "guide/core/workflows.md" }, + ] }, + { "Reliability" = [ + { "Overview" = "guide/reliability/index.md" }, + { "Retries & Dead Letters" = "guide/reliability/retries.md" }, + { "Error Handling" = "guide/reliability/error-handling.md" }, + { "Delivery Guarantees" = "guide/reliability/guarantees.md" }, + { "Rate Limiting" = "guide/reliability/rate-limiting.md" }, + { "Circuit Breakers" = "guide/reliability/circuit-breakers.md" }, + { "Distributed Locking" = "guide/reliability/locking.md" }, + ] }, + { "Advanced Execution" = [ + { "Overview" = "guide/execution/index.md" }, + { "Prefork Pool" = "guide/execution/prefork.md" }, + { "Native Async Tasks" = "guide/execution/async-tasks.md" }, + { "Result Streaming" = "guide/execution/streaming.md" }, + { "Dependencies" = "guide/execution/dependencies.md" }, + { "Batch Enqueue" = "guide/execution/batch-enqueue.md" }, + { "Unique Tasks" = "guide/execution/unique-tasks.md" }, + ] }, + { "Extensibility" = [ + { "Overview" = "guide/extensibility/index.md" }, + { "Middleware" = "guide/extensibility/middleware.md" }, + { "Serializers" = "guide/extensibility/serializers.md" }, + { "Events & Webhooks" = "guide/extensibility/events-webhooks.md" }, + ] }, + { "Observability" = [ + { "Overview" = "guide/observability/index.md" }, + { "Monitoring & Hooks" = "guide/observability/monitoring.md" }, + { "Structured Logging" = "guide/observability/logging.md" }, + { "Web Dashboard" = "guide/observability/dashboard.md" }, + { "Dashboard REST API" = "guide/observability/dashboard-api.md" }, + ] }, + { "Operations" = [ + { "Overview" 
= "guide/operations/index.md" }, + { "Testing" = "guide/operations/testing.md" }, + { "Job Management" = "guide/operations/job-management.md" }, + { "Troubleshooting" = "guide/operations/troubleshooting.md" }, + { "Deployment" = "guide/operations/deployment.md" }, + { "KEDA Autoscaling" = "guide/operations/keda.md" }, + { "Postgres Backend" = "guide/operations/postgres.md" }, + { "Migrating from Celery" = "guide/operations/migration.md" }, + ] }, ] }, { "Integrations" = [ { "Overview" = "integrations/index.md" }, @@ -71,7 +88,15 @@ nav = [ ] }, { "Architecture" = "architecture.md" }, { "API Reference" = [ - { "Queue" = "api/queue.md" }, + { "Overview" = "api/index.md" }, + { "Queue" = [ + { "Constructor & Registration" = "api/queue/index.md" }, + { "Job Management" = "api/queue/jobs.md" }, + { "Queue & Stats" = "api/queue/queues.md" }, + { "Workers & Hooks" = "api/queue/workers.md" }, + { "Resources & Locking" = "api/queue/resources.md" }, + { "Events & Logs" = "api/queue/events.md" }, + ] }, { "TaskWrapper" = "api/task.md" }, { "JobResult" = "api/result.md" }, { "JobContext" = "api/context.md" }, @@ -80,6 +105,7 @@ nav = [ { "CLI" = "api/cli.md" }, ] }, { "Examples" = [ + { "Overview" = "examples/index.md" }, { "FastAPI Service" = "examples/fastapi-service.md" }, { "Notification Service" = "examples/notifications.md" }, { "Web Scraper Pipeline" = "examples/web-scraper.md" }, @@ -97,6 +123,11 @@ name = "material" features = [ "navigation.instant", "navigation.tracking", + "navigation.tabs", + "navigation.sections", + "navigation.indexes", + "navigation.path", + "navigation.prune", "navigation.top", "content.code.copy", "content.code.annotate", @@ -148,6 +179,9 @@ def_list = {} [project.plugins] search = {} +[project.extra] +generator = false + [[project.extra.social]] icon = "fontawesome/brands/github" link = "https://github.com/ByteVeda/taskito"