582 changes: 582 additions & 0 deletions EVENT_DRIVEN_IMPLEMENTATION.md

Large diffs are not rendered by default.

139 changes: 139 additions & 0 deletions examples/example_4_event_driven_orchestration.py
@@ -0,0 +1,139 @@
"""
Event-Driven Orchestration Demo

This script demonstrates the event-driven architecture implemented
for Issue #190: Event-Driven Orchestration with Celery Integration.

Requirements:
1. Redis server running (localhost:6379 or REDIS_URL env var)
2. Celery worker running (celery -A worker worker --loglevel=info)

Usage:
python examples/example_4_event_driven_orchestration.py
"""

import asyncio
import os
import sys

# Add the repository's src directory to sys.path so the example runs from a checkout
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))

from memu.app.service import MemoryService
from memu.events import event_manager
from memu.events.dispatcher import CeleryDispatcher
from memu.events.setup import init_event_system


def print_section(title: str):
"""Print a formatted section header."""
print("\n" + "=" * 60)
print(f" {title}")
print("=" * 60 + "\n")


async def demo_event_driven_flow():
"""
Demonstrate the complete event-driven flow.

Flow:
1. Initialize event system with CeleryDispatcher
2. Emit memory operation event
3. CeleryDispatcher catches event
4. Dispatches to Celery worker
5. Worker processes memory in background
"""
print_section("Event-Driven Orchestration Demo")

print("Step 1: Initializing event system...")
init_event_system(celery=True)
print("Event system initialized")
print(f" - Registered dispatchers: {len(event_manager._dispatchers)}")
print(f" - Supported events: {', '.join(event_manager.list_events())}")

print("\nStep 2: Creating MemoryService instance...")
service = MemoryService()
print("MemoryService created")

print("\nStep 3: Submitting memory for background processing...")
print(" (Using event-driven dispatch)")

result = await service.memorize(
resource_url="https://example.com/event-driven-demo.pdf",
modality="document",
user={"user_id": "demo_user_123"},
background=True, # triggers event emission
)

print("Event emitted and dispatched")
print(f" - Status: {result.get('status')}")
print(f" - Message: {result.get('message')}")
print(f" - Event: {result.get('event')}")
print(f" - Resource: {result.get('resource_url')}")

print_section("Event Flow Visualization")
print("1. MemoryService.memorize(background=True)")
print(" ↓")
print("2. EventManager.emit('on_memory_saved', data)")
print(" ↓")
print("3. CeleryDispatcher.on_memory_saved(data)")
print(" ↓")
print("4. process_memory_task.delay(...) → Redis Queue")
print(" ↓")
print("5. Celery Worker consumes task")
print(" ↓")
print("6. MemoryService.memorize() executes in background")

print_section("Custom Event Listener Demo")

custom_events = []

def custom_listener(data):
"""Custom event listener for demonstration."""
custom_events.append(data)
print("Custom listener received event!")
print(f" - Resource: {data.get('resource_url')}")
print(f" - Modality: {data.get('modality')}")

# Register custom listener
event_manager.on("on_memory_saved", custom_listener)
print("Registered custom listener")

# Emit another event
print("\nEmitting event with custom listener...")
await service.memorize(
resource_url="https://example.com/custom-listener-test.pdf",
modality="document",
user={"user_id": "demo_user_456"},
background=True,
)

print(f"\nCustom listener captured {len(custom_events)} event(s)")

print_section("Dispatcher Status")
for dispatcher in event_manager._dispatchers:
if isinstance(dispatcher, CeleryDispatcher):
print("CeleryDispatcher:")
print(f" - Enabled: {dispatcher.enabled}")
print(f" - Task Options: {dispatcher.task_options}")

print_section("Demo Complete")
print("Event-driven orchestration system working correctly!")
print("\nNext steps:")
print("1. Start Celery worker: celery -A worker worker --loglevel=info")
print("2. Check worker logs to see background task execution")
print("3. Verify tasks are being processed in the background")


if __name__ == "__main__":
print("""
MemU Event-Driven Orchestration Demo
Issue #190: Event-Driven Orchestration with Celery
""")

print("Prerequisites:")
print(" Redis server running (default: localhost:6379)")
print(" Celery worker running (celery -A worker worker --loglevel=info)")
print("\nStarting demo...\n")

asyncio.run(demo_event_driven_flow())
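For comparison, a minimal sketch of the synchronous path the same memorize() call takes when background is left at its default of False (the resource URL is a hypothetical placeholder; the import path matches the demo above):

import asyncio

from memu.app.service import MemoryService


async def main():
    service = MemoryService()
    # No event is emitted on this path: the workflow runs in-process and the
    # call returns the workflow response instead of a "queued" status dict.
    result = await service.memorize(
        resource_url="https://example.com/sync-demo.pdf",
        modality="document",
        user={"user_id": "demo_user_123"},
    )
    print(result)


asyncio.run(main())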
57 changes: 57 additions & 0 deletions src/memu/app/memorize.py
@@ -68,7 +68,63 @@ async def memorize(
resource_url: str,
modality: str,
user: dict[str, Any] | None = None,
background: bool = False,
) -> dict[str, Any]:
"""
Memorize a resource from a URL.

This method processes the resource either synchronously or asynchronously
using the event-driven orchestration system.

Args:
resource_url: URL of the resource to memorize
modality: Type of content (document, image, video, audio, conversation)
user: Optional user context
background: If True, uses event-driven dispatch to Celery workers

Returns:
Response dictionary with result or task status

Event-Driven Flow (background=True):
1. Emit 'on_memory_saved' event with payload
2. CeleryDispatcher catches event
3. Dispatches to Celery worker (process_memory_task)
4. Returns task_id for tracking
"""
if background:
# Event-driven orchestration: emit event instead of direct task call
from memu.events import event_manager

# prepare event payload
event_data = {
"resource_url": resource_url,
"modality": modality,
"user": user,
}

# Emit event - CeleryDispatcher will handle async dispatch
logger.info(
"Emitting on_memory_saved event for background processing",
extra={
"resource_url": resource_url,
"modality": modality,
"user_id": user.get("user_id") if user else None,
},
)

# Emit the event - any registered dispatchers will handle it
event_manager.emit("on_memory_saved", event_data)

# The CeleryDispatcher handles the actual task submission; for now return a
# generic response rather than a task_id (tracking could be added later)
return {
"status": "queued",
"message": "Memory processing event dispatched to background workers",
"event": "on_memory_saved",
"resource_url": resource_url,
}

# Synchronous processing
ctx = self._get_context()
store = self._get_database()
user_scope = self.user_model(**user).model_dump() if user is not None else None
@@ -92,6 +148,7 @@ async def memorize(
if response is None:
msg = "Memorize workflow failed to produce a response"
raise RuntimeError(msg)

return response

def _build_memorize_workflow(self) -> list[WorkflowStep]:
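A minimal sketch of observing the payload built in the hunk above from application code (the listener name is illustrative; event_manager.on() is the same registration call used in the example script):

from memu.events import event_manager


def log_memory_event(data: dict) -> None:
    # The keys mirror the event_data dict assembled in memorize().
    print(data.get("resource_url"), data.get("modality"), data.get("user"))


event_manager.on("on_memory_saved", log_memory_event)
# Any later service.memorize(..., background=True) call now invokes
# log_memory_event with the same payload the CeleryDispatcher receives.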
65 changes: 65 additions & 0 deletions src/memu/celery_app.py
@@ -0,0 +1,65 @@
import os

from celery import Celery
from celery.signals import setup_logging

# Redis configuration with optional authentication
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", None)
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
REDIS_DB = int(os.getenv("REDIS_DB", "0"))

if REDIS_PASSWORD:
REDIS_URL = f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
else:
REDIS_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"

celery_app = Celery("memu_worker", broker=REDIS_URL, backend=REDIS_URL, include=["memu.tasks"])


# Configure Celery logging
@setup_logging.connect
def setup_celery_logging_config(**kwargs):
"""Override Celery's default logging with structured JSON logging."""
from memu.task_utils.logging_config import configure_celery_logging

log_level = os.getenv("CELERY_LOG_LEVEL", "INFO")
use_json = os.getenv("CELERY_LOG_JSON", "true").lower() == "true"
log_file = os.getenv("CELERY_LOG_FILE", None)

configure_celery_logging(log_level, use_json, log_file)


celery_app.conf.update(
# Serialization
task_serializer="json",
accept_content=["json"],
result_serializer="json",
# Timezone
timezone="UTC",
enable_utc=True,
# Task acknowledgment
task_acks_late=True,
task_acks_on_failure_or_timeout=True,
task_reject_on_worker_lost=True,
# Timeouts (can be overridden per-task)
task_soft_time_limit=int(os.getenv("CELERY_TASK_SOFT_TIME_LIMIT", "300")), # 5 minutes
task_time_limit=int(os.getenv("CELERY_TASK_TIME_LIMIT", "360")), # 6 minutes
# Result backend settings
result_expires=int(os.getenv("CELERY_RESULT_EXPIRES", "86400")), # 24 hours
result_compression="gzip",
result_extended=True, # Store task args in result
# Task tracking
task_track_started=True,
task_send_sent_event=True,
task_store_eager_result=True, # For testing
# Worker settings
worker_prefetch_multiplier=int(os.getenv("CELERY_WORKER_PREFETCH", "1")),
worker_max_tasks_per_child=int(os.getenv("CELERY_WORKER_MAX_TASKS", "100")),
worker_max_memory_per_child=int(os.getenv("CELERY_WORKER_MAX_MEMORY", "500000")), # 500MB
# Connection settings
broker_connection_retry_on_startup=True,
broker_pool_limit=int(os.getenv("CELERY_BROKER_POOL_LIMIT", "10")),
# Global rate limiting
task_default_rate_limit=os.getenv("CELERY_TASK_RATE_LIMIT", "100/h"),
)
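With the result backend configured above, a dispatched task's state can be polled through standard Celery APIs; a minimal sketch (the task_id value is a hypothetical placeholder, since the memorize() response does not yet surface one):

from celery.result import AsyncResult

from memu.celery_app import celery_app

task_id = "00000000-0000-0000-0000-000000000000"  # hypothetical id from the dispatcher
result = AsyncResult(task_id, app=celery_app)
print(result.state)  # PENDING / STARTED (task_track_started=True) / SUCCESS / FAILURE
if result.successful():
    print(result.result)  # retained for result_expires seconds (24 hours here)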
27 changes: 27 additions & 0 deletions src/memu/events/__init__.py
@@ -0,0 +1,27 @@
"""
Event-driven orchestration system for MemU.

This module provides a flexible event hook system that allows external
workers (Celery/Redis) to react to memory operations asynchronously.

Architecture:
EventManager (hub) -> Dispatchers (listeners) -> Workers (Celery tasks)

Usage:
# Simply import - CeleryDispatcher auto-registers
from memu.events import event_manager

# Emit events
event_manager.emit('on_memory_saved', {
'resource_url': 'https://example.com/doc.pdf',
'modality': 'document',
'user': {'user_id': '123'}
})
"""

# Import setup to trigger auto-initialization
# This creates and registers the CeleryDispatcher automatically
from . import setup # noqa: F401
from .manager import EventManager, event_manager

__all__ = ["EventManager", "event_manager"]
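For orientation, a minimal sketch of the interface the rest of this PR assumes from memu/events/manager.py (the real implementation is not part of the rendered diff; the bodies below are illustrative assumptions, not the shipped code):

from collections import defaultdict
from collections.abc import Callable
from typing import Any


class EventManager:
    """Illustrative sketch: routes named events to listeners and dispatchers."""

    def __init__(self) -> None:
        self._listeners: dict[str, list[Callable[[dict[str, Any]], None]]] = defaultdict(list)
        self._dispatchers: list[Any] = []

    def on(self, event: str, listener: Callable[[dict[str, Any]], None]) -> None:
        # Register a plain callable for a named event, as the demo script does.
        self._listeners[event].append(listener)

    def list_events(self) -> list[str]:
        return list(self._listeners.keys())

    def emit(self, event: str, data: dict[str, Any]) -> None:
        # Fan the payload out to listeners, then to dispatchers that expose a
        # hook method named after the event (e.g. CeleryDispatcher.on_memory_saved).
        for listener in self._listeners[event]:
            listener(data)
        for dispatcher in self._dispatchers:
            handler = getattr(dispatcher, event, None)
            if handler is not None:
                handler(data)


event_manager = EventManager()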