Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,9 @@ CALLBACK_READ_TIMEOUT = 10

# Required as an environment variable if you want to use doc transformation
OPENAI_API_KEY=""

# Resource Monitoring Configuration
RESOURCE_MONITORING_ENABLED=True
RESOURCE_CHECK_INTERVAL=5
CPU_THRESHOLD_PERCENT=80.0
MEMORY_THRESHOLD_PERCENT=85.0
78 changes: 71 additions & 7 deletions backend/app/celery/celery_app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import logging
from celery import Celery
from celery.app.control import Control
from celery.signals import worker_ready, worker_shutdown, task_prerun, task_postrun
from kombu import Exchange, Queue

from app.core.config import settings
from app.celery.resource_monitor import resource_monitor

logger = logging.getLogger(__name__)

# Create Celery instance
celery_app = Celery(
Expand All @@ -16,7 +22,7 @@
# Define exchanges and queues with priority
default_exchange = Exchange("default", type="direct")

# Celery configuration using environment variables
# Celery configuration
celery_app.conf.update(
# Queue configuration with priority support
task_queues=(
Expand Down Expand Up @@ -52,40 +58,98 @@
# Enable priority support
task_inherit_parent_priority=True,
worker_prefetch_multiplier=settings.CELERY_WORKER_PREFETCH_MULTIPLIER,
# Worker configuration from environment
# Worker configuration
worker_concurrency=settings.COMPUTED_CELERY_WORKER_CONCURRENCY,
worker_max_tasks_per_child=settings.CELERY_WORKER_MAX_TASKS_PER_CHILD,
worker_max_memory_per_child=settings.CELERY_WORKER_MAX_MEMORY_PER_CHILD,
# Security
worker_hijack_root_logger=False,
worker_log_color=False,
# Task execution from environment
# Task execution
task_soft_time_limit=settings.CELERY_TASK_SOFT_TIME_LIMIT,
task_time_limit=settings.CELERY_TASK_TIME_LIMIT,
task_reject_on_worker_lost=True,
task_ignore_result=False,
task_acks_late=True,
# Retry configuration from environment
# Retry configuration
task_default_retry_delay=settings.CELERY_TASK_DEFAULT_RETRY_DELAY,
task_max_retries=settings.CELERY_TASK_MAX_RETRIES,
# Task configuration from environment
# Task configuration
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone=settings.CELERY_TIMEZONE,
enable_utc=settings.CELERY_ENABLE_UTC,
task_track_started=True,
task_always_eager=False,
# Result backend settings from environment
# Result backend settings
result_expires=settings.CELERY_RESULT_EXPIRES,
result_compression="gzip",
# Monitoring
worker_send_task_events=True,
task_send_sent_event=True,
# Connection settings from environment
# Connection settings
broker_connection_retry_on_startup=True,
broker_pool_limit=settings.CELERY_BROKER_POOL_LIMIT,
worker_pool_restarts=True,
)


@worker_ready.connect
def start_resource_monitoring(sender, **kwargs):
    """Configure the shared resource monitor and start it on ``worker_ready``.

    When monitoring is enabled in settings, a ``Control`` bound to
    ``celery_app``, the sender's hostname and the names of the configured
    task queues are injected into ``resource_monitor`` before its monitoring
    thread is started. Any exception raised along the way is logged with its
    traceback instead of being propagated.

    Args:
        sender: Signal sender; only its ``hostname`` attribute is read.
        **kwargs: Remaining signal arguments (unused).
    """
    if settings.RESOURCE_MONITORING_ENABLED:
        try:
            # Gather everything first so the monitor is only touched once all
            # three values were obtained successfully.
            worker_control = Control(app=celery_app)
            hostname = sender.hostname
            configured_queues = [q.name for q in celery_app.conf.task_queues]

            resource_monitor.control = worker_control
            resource_monitor.worker_hostname = hostname
            resource_monitor.queue_names = configured_queues

            logger.info(
                f"Resource monitor initialized - Queues: {', '.join(configured_queues)}"
            )

            resource_monitor.start_monitoring()
        except Exception as e:
            logger.exception(f"Failed to start resource monitoring: {e}")
    else:
        logger.info("Resource monitoring is disabled")


@worker_shutdown.connect
def stop_resource_monitoring(**kwargs):
    """Ask the shared resource monitor to stop when the worker shuts down.

    Does nothing when resource monitoring is disabled in settings.

    Args:
        **kwargs: Signal arguments (unused).
    """
    if settings.RESOURCE_MONITORING_ENABLED:
        resource_monitor.stop_monitoring()


@task_prerun.connect
def track_task_start(**kwargs):
    """Bump the monitor's active-task counter when a task is about to run.

    Args:
        **kwargs: Signal arguments (unused).
    """
    # NOTE(review): confirm this signal runs in the same process as the
    # monitoring thread; with a process-based pool the counter updated here
    # may not be the one the monitor reads.
    if not settings.RESOURCE_MONITORING_ENABLED:
        return
    resource_monitor.increment_active_tasks()


@task_postrun.connect
def track_task_end(**kwargs):
    """Lower the monitor's active-task counter once a task has finished.

    Args:
        **kwargs: Signal arguments (unused).
    """
    # NOTE(review): confirm this signal runs in the same process as the
    # monitoring thread; with a process-based pool the counter updated here
    # may not be the one the monitor reads.
    if not settings.RESOURCE_MONITORING_ENABLED:
        return
    resource_monitor.decrement_active_tasks()


# Auto-discover tasks.
# NOTE(review): no package list is passed to this call — confirm the task
# modules are actually found (or imported elsewhere) in this project.
celery_app.autodiscover_tasks()
194 changes: 194 additions & 0 deletions backend/app/celery/resource_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
# app/celery/resource_monitor.py
import logging
import psutil
import threading
import time
from typing import List

from app.core.config import settings
from celery.app.control import Control

logger = logging.getLogger(__name__)


class ResourceMonitor:
"""
Monitor system resources and control task consumption.
Uses Celery Control API to pause/resume queue consumption.
"""

def __init__(
    self,
    cpu_threshold: float = settings.CPU_THRESHOLD_PERCENT,
    memory_threshold: float = settings.MEMORY_THRESHOLD_PERCENT,
    check_interval: int = settings.RESOURCE_CHECK_INTERVAL,
) -> None:
    """Set thresholds, polling interval and the initial (unpaused) state.

    Args:
        cpu_threshold: CPU usage percentage above which consumption is
            paused; defaults to ``settings.CPU_THRESHOLD_PERCENT``.
        memory_threshold: Memory usage percentage above which consumption
            is paused; defaults to ``settings.MEMORY_THRESHOLD_PERCENT``.
        check_interval: Seconds slept between monitoring cycles; defaults
            to ``settings.RESOURCE_CHECK_INTERVAL``.
    """
    self.cpu_threshold = cpu_threshold
    self.memory_threshold = memory_threshold
    self.check_interval = check_interval
    # True while queue consumption has been cancelled by this monitor.
    self.is_paused = False
    # Number of tasks currently executing, maintained via the
    # increment/decrement helpers.
    self.active_tasks = 0
    # Guards active_tasks and the pause/resume decision.
    self.lock = threading.Lock()
    # Flag polled by monitor_loop; set by stop_monitoring.
    self._should_stop = False

    # Will be set by signal
    self.control: Control | None = None
    self.worker_hostname: str | None = None
Comment on lines +35 to +36
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Fix type annotations for optional attributes.

Lines 35-36 have type annotation issues:

  • Line 35: Annotated as Control but initialized with None—should be Control | None
  • Line 36: Missing type annotation entirely
-        # Will be set by signal
-        self.control: Control = None
-        self.worker_hostname = None
+        # Will be set by signal
+        self.control: Control | None = None
+        self.worker_hostname: str | None = None

As per coding guidelines, type hints are required for Python 3.11+ projects.

🤖 Prompt for AI Agents
In backend/app/celery/resource_monitor.py around lines 35-36, fix the type
annotations for optional attributes: change self.control: Control = None to
self.control: Control | None = None and add a type for worker_hostname, e.g.
self.worker_hostname: str | None = None (ensure Control is imported or
referenced and adjust imports if needed).

self.queue_names: List[str] = []

def get_cpu_usage(self) -> float:
    """Return the CPU usage percentage from ``psutil.cpu_percent``.

    The measurement is requested with a sampling interval of 1.
    """
    sampled_percent = psutil.cpu_percent(interval=1)
    return sampled_percent

def get_memory_usage(self) -> float:
    """Return the ``percent`` field of ``psutil.virtual_memory()``."""
    memory_stats = psutil.virtual_memory()
    return memory_stats.percent

def should_pause(self, cpu: float, memory: float) -> bool:
    """Tell whether either reading is strictly above its configured threshold.

    Args:
        cpu: Current CPU usage percentage.
        memory: Current memory usage percentage.

    Returns:
        True when ``cpu`` exceeds ``cpu_threshold`` or ``memory`` exceeds
        ``memory_threshold``; a reading equal to its threshold does not count.
    """
    cpu_exceeded = cpu > self.cpu_threshold
    memory_exceeded = memory > self.memory_threshold
    return cpu_exceeded or memory_exceeded

def pause_consumer(self):
    """Cancel this worker's consumers on every monitored queue.

    Logs an error and returns early when the control handle, the worker
    hostname or the queue list is missing. On success ``is_paused`` is set
    to True; if any call raises, the error is logged with its traceback and
    ``is_paused`` is left unchanged.
    """
    if not (self.control and self.worker_hostname):
        logger.error("Control or worker hostname not initialized")
        return

    if not self.queue_names:
        logger.error("No queue names configured")
        return

    try:
        # One cancel request per queue, addressed to this worker only.
        for name in self.queue_names:
            self.control.cancel_consumer(
                queue=name, destination=[self.worker_hostname]
            )
            logger.info(f"Cancelled consumer for queue: {name}")

        self.is_paused = True
        logger.warning(
            f"Worker PAUSED - stopped consuming from queues: {', '.join(self.queue_names)}"
        )
    except Exception as e:
        logger.exception(f"Error pausing consumer: {e}")

def resume_consumer(self):
    """Re-add this worker's consumers on every monitored queue.

    Logs an error and returns early when the control handle, the worker
    hostname or the queue list is missing. On success ``is_paused`` is set
    to False; if any call raises, the error is logged with its traceback and
    ``is_paused`` is left unchanged.
    """
    if not (self.control and self.worker_hostname):
        logger.error("Control or worker hostname not initialized")
        return

    if not self.queue_names:
        logger.error("No queue names configured")
        return

    try:
        # One add request per queue, addressed to this worker only.
        for name in self.queue_names:
            self.control.add_consumer(
                queue=name, destination=[self.worker_hostname]
            )
            logger.info(f"Added consumer for queue: {name}")

        self.is_paused = False
        logger.info(
            f"Worker RESUMED - started consuming from queues: {', '.join(self.queue_names)}"
        )
    except Exception as e:
        logger.exception(f"Error resuming consumer: {e}")

def monitor_loop(self):
    """Poll CPU/memory usage and pause or resume consumption until stopped.

    Intended as the target of the monitoring thread. Each cycle reads the
    current CPU and memory usage, then, under ``self.lock``:

    * pauses consumption when a threshold is exceeded and the worker is not
      already paused;
    * resumes consumption when readings are back within limits and the
      worker is paused;
    * otherwise emits a debug status line.

    Errors inside a cycle are logged with their traceback and the loop keeps
    running. The loop exits once ``self._should_stop`` is set.
    """
    logger.info(
        f"Resource monitoring started - "
        f"CPU threshold: {self.cpu_threshold}%, "
        f"Memory threshold: {self.memory_threshold}%, "
        f"Check interval: {self.check_interval}s, "
        f"Monitoring queues: {', '.join(self.queue_names)}"
    )

    while not self._should_stop:
        try:
            cpu = self.get_cpu_usage()
            memory = self.get_memory_usage()
            should_pause_now = self.should_pause(cpu, memory)

            with self.lock:
                # Re-check the stop flag under the lock: stop_monitoring sets
                # it and then resumes the consumer while holding this lock,
                # so acting on a stale reading here could pause the worker
                # again right after it was resumed for shutdown.
                if self._should_stop:
                    break

                # Pause if resources exceeded and not already paused
                if should_pause_now and not self.is_paused:
                    logger.warning(
                        f"Resource threshold exceeded! "
                        f"CPU: {cpu:.1f}% (limit: {self.cpu_threshold}%), "
                        f"Memory: {memory:.1f}% (limit: {self.memory_threshold}%), "
                        f"Active tasks: {self.active_tasks}. "
                        f"Pausing task consumption..."
                    )
                    self.pause_consumer()

                # Resume if resources OK and currently paused
                elif not should_pause_now and self.is_paused:
                    logger.info(
                        f"Resources within limits - "
                        f"CPU: {cpu:.1f}%, Memory: {memory:.1f}%, "
                        f"Active tasks: {self.active_tasks}. "
                        f"Resuming task consumption..."
                    )
                    self.resume_consumer()

                # Steady state (still paused or still running): report the
                # readings at debug level only, so the log is not flooded
                # every cycle.
                else:
                    logger.debug(
                        f"Status - CPU: {cpu:.1f}%, Memory: {memory:.1f}%, "
                        f"Active tasks: {self.active_tasks}, Paused: {self.is_paused}"
                    )

        except Exception as e:
            logger.error(f"Error in resource monitor loop: {e}", exc_info=True)

        # Skip the final sleep when a stop was requested during this cycle.
        if self._should_stop:
            break

        time.sleep(self.check_interval)

    logger.info("Resource monitoring loop ended")

def start_monitoring(self):
    """Launch ``monitor_loop`` in a daemon thread named ``ResourceMonitor``.

    Logs an error and returns without starting anything when the control
    handle, the worker hostname or the queue list has not been set.
    """
    if not (self.control and self.worker_hostname):
        logger.error("Cannot start monitoring: control or worker hostname not set")
        return

    if not self.queue_names:
        logger.error("Cannot start monitoring: no queues configured")
        return

    # Clear any earlier stop request before the loop begins polling the flag.
    self._should_stop = False
    threading.Thread(
        target=self.monitor_loop, name="ResourceMonitor", daemon=True
    ).start()
    logger.info("Resource monitoring thread started")

def stop_monitoring(self):
    """Signal the monitoring loop to stop and undo any active pause.

    Sets the stop flag and then, holding ``self.lock``, resumes consumption
    if the worker is currently paused.
    """
    logger.info("Stopping resource monitoring...")
    self._should_stop = True

    with self.lock:
        if not self.is_paused:
            return
        # Ensure consumer is consuming on shutdown
        logger.info("Resuming consumer before shutdown...")
        self.resume_consumer()

def increment_active_tasks(self):
    """Record that one more task is running (updates the counter under the lock)."""
    with self.lock:
        self.active_tasks = self.active_tasks + 1

def decrement_active_tasks(self):
    """Record that one task finished; the counter never drops below zero."""
    with self.lock:
        remaining = self.active_tasks - 1
        self.active_tasks = remaining if remaining > 0 else 0


# Module-level monitor instance created with the settings-derived defaults;
# app.celery.celery_app imports this name and its signal handlers configure
# and drive it.
resource_monitor = ResourceMonitor()
6 changes: 6 additions & 0 deletions backend/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ def AWS_S3_BUCKET(self) -> str:
CALLBACK_CONNECT_TIMEOUT: int = 3
CALLBACK_READ_TIMEOUT: int = 10

# Resource monitoring
RESOURCE_MONITORING_ENABLED: bool = True
RESOURCE_CHECK_INTERVAL: int = 5
CPU_THRESHOLD_PERCENT: float = 75.0
MEMORY_THRESHOLD_PERCENT: float = 75.0

@computed_field # type: ignore[prop-decorator]
@property
def COMPUTED_CELERY_WORKER_CONCURRENCY(self) -> int:
Expand Down
1 change: 1 addition & 0 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ dependencies = [
"celery>=5.3.0,<6.0.0",
"redis>=5.0.0,<6.0.0",
"flower>=2.0.1",
"psutil>=7.1.3",
]

[tool.uv]
Expand Down
Loading