|
| 1 | +#!/usr/bin/env python |
| 2 | +# |
| 3 | +# Check the health of a Celery worker. |
| 4 | +# |
| 5 | +# The worker process writes and periodically touches a number of files that indicate it |
| 6 | +# is available and still healthy. If the worker becomes unhealthy for any reason, the |
| 7 | +# timestamp of when the heartbeat file was last touched will not update and the delta |
| 8 | +# becomes too big, allowing (container) orchestration to terminate and restart the |
| 9 | +# worker process. |
| 10 | +# |
| 11 | +# Example usage with Kubernetes, as a liveness probe: |
| 12 | +# |
| 13 | +# .. code-block:: yaml |
| 14 | +# |
| 15 | +# livenessProbe: |
| 16 | +# exec: |
| 17 | +# command: |
| 18 | +# - python |
| 19 | +# - /app/bin/check_celery_worker_liveness.py |
| 20 | +# initialDelaySeconds: 10 |
| 21 | +# periodSeconds: 30 # must be smaller than `MAX_WORKER_LIVENESS_DELTA` |
| 22 | +# |
| 23 | +# Reference: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command |
| 24 | +# |
| 25 | +# Supported environment variables: |
| 26 | +# |
| 27 | +# * ``MAX_WORKER_LIVENESS_DELTA``: maximum delta between heartbeats before reporting |
| 28 | +# failure, in seconds. Defaults to 60 (one minute). |
| 29 | + |
| 30 | + |
| 31 | +import os |
| 32 | +import sys |
| 33 | +import time |
| 34 | +from pathlib import Path |
| 35 | + |
| 36 | +HEARTBEAT_FILE = Path(__file__).parent.parent / "tmp" / "celery_worker_heartbeat" |
| 37 | +READINESS_FILE = Path(__file__).parent.parent / "tmp" / "celery_worker_ready" |
| 38 | +MAX_WORKER_LIVENESS_DELTA = int(os.getenv("MAX_WORKER_LIVENESS_DELTA", 60)) # seconds |
| 39 | + |
| 40 | + |
| 41 | +# check if worker is ready |
| 42 | +if not READINESS_FILE.is_file(): |
| 43 | + print("Celery worker not ready.") |
| 44 | + sys.exit(1) |
| 45 | + |
| 46 | +# check if worker is live |
| 47 | +if not HEARTBEAT_FILE.is_file(): |
| 48 | + print("Celery worker heartbeat not found.") |
| 49 | + sys.exit(1) |
| 50 | + |
| 51 | +# check if worker heartbeat satisfies constraint |
| 52 | +stats = HEARTBEAT_FILE.stat() |
| 53 | +worker_timestamp = stats.st_mtime |
| 54 | +current_timestamp = time.time() |
| 55 | +time_diff = current_timestamp - worker_timestamp |
| 56 | + |
| 57 | +if time_diff > MAX_WORKER_LIVENESS_DELTA: |
| 58 | + print("Celery worker heartbeat: interval exceeds constraint (60s).") |
| 59 | + sys.exit(1) |
| 60 | + |
| 61 | +print("Celery worker heartbeat found: OK.") |
| 62 | +sys.exit(0) |
0 commit comments