---
title: Tracking Celery Tasks with Signals
description: "Track Celery tasks to capture user information, timing metrics, and contextual data for enhanced debugging and performance analysis."
author: hemantapkh
date: 2025-06-08 07:55:00 +0000
categories: [Monitoring]
tags: [python, celery, monitoring, logging]
pin: false
math: false
mermaid: false
image:
  path: https://assets.hemantapkh.com/tracking-celery-tasks-with-signals/thumbnail.webp
---

In this post, I’ll share how I use Celery signals in Python to monitor tasks and make debugging easier. The focus is on two main goals:

- **Track timing metrics:** Measure task execution time, queue wait time, and total time to gain performance insights and identify improvements.

- **Enrich and correlate task logs:** Attach contextual data like user details and request IDs to tasks, binding this metadata to all logs for improved traceability and debugging.

## Understanding Celery Signals

Celery signals are a mechanism that lets applications listen for specific events during a task’s lifecycle, such as when a task is dispatched, starts, or completes. They allow you to attach custom logic to these events, enabling actions like logging metadata or tracking performance without tightly coupling your code. We will use these signals to capture task metrics and contextual data for better observability and debugging.
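For example, a handler is attached by decorating a function with a signal’s `connect` method. Here’s a minimal sketch (the `task_success` handler below is purely illustrative):

```python
from celery import signals

@signals.task_success.connect
def on_task_success(sender=None, result=None, **kwargs):
    # Runs on the worker whenever any task finishes successfully;
    # sender is the task object, result is its return value
    print(f"Task {sender.name} succeeded with result: {result!r}")
```
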
## Adding Metadata at Task Dispatch

The `before_task_publish` signal is triggered just before a task is sent to the message broker. Let's use it to attach metadata, such as user information, request ID, queue name, and dispatch timestamp, to the task headers.

```python
import time
from celery import signals
import structlog

logger = structlog.get_logger()

@signals.before_task_publish.connect
def before_task_publish_handler(headers: dict, **kwargs: dict):
    # Replace with your application's logic to fetch request_id and user_info or add other info
    request_id = "your_request_id"
    user_info = {"user_id": "<user_id>", "org_id": "<org_id>"}

    headers["request_id"] = request_id
    headers["user_info"] = user_info
    headers["publish_time"] = time.time()

    logger.info(
        "Celery event",
        state="before_task_publish",
        task_id=headers.get("id"),
        task_name=headers.get("task"),
        request_id=request_id,
        queue_name=kwargs.get("routing_key"),
        **user_info,
    )
```

## Capturing Task Start Time and Context

The `task_prerun` signal is triggered just before a task begins execution on a worker. Let's use it to record the task’s start time and bind contextual data (like `task_id` and `request_id`) to the `structlog` contextvars. This ensures all logs generated during task execution are correlated with this metadata, improving traceability. Since Celery exposes custom message headers as attributes on `task.request` on the worker side, the metadata we attached at publish time is available here.

```python
import time

from celery import signals, Task
import structlog

logger = structlog.get_logger()

@signals.task_prerun.connect
def on_task_prerun(task_id: str, task: Task, **kwargs):
    task.request.prerun_time = time.time()  # Record task start time
    request_id = getattr(task.request, "request_id", None)
    queue_name = task.request.get("delivery_info", {}).get("routing_key")
    user_info = getattr(task.request, "user_info", {})

    # Bind task_id and request_id to structlog context for log correlation
    structlog.contextvars.bind_contextvars(
        request_id=request_id,
        task_id=task_id,
    )

    logger.info(
        "Celery event",
        state="task_prerun",
        task_id=task_id,
        task_name=task.name,
        request_id=request_id,
        queue_name=queue_name,
        **user_info,
    )
```
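
One caveat: the bound context variables only show up in log output if `structlog` is configured with the `merge_contextvars` processor. Here’s a minimal configuration sketch (the other processors are just illustrative choices):

```python
import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # injects bound task_id/request_id into every event
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)
```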

## Calculating Metrics After Task Completion

The `task_postrun` signal is triggered after a task completes, whether it succeeds or fails. Here, we calculate key metrics: queue wait time (time spent in the queue before execution), execution time (time spent running the task), and total time (from dispatch to completion). These metrics help identify bottlenecks and optimize task performance.

```python
import time

from celery import signals, Task
import structlog

logger = structlog.get_logger()

@signals.task_postrun.connect
def on_task_postrun(task_id: str, task: Task, state: str, **kwargs: dict):
    current_time = time.time()

    user_info = getattr(task.request, "user_info", {})
    request_id = getattr(task.request, "request_id", "")
    queue_name = task.request.get("delivery_info", {}).get("routing_key")

    # Calculate timing metrics
    total_time = current_time - task.request.publish_time if hasattr(task.request, "publish_time") else None
    execution_time = current_time - task.request.prerun_time if hasattr(task.request, "prerun_time") else None
    queue_wait_time = (
        task.request.prerun_time - task.request.publish_time
        if hasattr(task.request, "publish_time") and hasattr(task.request, "prerun_time")
        else None
    )

    logger.info(
        "Celery event",
        state="task_postrun",
        task_id=task_id,
        task_name=task.name,
        request_id=request_id,
        queue_name=queue_name,
        queue_wait_time=queue_wait_time,
        total_time=total_time,
        execution_time=execution_time,
        result=state,
        **user_info,
    )
```

Now, you'll receive timing logs for each task after its completion, along with important details like the request ID and user information. Additionally, task logs are linked with the `request_id` and `task_id`, ensuring better traceability and easier debugging. 🎉
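
To see everything in action, define any task and dispatch it as usual; the task itself needs no changes. A minimal end-to-end sketch (the app name, broker URL, and task are placeholders, not from this post):

```python
import structlog
from celery import Celery

logger = structlog.get_logger()
app = Celery("worker", broker="redis://localhost:6379/0")  # hypothetical broker URL

@app.task
def send_welcome_email(user_id: str) -> None:
    # Any log emitted here is automatically tagged with the bound task_id and request_id
    logger.info("Sending welcome email", user_id=user_id)

# Dispatching fires before_task_publish; the worker then fires task_prerun and task_postrun
send_welcome_email.delay(user_id="42")
```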
