
Conversation

@avirajsingh7
Collaborator

@avirajsingh7 avirajsingh7 commented Dec 2, 2025

Target issue is #PLEASE_TYPE_ISSUE_NUMBER

Introduce resource monitoring for Celery workers to manage CPU and memory usage. When usage crosses configured thresholds, the worker pauses task consumption and resumes it once usage drops, preventing overload during task execution and ensuring smoother operation.

Checklist

Before submitting a pull request, please ensure that you have completed these tasks.

  • Ran fastapi run --reload app/main.py or docker compose up in the repository root and tested.

  • If you've fixed a bug or added new code, make sure it is tested and has test cases.

Notes

Please add any other information the reviewer may need here.

Summary by CodeRabbit

  • New Features

    • Automatic resource monitoring for task workers to track CPU and memory and pause/resume task consumption when thresholds are crossed
    • Configurable monitoring options: enable/disable, check interval, CPU threshold, and memory threshold (added to environment/config)
  • Chores

    • Added runtime dependency for resource inspection (psutil)


@coderabbitai

coderabbitai bot commented Dec 2, 2025

Walkthrough

Adds optional Celery worker resource monitoring: new environment keys and Settings fields, a new ResourceMonitor class using psutil, Celery signal handlers to start/stop monitoring and track active tasks, and integration that pauses/resumes queue consumption based on CPU/memory thresholds.
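For orientation, here is a minimal sketch of the signal wiring the walkthrough describes. The handler names match the walkthrough, but the bodies are illustrative assumptions: the PR's actual handlers also guard on settings.RESOURCE_MONITORING_ENABLED and pass the worker's queue names to the monitor.

from celery.signals import task_postrun, task_prerun, worker_ready, worker_shutdown

from app.celery.resource_monitor import resource_monitor


@worker_ready.connect
def start_resource_monitoring(sender, **kwargs):
    # sender is the worker's consumer; its app exposes the Control API
    resource_monitor.control = sender.app.control
    resource_monitor.worker_hostname = sender.hostname
    resource_monitor.start_monitoring()


@worker_shutdown.connect
def stop_resource_monitoring(**kwargs):
    resource_monitor.stop_monitoring()


@task_prerun.connect
def track_task_start(**kwargs):
    resource_monitor.increment_active_tasks()


@task_postrun.connect
def track_task_end(**kwargs):
    resource_monitor.decrement_active_tasks()

The **kwargs parameter on each handler is required by Celery's signal contract, as the review notes below confirm.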

Changes

Cohort / File(s) | Change Summary
Environment Configuration
.env.example
Added non-commented env vars: RESOURCE_MONITORING_ENABLED, RESOURCE_CHECK_INTERVAL, CPU_THRESHOLD_PERCENT, MEMORY_THRESHOLD_PERCENT (see the example snippet after this table).
Settings Configuration
backend/app/core/config.py
Added Settings fields with defaults: RESOURCE_MONITORING_ENABLED: bool = True, RESOURCE_CHECK_INTERVAL: int = 5, CPU_THRESHOLD_PERCENT: float = 75.0, MEMORY_THRESHOLD_PERCENT: float = 75.0.
Celery integration & hooks
backend/app/celery/celery_app.py
Added logger, imports for Celery control/signals, and four signal handlers: start_resource_monitoring (worker_ready), stop_resource_monitoring (worker_shutdown), track_task_start (task_prerun), track_task_end (task_postrun). Wires resource_monitor into worker lifecycle.
Resource monitoring implementation
backend/app/celery/resource_monitor.py
New ResourceMonitor class and module-level resource_monitor instance: psutil-based CPU/memory sampling, threshold evaluation, thread-based monitoring loop, pause/resume via Celery Control API for configured queues, start/stop lifecycle, and active-task counters with locking and logging.
Dependencies
backend/pyproject.toml
Added dependency: psutil>=7.1.3.
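For reference, the new keys might look like this in .env.example; the values shown mirror the Settings defaults from backend/app/core/config.py, and the PR's actual file may format them differently:

RESOURCE_MONITORING_ENABLED=true
RESOURCE_CHECK_INTERVAL=5
CPU_THRESHOLD_PERCENT=75.0
MEMORY_THRESHOLD_PERCENT=75.0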

Sequence Diagram(s)

sequenceDiagram
    participant Worker as Celery Worker
    participant Signals as Celery Signals
    participant Monitor as ResourceMonitor
    participant Control as Celery Control API
    participant OS as Host OS (psutil)
    participant Task as Task Executor

    Worker->>Signals: worker_ready
    Signals->>Monitor: start_monitoring(control, hostname, queues)
    Monitor->>Monitor: spawn monitoring thread

    loop every check_interval
        Monitor->>OS: get_cpu_usage()
        OS-->>Monitor: cpu %
        Monitor->>OS: get_memory_usage()
        OS-->>Monitor: mem %
        alt cpu/mem > thresholds
            Monitor->>Control: cancel_consumer(queues)
            Monitor->>Monitor: set paused = true
        else paused and below thresholds
            Monitor->>Control: add_consumer(queues)
            Monitor->>Monitor: set paused = false
        end
    end

    par Task lifecycle
        Task->>Signals: task_prerun
        Signals->>Monitor: increment_active_tasks()
        Task->>Task: execute
        Task->>Signals: task_postrun
        Signals->>Monitor: decrement_active_tasks()
    end

    Worker->>Signals: worker_shutdown
    Signals->>Monitor: stop_monitoring()
    alt paused
        Monitor->>Control: add_consumer(queues)
    end
    Monitor->>Monitor: stop monitoring thread

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

  • Review threading, locking, and shutdown behavior in backend/app/celery/resource_monitor.py.
  • Validate Celery Control API calls and queue name resolution in backend/app/celery/celery_app.py.
  • Confirm psutil usage, threshold math, and logging/error handling.

Possibly related PRs

Suggested labels

enhancement

Suggested reviewers

  • AkhileshNegi
  • nishika26

Poem

🐰 I sniff the CPU and count each byte,

I pause the queues when numbers bite.
I watch the tasks, then let them flow,
A gentle hop so systems grow.
Hooray — the worker hums just right.

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name | Status | Explanation
Description Check | ✅ Passed | Check skipped: CodeRabbit's high-level summary is enabled.
Title check | ✅ Passed | The title directly and accurately summarizes the main change: adding resource monitoring for Celery workers to manage CPU and memory usage, which aligns with all files modified in the changeset.
Docstring Coverage | ✅ Passed | Docstring coverage is 93.75%, which is sufficient; the required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch monitor/celery

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@avirajsingh7 avirajsingh7 changed the title ## Resource Monitoring for Celery workers to manage CPU and memory usage effectively Resource Monitoring for Celery workers to manage CPU and memory usage effectively Dec 2, 2025

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (5)
backend/app/core/config.py (1)

125-129: Configuration fields look good.

Type hints are correctly applied. Consider adding validation to ensure threshold percentages are within the valid range (0-100) to prevent misconfiguration.

Example using Pydantic's field_validator:

from pydantic import field_validator

@field_validator("CPU_THRESHOLD_PERCENT", "MEMORY_THRESHOLD_PERCENT")
@classmethod
def validate_threshold_percent(cls, v: float) -> float:
    if not 0 <= v <= 100:
        raise ValueError("Threshold must be between 0 and 100")
    return v
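With a validator like this in place, a misconfigured value such as CPU_THRESHOLD_PERCENT=150 would fail fast at startup with a ValidationError instead of silently producing a monitor that never pauses.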
backend/app/celery/celery_app.py (1)

120-122: Redundant f-string concatenation.

The two f-strings can be combined into a single f-string.

         logger.info(
-            f"Resource monitor initialized - " f"Queues: {', '.join(queue_names)}"
+            f"Resource monitor initialized - Queues: {', '.join(queue_names)}"
         )
backend/app/celery/resource_monitor.py (3)

6-6: Use built-in list instead of deprecated typing.List.

Per Ruff hint and Python 3.9+ conventions, use list[str] directly.

-from typing import List
...
-    self.queue_names: List[str] = []
+    self.queue_names: list[str] = []

34-37: Add proper type hints for optional attributes.

These attributes are initialized to None but lack Optional type hints, which is important per the coding guidelines for a Python 3.11+ project.

+from typing import Optional
+
         # Will be set by signal
-        self.control: Control = None
-        self.worker_hostname = None
-        self.queue_names: List[str] = []
+        self.control: Optional[Control] = None
+        self.worker_hostname: Optional[str] = None
+        self.queue_names: list[str] = []

120-140: Potential long lock hold time due to network calls within lock context.

pause_consumer() and resume_consumer() make Celery Control API calls (network I/O) while the lock is held. If these calls are slow or time out, other threads (task tracking) will be blocked.

Consider restructuring to minimize lock scope:

             with self.lock:
-                # Pause if resources exceeded and not already paused
-                if should_pause_now and not self.is_paused:
-                    logger.warning(...)
-                    self.pause_consumer()
-
-                # Resume if resources OK and currently paused
-                elif not should_pause_now and self.is_paused:
-                    logger.info(...)
-                    self.resume_consumer()
+                current_paused = self.is_paused
+                should_pause_action = should_pause_now and not current_paused
+                should_resume_action = not should_pause_now and current_paused
+
+            # Perform the network calls outside the lock
+            if should_pause_action:
+                logger.warning(...)
+                self.pause_consumer()
+            elif should_resume_action:
+                logger.info(...)
+                self.resume_consumer()
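This is the usual check-then-act split for locks around I/O: read shared state and compute the decision under the lock, then perform the network calls outside it. The trade-off is a small window in which the computed decision can go stale before the call fires, which is acceptable here since the next monitoring cycle would correct it.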
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 17f427e and aca424b.

📒 Files selected for processing (4)
  • .env.example (1 hunks)
  • backend/app/celery/celery_app.py (3 hunks)
  • backend/app/celery/resource_monitor.py (1 hunks)
  • backend/app/core/config.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (4)
.env.example

📄 CodeRabbit inference engine (CLAUDE.md)

Provide .env.example as the template for .env

Files:

  • .env.example
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Use type hints in Python code (Python 3.11+ project)

Files:

  • backend/app/celery/resource_monitor.py
  • backend/app/core/config.py
  • backend/app/celery/celery_app.py
backend/app/celery/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Keep Celery app configuration (priority queues, beat scheduler, workers) under backend/app/celery/

Files:

  • backend/app/celery/resource_monitor.py
  • backend/app/celery/celery_app.py
backend/app/core/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Place core functionality (config, DB session, security, exceptions, middleware) in backend/app/core/

Files:

  • backend/app/core/config.py
🧠 Learnings (2)
📚 Learning: 2025-10-08T12:05:01.317Z
Learnt from: CR
Repo: ProjectTech4DevAI/ai-platform PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-10-08T12:05:01.317Z
Learning: Applies to backend/app/celery/**/*.py : Keep Celery app configuration (priority queues, beat scheduler, workers) under backend/app/celery/

Applied to files:

  • backend/app/celery/celery_app.py
📚 Learning: 2025-10-08T12:05:01.317Z
Learnt from: CR
Repo: ProjectTech4DevAI/ai-platform PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-10-08T12:05:01.317Z
Learning: Applies to backend/app/celery/tasks/**/*.py : Define Celery tasks under backend/app/celery/tasks/

Applied to files:

  • backend/app/celery/celery_app.py
🧬 Code graph analysis (1)
backend/app/celery/celery_app.py (2)
backend/app/core/config.py (1)
  • COMPUTED_CELERY_WORKER_CONCURRENCY (133-138)
backend/app/celery/resource_monitor.py (4)
  • start_monitoring (155-170)
  • stop_monitoring (172-181)
  • increment_active_tasks (183-186)
  • decrement_active_tasks (188-191)
🪛 dotenv-linter (4.0.0)
.env.example

[warning] 92-92: [UnorderedKey] The RESOURCE_CHECK_INTERVAL key should go before the RESOURCE_MONITORING_ENABLED key

(UnorderedKey)


[warning] 93-93: [UnorderedKey] The CPU_THRESHOLD_PERCENT key should go before the RESOURCE_CHECK_INTERVAL key

(UnorderedKey)


[warning] 94-94: [UnorderedKey] The MEMORY_THRESHOLD_PERCENT key should go before the RESOURCE_CHECK_INTERVAL key

(UnorderedKey)

🪛 GitHub Actions: AI Platform CI
backend/app/celery/resource_monitor.py

[error] 1-1: ModuleNotFoundError: No module named 'psutil' during import in resource_monitor. This caused pytest collection to fail while running 'coverage run --source=app -m pytest'.

🪛 Ruff (0.14.7)
backend/app/celery/resource_monitor.py

6-6: typing.List is deprecated, use list instead

(UP035)


118-118: Use f-string instead of format call

Convert to f-string

(UP032)

backend/app/celery/celery_app.py

99-99: Unused function argument: kwargs

(ARG001)


132-132: Unused function argument: kwargs

(ARG001)


141-141: Unused function argument: kwargs

(ARG001)


148-148: Unused function argument: kwargs

(ARG001)

🔇 Additional comments (4)
backend/app/celery/celery_app.py (2)

98-128: Resource monitoring initialization is well-structured.

The signal handler properly guards with RESOURCE_MONITORING_ENABLED, has error handling, and correctly initializes the monitor with control, hostname, and queue names. The **kwargs parameter is required by Celery's signal contract.


131-151: Signal handlers correctly implement lifecycle hooks.

The stop_resource_monitoring, track_task_start, and track_task_end handlers properly check RESOURCE_MONITORING_ENABLED before invoking the monitor. The **kwargs parameters are required by Celery's signal signature pattern and should not be removed despite the static analysis hint.

backend/app/celery/resource_monitor.py (2)

172-181: Potential race condition in stop_monitoring.

Setting _should_stop = True outside the lock, then acquiring the lock to check is_paused, could race with monitor_loop. The monitor thread might pause the consumer after stop_monitoring resumes it but before the thread exits. This is a minor edge case since the thread is daemon and the process is shutting down.


194-194: Module-level singleton instantiated at import time.

The resource_monitor instance is created when the module is imported, reading settings defaults at that moment. This is acceptable for the current design but be aware that any dynamic settings changes after import won't affect this instance's thresholds.
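If deferred configuration ever becomes necessary, a lazily created accessor is one hypothetical alternative (not part of this PR):

_monitor: "ResourceMonitor | None" = None


def get_resource_monitor() -> "ResourceMonitor":
    """Create the monitor on first use so settings are read at worker
    startup, not at module import time."""
    global _monitor
    if _monitor is None:
        _monitor = ResourceMonitor()
    return _monitor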

Comment on lines +39 to +41
def get_cpu_usage(self) -> float:
"""Get current CPU usage percentage."""
return psutil.cpu_percent(interval=1)


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

psutil cpu_percent interval parameter behavior blocking

💡 Result:

Short summary — psutil.cpu_percent interval behavior:

  • If interval > 0: the call blocks for that many seconds and measures CPU between start and end (blocking). [1]
  • If interval == 0.0 or interval is None: it returns immediately and reports CPU usage since the last call (or since module import on first call). The very first call with 0.0/None returns a meaningless 0.0 and should be ignored / followed by a subsequent call (>= ~0.1s later for accuracy). [1][2]
  • Note: newer psutil versions maintain per-thread last-sample state so calls from different threads are independent; cpu_percent became thread-safe in recent releases. [1]

Sources: psutil docs. [1] psutil (latest) cpu_percent; [2] psutil historical docs.


🏁 Script executed:

cat -n backend/app/celery/resource_monitor.py

Repository: ProjectTech4DevAI/ai-platform

Length of output: 8584


🏁 Script executed:

fd -t f "config.py" --path-separator / | head -20

Repository: ProjectTech4DevAI/ai-platform

Length of output: 306


🏁 Script executed:

grep -n "RESOURCE_CHECK_INTERVAL\|CPU_THRESHOLD\|MEMORY_THRESHOLD" backend/app/core/config.py

Repository: ProjectTech4DevAI/ai-platform

Length of output: 204


The cpu_percent(interval=1) call adds measurable latency to the monitoring cycle.

With check_interval set to 5 seconds, each monitoring iteration blocks for 1 second, making the effective cycle 6 seconds instead of 5. While this runs in a separate daemon thread and doesn't block Celery's main worker, it reduces monitoring frequency by ~17%. If tighter monitoring cadence is needed, consider interval=None with cached state (handling the first call returning 0.0), or accept the current behavior if the accuracy trade-off is acceptable.

🤖 Prompt for AI Agents
In backend/app/celery/resource_monitor.py around lines 39-41, the use of
psutil.cpu_percent(interval=1) blocks for 1s per call and lengthens the
monitoring loop; change to a non-blocking pattern by calling
psutil.cpu_percent(interval=None) and handle the initial 0.0 result (either by
priming once at startup and discarding that first value, or by returning the
last cached value until a real reading is available) so the monitoring cadence
stays close to check_interval without the added 1s delay.
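A minimal sketch of that non-blocking pattern, assuming a prime-and-cache approach; the class and attribute names here are illustrative, not the PR's code:

import psutil


class CpuSampler:
    """Non-blocking CPU sampling: prime psutil once, then cache readings."""

    def __init__(self) -> None:
        # The first interval=None call returns a meaningless 0.0, so make
        # it here at startup and discard the result.
        psutil.cpu_percent(interval=None)
        self._last_cpu = 0.0

    def get_cpu_usage(self) -> float:
        # Reports CPU usage since the previous call, without sleeping.
        reading = psutil.cpu_percent(interval=None)
        if reading > 0.0:
            self._last_cpu = reading
        return self._last_cpu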


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
backend/pyproject.toml (1)

39-39: Verify version constraint strategy for consistency.

The psutil 7.1.3 release (uploaded Nov 2, 2025) is valid and supports Python 3.12+, making it compatible with your project's Python requirements. However, the version constraint >=7.1.3 lacks an upper bound, which diverges from the versioning strategy used by similar dependencies in your project (e.g., celery>=5.3.0,<6.0.0, redis>=5.0.0,<6.0.0, flower>=2.0.1).

Consider whether the permissive constraint is intentional or if an upper bound should be added for consistency and to mitigate against potential breaking changes in future major versions.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between aca424b and c5da53f.

⛔ Files ignored due to path filters (1)
  • backend/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (1)
  • backend/pyproject.toml (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: checks (3.11.7, 6)


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
backend/app/celery/resource_monitor.py (2)

6-6: Replace deprecated typing.List with built-in list.

Python 3.11+ supports using list directly in type hints. Update all occurrences (lines 6 and 37).

-from typing import List

Then update line 37:

-        self.queue_names: List[str] = []
+        self.queue_names: list[str] = []

116-119: Remove temporary debug logging and fix typo.

This debug logging with typo "Memmory" and .format() was flagged in a previous review and should be addressed.

-                # Remove this line later
-                logger.info(
-                    "Memmory Usage: {:.2f}%, CPU Usage: {:.2f}%".format(memory, cpu)
-                )

Or keep at debug level:

-                # Remove this line later
-                logger.info(
-                    "Memmory Usage: {:.2f}%, CPU Usage: {:.2f}%".format(memory, cpu)
-                )
+                logger.debug(f"Memory Usage: {memory:.2f}%, CPU Usage: {cpu:.2f}%")
🧹 Nitpick comments (1)
backend/app/celery/resource_monitor.py (1)

155-170: Consider tracking the monitoring thread for graceful cleanup.

The thread reference is not stored, making it impossible for stop_monitoring() to wait for completion. While the daemon flag ensures the thread won't block process exit, storing the reference would allow a graceful join with timeout.

     def __init__(
         self,
         cpu_threshold: float = settings.CPU_THRESHOLD_PERCENT,
         memory_threshold: float = settings.MEMORY_THRESHOLD_PERCENT,
         check_interval: int = settings.RESOURCE_CHECK_INTERVAL,
     ):
         self.cpu_threshold = cpu_threshold
         self.memory_threshold = memory_threshold
         self.check_interval = check_interval
         self.is_paused = False
         self.active_tasks = 0
         self.lock = threading.Lock()
         self._should_stop = False
+        self._monitor_thread: threading.Thread | None = None

         # Will be set by signal
         self.control: Control | None = None
         self.worker_hostname: str | None = None
         self.queue_names: list[str] = []

Then update start_monitoring:

     def start_monitoring(self):
         """Start the monitoring thread."""
         if not self.control or not self.worker_hostname:
             logger.error("Cannot start monitoring: control or worker hostname not set")
             return

         if not self.queue_names:
             logger.error("Cannot start monitoring: no queues configured")
             return

         self._should_stop = False
-        monitor_thread = threading.Thread(
+        self._monitor_thread = threading.Thread(
             target=self.monitor_loop, daemon=True, name="ResourceMonitor"
         )
-        monitor_thread.start()
+        self._monitor_thread.start()
         logger.info("Resource monitoring thread started")

And update stop_monitoring to join:

     def stop_monitoring(self):
         """Stop the monitoring thread."""
         logger.info("Stopping resource monitoring...")
         self._should_stop = True

         # Ensure consumer is consuming on shutdown
         with self.lock:
             if self.is_paused:
                 logger.info("Resuming consumer before shutdown...")
                 self.resume_consumer()
+
+        # Wait for thread to finish with timeout
+        if self._monitor_thread and self._monitor_thread.is_alive():
+            self._monitor_thread.join(timeout=self.check_interval + 2)
+            if self._monitor_thread.is_alive():
+                logger.warning("Monitor thread did not stop within timeout")
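The timeout of check_interval + 2 gives the loop one full sampling cycle to observe _should_stop before the join gives up; since the thread is a daemon, a missed join only costs the warning, not a hung shutdown.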
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c5da53f and 9364c37.

📒 Files selected for processing (1)
  • backend/app/celery/resource_monitor.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Use type hints in Python code (Python 3.11+ project)

Files:

  • backend/app/celery/resource_monitor.py
backend/app/celery/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Keep Celery app configuration (priority queues, beat scheduler, workers) under backend/app/celery/

Files:

  • backend/app/celery/resource_monitor.py
🪛 Ruff (0.14.7)
backend/app/celery/resource_monitor.py

6-6: typing.List is deprecated, use list instead

(UP035)


118-118: Use f-string instead of format call

Convert to f-string

(UP032)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: checks (3.11.7, 6)

Comment on lines +35 to +36
self.control: Control = None
self.worker_hostname = None


🛠️ Refactor suggestion | 🟠 Major

Fix type annotations for optional attributes.

Lines 35-36 have type annotation issues:

  • Line 35: Annotated as Control but initialized with None—should be Control | None
  • Line 36: Missing type annotation entirely
-        # Will be set by signal
-        self.control: Control = None
-        self.worker_hostname = None
+        # Will be set by signal
+        self.control: Control | None = None
+        self.worker_hostname: str | None = None

As per coding guidelines, type hints are required for Python 3.11+ projects.

🤖 Prompt for AI Agents
In backend/app/celery/resource_monitor.py around lines 35-36, fix the type
annotations for optional attributes: change self.control: Control = None to
self.control: Control | None = None and add a type for worker_hostname, e.g.
self.worker_hostname: str | None = None (ensure Control is imported or
referenced and adjust imports if needed).

@codecov

codecov bot commented Dec 2, 2025

Codecov Report

❌ Patch coverage is 34.28571% with 92 lines in your changes missing coverage. Please review.

Files with missing lines | Patch % | Lines
backend/app/celery/resource_monitor.py | 30.39% | 71 Missing ⚠️
backend/app/celery/celery_app.py | 38.23% | 21 Missing ⚠️

