Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,100 @@ jobs:
jq '.[]' --raw-output <<< '${{steps.changes.outputs.all_files}}' |
xargs pre-commit run --files

python-client-lint:
name: Python Client Lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4

- name: Get changed Python client files
id: changes
uses: dorny/paths-filter@0bc4621a3135347011ad047f9ecf449bf72ce2bd # v3.0.0
with:
list-files: json
filters: |
python:
- added|modified: 'clients/python/**'

- name: Skip if no Python client changes
if: steps.changes.outputs.python != 'true'
run: echo "No Python client files changed, skipping pre-commit checks"

- uses: astral-sh/setup-uv@884ad927a57e558e7a70b92f2bccf9198a4be546 # v6
if: steps.changes.outputs.python == 'true'
with:
version: '0.8.2'
enable-cache: false

- uses: getsentry/action-setup-venv@5a80476d175edf56cb205b08bc58986fa99d1725 # v3.2.0
if: steps.changes.outputs.python == 'true'
with:
cache-dependency-path: uv.lock
install-cmd: uv sync --all-packages --all-groups --frozen --active

- uses: actions/cache@1bd1e32a3bdc45362d1e726936510720a7c30a57 # v4.2.0
if: steps.changes.outputs.python == 'true'
with:
path: ~/.cache/pre-commit
key: cache-epoch-1|${{ env.pythonLocation }}|${{ hashFiles('.pre-commit-config.yaml', 'uv.lock') }}

- name: Install pre-commit
if: steps.changes.outputs.python == 'true'
run: pre-commit install-hooks

- name: Run pre-commit on Python client files
if: steps.changes.outputs.python == 'true'
run: |
jq '.[]' --raw-output <<< '${{steps.changes.outputs.python_files}}' |
xargs pre-commit run --files

python-client-test:
name: Python Client Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4

- name: Get changed Python client files
id: changes
uses: dorny/paths-filter@0bc4621a3135347011ad047f9ecf449bf72ce2bd # v3.0.0
with:
filters: |
python:
- 'clients/python/**'

- name: Skip if no Python client changes
if: steps.changes.outputs.python != 'true'
run: echo "No Python client files changed, skipping tests"

- uses: astral-sh/setup-uv@884ad927a57e558e7a70b92f2bccf9198a4be546 # v6
if: steps.changes.outputs.python == 'true'
with:
version: '0.8.2'
enable-cache: false

- uses: getsentry/action-setup-venv@5a80476d175edf56cb205b08bc58986fa99d1725 # v3.2.0
if: steps.changes.outputs.python == 'true'
with:
cache-dependency-path: uv.lock
install-cmd: uv sync --all-packages --all-groups --frozen --active

- name: Start devservices
if: steps.changes.outputs.python == 'true'
run: |
devservices up --mode=client

- name: Run pytest with coverage
if: steps.changes.outputs.python == 'true'
run: make python-test

- uses: codecov/codecov-action@0565863a31f2c772f9f0395002a31e3f06189574 # v5
if: steps.changes.outputs.python == 'true'
with:
files: clients/python/coverage.xml
slug: getsentry/taskbroker
token: ${{ secrets.CODECOV_TOKEN }}
flags: python-client

test:
name: Tests (ubuntu)
runs-on: ubuntu-latest
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

# Editors
.DS_Store
.claude

# Sqlite artifacts
*.sqlite
Expand All @@ -13,6 +14,8 @@
**/.pytest_cache/
**/integration_tests/.tests_output/
**/.venv
clients/python/.coverage*
clients/python/coverage.xml

.VERSION
VERSION
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ setup:
@test -n "$$CI" || devenv sync
.PHONY: setup

python-venv: ## Build Python virtual environment
python -m venv .venv
uv sync --all-packages --all-groups
.PHONY: python-venv

# Builds

build: ## Build all features without debug symbols
Expand Down Expand Up @@ -38,6 +43,10 @@ unit-test: ## Run unit tests
cargo test
.PHONY: unit-test

python-test: ## Run Python client tests
cd clients/python && uv run pytest --cov=src/taskbroker_client --cov-report=xml --cov-report=term
.PHONY: python-test

reset-kafka: setup ## Reset kafka
devservices down
-docker container rm kafka-kafka-1
Expand Down
1 change: 1 addition & 0 deletions clients/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dev = [
"black==24.10.0",
"pre-commit>=4.2.0",
"pytest>=8.3.3",
"pytest-cov>=4.0.0",
"flake8>=7.3.0",
"isort>=5.13.2",
"mypy>=1.17.1",
Expand Down
Empty file.
22 changes: 22 additions & 0 deletions clients/python/src/examples/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from arroyo.backends.kafka import KafkaProducer

from examples.store import StubAtMostOnce
from taskbroker_client.app import TaskbrokerApp


def producer_factory(topic: str) -> KafkaProducer:
# TODO use env vars for kafka host/port
config = {
"bootstrap.servers": "127.0.0.1:9092",
"compression.type": "lz4",
"message.max.bytes": 50000000, # 50MB
}
return KafkaProducer(config)


app = TaskbrokerApp(
name="example-app",
producer_factory=producer_factory,
at_most_once_store=StubAtMostOnce(),
)
app.set_modules(["examples.tasks"])
96 changes: 96 additions & 0 deletions clients/python/src/examples/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import logging
import os
import time

import click

logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s %(message)s",
handlers=[logging.StreamHandler()],
)


@click.group()
def main() -> None:
click.echo("Example application")


@main.command()
@click.option(
"--count",
help="The number of tasks to generate",
default=1,
)
def spawn(count: int = 1) -> None:
from examples.tasks import timed_task

click.echo(f"Spawning {count} tasks")
for _ in range(0, count):
timed_task.delay(sleep_seconds=0.1)
click.echo("Complete")


@main.command()
def scheduler() -> None:
from redis import StrictRedis

from examples.app import app
from taskbroker_client.metrics import NoOpMetricsBackend
from taskbroker_client.scheduler import RunStorage, ScheduleRunner, crontab

redis_host = os.getenv("REDIS_HOST") or "localhost"
redis_port = int(os.getenv("REDIS_PORT") or 6379)

# Ensure all task modules are loaded.
app.load_modules()

redis = StrictRedis(host=redis_host, port=redis_port, decode_responses=True)
metrics = NoOpMetricsBackend()
run_storage = RunStorage(metrics=metrics, redis=redis)
scheduler = ScheduleRunner(app, run_storage)

# Define a scheduled task
scheduler.add(
"simple-task", {"task": "examples:examples.simple_task", "schedule": crontab(minute="*/1")}
)

click.echo("Starting scheduler")
scheduler.log_startup()
while True:
sleep_time = scheduler.tick()
time.sleep(sleep_time)


@main.command()
@click.option(
"--rpc-host",
help="The address of the taskbroker this worker connects to.",
default="127.0.0.1:50051",
)
@click.option(
"--concurrency",
help="The number of child processes to start.",
default=2,
)
def worker(rpc_host: str, concurrency: int) -> None:
from taskbroker_client.worker import TaskWorker

click.echo("Starting worker")
worker = TaskWorker(
app_module="examples.app:app",
broker_hosts=[rpc_host],
max_child_task_count=100,
concurrency=concurrency,
child_tasks_queue_maxsize=concurrency * 2,
result_queue_maxsize=concurrency * 2,
rebalance_after=32,
processing_pool_name="examples",
process_type="forkserver",
)
exitcode = worker.start()
raise SystemExit(exitcode)


if __name__ == "__main__":
main()
Empty file.
12 changes: 12 additions & 0 deletions clients/python/src/examples/store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from taskbroker_client.types import AtMostOnceStore


class StubAtMostOnce(AtMostOnceStore):
def __init__(self) -> None:
self._keys: dict[str, str] = {}

def add(self, key: str, value: str, timeout: int) -> bool:
if key in self._keys:
return False
self._keys[key] = value
return True
74 changes: 74 additions & 0 deletions clients/python/src/examples/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""
Example taskbroker application with tasks

Used in tests for the worker.
"""

import logging
from time import sleep
from typing import Any

from redis import StrictRedis

from examples.app import app
from taskbroker_client.retry import LastAction, NoRetriesRemainingError, Retry, RetryTaskError
from taskbroker_client.retry import retry_task as retry_task_helper

logger = logging.getLogger(__name__)


# Create a namespace and register tasks
exampletasks = app.taskregistry.create_namespace("examples")


@exampletasks.register(name="examples.simple_task")
def simple_task(*args: list[Any], **kwargs: dict[str, Any]) -> None:
sleep(0.1)
logger.debug("simple_task complete")


@exampletasks.register(name="examples.retry_task", retry=Retry(times=2))
def retry_task() -> None:
raise RetryTaskError


@exampletasks.register(name="examples.fail_task")
def fail_task() -> None:
raise ValueError("nope")


@exampletasks.register(name="examples.at_most_once", at_most_once=True)
def at_most_once_task() -> None:
pass


@exampletasks.register(
name="examples.retry_state", retry=Retry(times=2, times_exceeded=LastAction.Deadletter)
)
def retry_state() -> None:
try:
retry_task_helper()
except NoRetriesRemainingError:
# TODO read host from env vars
redis = StrictRedis(host="localhost", port=6379, decode_responses=True)
redis.set("no-retries-remaining", 1)


@exampletasks.register(
name="examples.will_retry",
retry=Retry(times=3, on=(RuntimeError,), times_exceeded=LastAction.Discard),
)
def will_retry(failure: str) -> None:
if failure == "retry":
logger.debug("going to retry with explicit retry error")
raise RetryTaskError
if failure == "raise":
logger.debug("raising runtimeerror")
raise RuntimeError("oh no")
logger.debug("got %s", failure)


@exampletasks.register(name="examples.timed")
def timed_task(sleep_seconds: float | str, *args: list[Any], **kwargs: dict[str, Any]) -> None:
sleep(float(sleep_seconds))
logger.debug("timed_task complete")
Loading
Loading