diff --git a/camel/societies/workforce/__init__.py b/camel/societies/workforce/__init__.py
index a7f67476dc..b390553d21 100644
--- a/camel/societies/workforce/__init__.py
+++ b/camel/societies/workforce/__init__.py
@@ -14,11 +14,14 @@
 from .role_playing_worker import RolePlayingWorker
 from .single_agent_worker import SingleAgentWorker
+from .utils import PipelineTaskBuilder
 from .workflow_memory_manager import WorkflowSelectionMethod
-from .workforce import Workforce
+from .workforce import Workforce, WorkforceMode
 
 __all__ = [
     "Workforce",
+    "WorkforceMode",
+    "PipelineTaskBuilder",
     "SingleAgentWorker",
     "RolePlayingWorker",
     "WorkflowSelectionMethod",
diff --git a/camel/societies/workforce/utils.py b/camel/societies/workforce/utils.py
index 5f9c4cdbee..9df8eb7e8e 100644
--- a/camel/societies/workforce/utils.py
+++ b/camel/societies/workforce/utils.py
@@ -221,6 +221,285 @@ def quality_sufficient(self) -> bool:
     )
 
 
+class PipelineTaskBuilder:
+    r"""Helper class for building pipeline tasks with dependencies."""
+
+    def __init__(self):
+        """Initialize an empty pipeline task builder."""
+        from camel.tasks import Task
+        self._TaskClass = Task
+        self.task_list = []
+        self.task_counter = 0
+        self._task_registry = {}  # task_id -> Task mapping for fast lookup
+        self._last_task_id = None  # Last added task, for chain inference
+        self._last_parallel_tasks = []  # Last parallel group, for sync
+
+    def add(
+        self,
+        content: str,
+        task_id: Optional[str] = None,
+        dependencies: Optional[List[str]] = None,
+        additional_info: Optional[dict] = None,
+        auto_depend: bool = True,
+    ) -> 'PipelineTaskBuilder':
+        """Add a task to the pipeline with support for chaining.
+
+        Args:
+            content (str): The content/description of the task.
+            task_id (str, optional): Unique identifier for the task. If None,
+                a unique ID will be generated. (default: :obj:`None`)
+            dependencies (List[str], optional): List of task IDs that this
+                task depends on. If None and auto_depend=True, will depend on
+                the last added task. (default: :obj:`None`)
+            additional_info (dict, optional): Additional information
+                for the task. (default: :obj:`None`)
+            auto_depend (bool, optional): If True and dependencies is None,
+                automatically depend on the last added task.
+                (default: :obj:`True`)
+
+        Returns:
+            PipelineTaskBuilder: Self for method chaining.
+
+        Raises:
+            ValueError: If task_id already exists or if any dependency is
+                not found.
+
+        Example:
+            >>> builder.add("Step 1").add("Step 2").add("Step 3")
+            # Step 2 depends on Step 1, Step 3 depends on Step 2
+        """
+        # Generate or validate task_id
+        task_id = task_id or f"pipeline_task_{self.task_counter}"
+
+        # Check ID uniqueness
+        if task_id in self._task_registry:
+            raise ValueError(f"Task ID '{task_id}' already exists")
+
+        # Auto-infer dependencies if not specified
+        if (
+            dependencies is None
+            and auto_depend
+            and self._last_task_id is not None
+        ):
+            dependencies = [self._last_task_id]
+
+        # Validate dependencies exist
+        dep_tasks = []
+        if dependencies:
+            missing_deps = [
+                dep for dep in dependencies
+                if dep not in self._task_registry
+            ]
+            if missing_deps:
+                raise ValueError(f"Dependencies not found: {missing_deps}")
+            dep_tasks = [self._task_registry[dep] for dep in dependencies]
+
+        # Create task
+        task = self._TaskClass(
+            content=content,
+            id=task_id,
+            dependencies=dep_tasks,
+            additional_info=additional_info,
+        )
+
+        self.task_list.append(task)
+        self._task_registry[task_id] = task
+        self._last_task_id = task_id  # Update last task for chaining
+        self.task_counter += 1
+        return self
+
+    def add_parallel_tasks(
+        self,
+        task_contents: List[str],
+        dependencies: Optional[List[str]] = None,
+        task_id_prefix: str = "parallel",
+        auto_depend: bool = True,
+    ) -> 'PipelineTaskBuilder':
+        """Add multiple parallel tasks that can execute simultaneously.
+
+        Args:
+            task_contents (List[str]): List of task content strings.
+            dependencies (List[str], optional): Common dependencies for all
+                parallel tasks. If None and auto_depend=True, will depend on
+                the last added task. (default: :obj:`None`)
+            task_id_prefix (str, optional): Prefix for generated task IDs.
+                (default: :obj:`"parallel"`)
+            auto_depend (bool, optional): If True and dependencies is None,
+                automatically depend on the last added task.
+                (default: :obj:`True`)
+
+        Returns:
+            PipelineTaskBuilder: Self for method chaining.
+
+        Raises:
+            ValueError: If any task_id already exists or if any dependency
+                is not found.
+
+        Example:
+            >>> builder.add("Collect Data").add_parallel_tasks([
+            ...     "Technical Analysis", "Fundamental Analysis"
+            ... ]).add_sync_task("Generate Report")
+        """
+        if not task_contents:
+            raise ValueError("task_contents cannot be empty")
+
+        # Auto-infer dependencies if not specified
+        if (
+            dependencies is None
+            and auto_depend
+            and self._last_task_id is not None
+        ):
+            dependencies = [self._last_task_id]
+
+        parallel_task_ids = []
+        base_counter = self.task_counter  # Snapshot for consistent IDs
+
+        for i, content in enumerate(task_contents):
+            task_id = f"{task_id_prefix}_{i}_{base_counter}"
+            # auto_depend=False: dependencies are managed manually here
+            self.add(content, task_id, dependencies, auto_depend=False)
+            parallel_task_ids.append(task_id)
+
+        # Set the last task to None since we have multiple parallel endings
+        # The next task will need to explicitly specify dependencies
+        self._last_task_id = None
+        # Store parallel task IDs for potential sync operations
+        self._last_parallel_tasks = parallel_task_ids
+
+        return self
+
+    def add_sync_task(
+        self,
+        content: str,
+        wait_for: Optional[List[str]] = None,
+        task_id: Optional[str] = None,
+    ) -> 'PipelineTaskBuilder':
+        """Add a synchronization task that waits for multiple tasks.
+
+        Args:
+            content (str): Content of the synchronization task.
+            wait_for (List[str], optional): List of task IDs to wait for.
+                If None, will automatically wait for the last parallel tasks.
+                (default: :obj:`None`)
+            task_id (str, optional): ID for the sync task. If None, a unique
+                ID will be generated. (default: :obj:`None`)
+
+        Returns:
+            PipelineTaskBuilder: Self for method chaining.
+
+        Raises:
+            ValueError: If task_id already exists or if any dependency is
+                not found.
+
+        Example:
+            >>> builder.add_parallel_tasks(
+            ...     ["Task A", "Task B"]
+            ... ).add_sync_task("Merge Results")
+            # Automatically waits for both parallel tasks
+        """
+        # Auto-infer wait_for from the last parallel tasks
+        if wait_for is None:
+            if self._last_parallel_tasks:
+                wait_for = self._last_parallel_tasks
+                # Clear the parallel tasks after using them
+                self._last_parallel_tasks = []
+            else:
+                raise ValueError(
+                    "wait_for is not specified and no parallel tasks "
+                    "were added before this sync task"
+                )
+
+        if not wait_for:
+            raise ValueError("wait_for cannot be empty for a sync task")
+
+        return self.add(
+            content, task_id, dependencies=wait_for, auto_depend=False
+        )
+
+    def build(self) -> List:
+        """Build and return the complete task list with dependencies.
+
+        Returns:
+            List[Task]: List of tasks with proper dependency relationships.
+
+        Raises:
+            ValueError: If there are circular dependencies or other
+                validation errors.
+        """
+        if not self.task_list:
+            raise ValueError("No tasks defined in pipeline")
+
+        # Validate no circular dependencies
+        self._validate_dependencies()
+
+        return self.task_list.copy()
+
+    def clear(self) -> None:
+        """Clear all tasks from the builder."""
+        self.task_list.clear()
+        self._task_registry.clear()
+        self.task_counter = 0
+        self._last_task_id = None
+        self._last_parallel_tasks = []
+
+    def fork(self, task_contents: List[str]) -> 'PipelineTaskBuilder':
+        """Create parallel branches from the current task (alias for
+        add_parallel_tasks).
+
+        Args:
+            task_contents (List[str]): List of task content strings for
+                parallel execution.
+
+        Returns:
+            PipelineTaskBuilder: Self for method chaining.
+
+        Example:
+            >>> builder.add("Collect Data").fork([
+            ...     "Technical Analysis", "Fundamental Analysis"
+            ... ]).join("Generate Report")
+        """
+        return self.add_parallel_tasks(task_contents)
+
+    def join(
+        self, content: str, task_id: Optional[str] = None
+    ) -> 'PipelineTaskBuilder':
+        """Join parallel branches with a synchronization task (alias for
+        add_sync_task).
+
+        Args:
+            content (str): Content of the join/sync task.
+            task_id (str, optional): ID for the sync task.
+                (default: :obj:`None`)
+
+        Returns:
+            PipelineTaskBuilder: Self for method chaining.
+
+        Example:
+            >>> builder.fork(["Task A", "Task B"]).join("Merge Results")
+        """
+        return self.add_sync_task(content, task_id=task_id)
+
+    def _validate_dependencies(self) -> None:
+        """Validate that there are no circular dependencies.
+
+        Raises:
+            ValueError: If circular dependencies are detected.
+        """
+        # Use DFS to detect cycles
+        visited = set()
+        rec_stack = set()
+
+        def has_cycle(task_id: str) -> bool:
+            visited.add(task_id)
+            rec_stack.add(task_id)
+
+            task = self._task_registry[task_id]
+            for dep in task.dependencies:
+                if dep.id not in visited:
+                    if has_cycle(dep.id):
+                        return True
+                elif dep.id in rec_stack:
+                    return True
+
+            rec_stack.remove(task_id)
+            return False
+
+        for task_id in self._task_registry:
+            if task_id not in visited:
+                if has_cycle(task_id):
+                    raise ValueError(
+                        f"Circular dependency detected involving task: "
+                        f"{task_id}"
+                    )
+
+    def get_task_info(self) -> dict:
+        """Get information about all tasks in the pipeline.
+
+        Returns:
+            dict: Dictionary containing task count and task details.
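+
+        Example:
+            >>> # Illustrative sketch for a two-step chain; the IDs shown
+            >>> # are the auto-generated defaults.
+            >>> builder.add("Step 1").add("Step 2")
+            >>> builder.get_task_info()
+            {'task_count': 2, 'tasks': [
+                {'id': 'pipeline_task_0', 'content': 'Step 1',
+                 'dependencies': []},
+                {'id': 'pipeline_task_1', 'content': 'Step 2',
+                 'dependencies': ['pipeline_task_0']}]}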
+ """ + return { + "task_count": len(self.task_list), + "tasks": [ + { + "id": task.id, + "content": task.content, + "dependencies": [dep.id for dep in task.dependencies] + } + for task in self.task_list + ] + } + + def check_if_running( running: bool, max_retries: int = 3, diff --git a/camel/societies/workforce/workforce.py b/camel/societies/workforce/workforce.py index 012a8cd74b..8d76cba1ff 100644 --- a/camel/societies/workforce/workforce.py +++ b/camel/societies/workforce/workforce.py @@ -68,6 +68,8 @@ ) from camel.societies.workforce.task_channel import TaskChannel from camel.societies.workforce.utils import ( + FailureContext, + PipelineTaskBuilder, RecoveryStrategy, TaskAnalysisResult, TaskAssignment, @@ -75,6 +77,8 @@ WorkerConf, check_if_running, ) +from camel.societies.workforce.events import WorkerCreatedEvent +from camel.societies.workforce.workforce_callback import WorkforceCallback from camel.societies.workforce.worker import Worker from camel.tasks.task import ( Task, @@ -130,6 +134,13 @@ class WorkforceState(Enum): STOPPED = "stopped" +class WorkforceMode(Enum): + r"""Workforce execution mode for different task processing strategies.""" + + AUTO_DECOMPOSE = "auto_decompose" # Automatic task decomposition mode + PIPELINE = "pipeline" # Predefined pipeline mode + + class WorkforceSnapshot: r"""Snapshot of workforce state for resuming execution.""" @@ -218,17 +229,12 @@ class Workforce(BaseNode): support native structured output. When disabled, the workforce uses the native response_format parameter. (default: :obj:`True`) - callbacks (Optional[List[WorkforceCallback]], optional): A list of - callback handlers to observe and record workforce lifecycle events - and metrics (e.g., task creation/assignment/start/completion/ - failure, worker creation/deletion, all-tasks-completed). All - items must be instances of :class:`WorkforceCallback`, otherwise - a :class:`ValueError` is raised. If none of the provided - callbacks implement :class:`WorkforceMetrics`, a built-in - :class:`WorkforceLogger` (implements both callback and metrics) - is added automatically. If at least one provided callback - implements :class:`WorkforceMetrics`, no default logger is added. - (default: :obj:`None`) + mode (WorkforceMode, optional): The execution mode for task + processing. AUTO_DECOMPOSE mode uses intelligent recovery + strategies (decompose, replan, etc.) when tasks fail. + PIPELINE mode uses simple retry logic and allows failed + tasks to continue the workflow, passing error information + to dependent tasks. 
(default: :obj:`WorkforceMode.AUTO_DECOMPOSE`) Example: >>> import asyncio @@ -281,6 +287,7 @@ def __init__( share_memory: bool = False, use_structured_output_handler: bool = True, task_timeout_seconds: Optional[float] = None, + mode: WorkforceMode = WorkforceMode.AUTO_DECOMPOSE, callbacks: Optional[List[WorkforceCallback]] = None, ) -> None: super().__init__(description) @@ -295,6 +302,8 @@ def __init__( self.task_timeout_seconds = ( task_timeout_seconds or TASK_TIMEOUT_SECONDS ) + self.mode = mode + self._initial_mode = mode # Store initial mode for reset() if self.use_structured_output_handler: self.structured_handler = StructuredOutputHandler() self._task: Optional[Task] = None @@ -302,6 +311,10 @@ def __init__( self._task_dependencies: Dict[str, List[str]] = {} self._assignees: Dict[str, str] = {} self._in_flight_tasks: int = 0 + + # Pipeline building state + self._pipeline_builder: Optional[PipelineTaskBuilder] = None + self._pipeline_tasks_need_assignment: bool = False # Dictionary to track task start times self._task_start_times: Dict[str, float] = {} # Human intervention support @@ -484,24 +497,22 @@ def __init__( def _initialize_callbacks( self, callbacks: Optional[List[WorkforceCallback]] ) -> None: - r"""Validate, register, and prime workforce callbacks.""" - self._callbacks: List[WorkforceCallback] = [] - - if callbacks: - for cb in callbacks: - if isinstance(cb, WorkforceCallback): - self._callbacks.append(cb) - else: - raise ValueError( - "All callbacks must be instances of WorkforceCallback" - ) - + r"""Initialize workforce callbacks.""" + self._callbacks: List[WorkforceCallback] = callbacks or [] + + # Check if any metrics callback is provided has_metrics_callback = any( - isinstance(cb, WorkforceMetrics) for cb in self._callbacks + hasattr(cb, 'get_workforce_kpis') for cb in self._callbacks ) - + if not has_metrics_callback: - self._callbacks.append(WorkforceLogger(workforce_id=self.node_id)) + # Add default WorkforceLogger if no metrics callback provided + try: + from camel.societies.workforce.workforce_logger import WorkforceLogger + self._callbacks.append(WorkforceLogger(workforce_id=self.node_id)) + except ImportError: + # If WorkforceLogger is not available, continue without it + pass else: logger.info( "WorkforceMetrics implementation detected. Skipping default " @@ -625,9 +636,367 @@ def _ensure_pause_event_in_kwargs(self, kwargs: Optional[Dict]) -> Dict: def __repr__(self): return ( f"Workforce {self.node_id} ({self.description}) - " - f"State: {self._state.value}" + f"State: {self._state.value} - Mode: {self.mode.value}" ) + def set_mode(self, mode: WorkforceMode) -> Workforce: + """Set the execution mode of the workforce. + + This allows switching between AUTO_DECOMPOSE and PIPELINE modes. + Useful when you want to reuse the same workforce instance for + different task processing strategies. + + Args: + mode (WorkforceMode): The desired execution mode. + - AUTO_DECOMPOSE: Intelligent task decomposition with recovery + - PIPELINE: Predefined task pipeline with simple retry logic + + Returns: + Workforce: Self for method chaining. 
+ + Example: + >>> # Run a pipeline + >>> workforce.set_mode(WorkforceMode.PIPELINE) + >>> workforce.pipeline_add("Step 1").pipeline_build() + >>> workforce.process_task(task) + >>> + >>> # Reset to original mode + >>> workforce.reset() # Automatically resets to initial mode + >>> # Or manually switch mode + >>> workforce.set_mode(WorkforceMode.AUTO_DECOMPOSE) + >>> workforce.process_task(another_task) + """ + self.mode = mode + logger.info(f"Workforce mode changed to {mode.value}") + return self + + def _ensure_pipeline_builder(self): + """Ensure pipeline builder is initialized and switch to pipeline mode.""" + if self._pipeline_builder is None: + from camel.societies.workforce.utils import PipelineTaskBuilder + self._pipeline_builder = PipelineTaskBuilder() + + # Auto-switch to pipeline mode + if self.mode != WorkforceMode.PIPELINE: + logger.info( + f"Auto-switching workforce mode from {self.mode.value} to " + f"PIPELINE. Use workforce.set_mode() to manually control mode." + ) + self.mode = WorkforceMode.PIPELINE + + def pipeline_add( + self, + content: Union[str, Task], + task_id: Optional[str] = None, + dependencies: Optional[List[str]] = None, + additional_info: Optional[Dict[str, Any]] = None, + auto_depend: bool = True, + ) -> Workforce: + """Add a task to the pipeline with support for chaining. + + Accepts either a string for simple tasks or a Task object for + advanced usage with metadata, images, or custom configurations. + + Args: + content (Union[str, Task]): The task content string or a Task + object. If a Task object is provided, task_id and + additional_info parameters are ignored. + task_id (str, optional): Unique identifier for the task. If None, + a unique ID will be generated. Only used when content is a + string. (default: :obj:`None`) + dependencies (List[str], optional): List of task IDs that this + task depends on. If None and auto_depend=True, will depend on + the last added task. (default: :obj:`None`) + additional_info (Dict[str, Any], optional): Additional information + for the task. Only used when content is a string. + (default: :obj:`None`) + auto_depend (bool, optional): If True and dependencies is None, + automatically depend on the last added task. (default: :obj:`True`) + + Returns: + Workforce: Self for method chaining. + + Example: + >>> # Simple usage with strings + >>> workforce.pipeline_add("Step 1").pipeline_add("Step 2") + >>> + >>> # Advanced usage with Task objects + >>> task = Task( + ... content="Complex Task", + ... additional_info={"priority": "high"}, + ... image_list=["path/to/image.png"] + ... ) + >>> workforce.pipeline_add(task) + """ + self._ensure_pipeline_builder() + + # Convert Task object to parameters if needed + if isinstance(content, Task): + task_content = content.content + task_id = content.id + task_additional_info = content.additional_info + else: + task_content = content + task_additional_info = additional_info + + self._pipeline_builder.add( + task_content, task_id, dependencies, task_additional_info, auto_depend + ) + return self + + def add_parallel_pipeline_tasks( + self, + task_contents: Union[List[str], List[Task]], + dependencies: Optional[List[str]] = None, + task_id_prefix: str = "parallel", + auto_depend: bool = True, + ) -> Workforce: + """Add multiple parallel tasks to the pipeline. + + Accepts either a list of strings for simple tasks or a list of Task + objects for advanced usage with metadata, images, or custom + configurations. 
+ + Args: + task_contents (Union[List[str], List[Task]]): List of task content + strings or Task objects. If Task objects are provided, + task_id_prefix is ignored. + dependencies (List[str], optional): Common dependencies for all + parallel tasks. (default: :obj:`None`) + task_id_prefix (str, optional): Prefix for generated task IDs. + Only used when task_contents contains strings. + (default: :obj:`"parallel"`) + auto_depend (bool, optional): If True and dependencies is None, + automatically depend on the last added task. (default: :obj:`True`) + + Returns: + Workforce: Self for method chaining. + + Example: + >>> # Simple usage with strings + >>> workforce.add_parallel_pipeline_tasks([ + ... "Task A", "Task B", "Task C" + ... ]) + >>> + >>> # Advanced usage with Task objects + >>> tasks = [ + ... Task(content="Analysis 1", additional_info={"type": "tech"}), + ... Task(content="Analysis 2", additional_info={"type": "biz"}) + ... ] + >>> workforce.add_parallel_pipeline_tasks(tasks) + """ + self._ensure_pipeline_builder() + + # Convert Task objects to content strings if needed + if task_contents and isinstance(task_contents[0], Task): + # Extract content from Task objects + content_list = [task.content for task in task_contents] + self._pipeline_builder.add_parallel_tasks( + content_list, dependencies, task_id_prefix, auto_depend + ) + else: + self._pipeline_builder.add_parallel_tasks( + task_contents, dependencies, task_id_prefix, auto_depend + ) + return self + + def add_sync_pipeline_task( + self, + content: Union[str, Task], + wait_for: Optional[List[str]] = None, + task_id: Optional[str] = None, + ) -> Workforce: + """Add a synchronization task that waits for multiple tasks. + + Accepts either a string for simple tasks or a Task object for + advanced usage with metadata, images, or custom configurations. + + Args: + content (Union[str, Task]): Content of the synchronization task + or a Task object. If a Task object is provided, task_id + parameter is ignored. + wait_for (List[str], optional): List of task IDs to wait for. + If None, will automatically wait for the last parallel tasks. + (default: :obj:`None`) + task_id (str, optional): ID for the sync task. Only used when + content is a string. (default: :obj:`None`) + + Returns: + Workforce: Self for method chaining. + + Example: + >>> # Simple usage + >>> workforce.add_sync_pipeline_task("Merge Results") + >>> + >>> # Advanced usage with Task object + >>> sync_task = Task( + ... content="Combine outputs", + ... additional_info={"type": "aggregation"} + ... ) + >>> workforce.add_sync_pipeline_task(sync_task) + """ + self._ensure_pipeline_builder() + + # Convert Task object to parameters if needed + if isinstance(content, Task): + task_content = content.content + task_id = content.id + else: + task_content = content + + self._pipeline_builder.add_sync_task(task_content, wait_for, task_id) + return self + + def pipeline_fork( + self, task_contents: Union[List[str], List[Task]] + ) -> Workforce: + """Create parallel branches from the current task. + + Accepts either a list of strings for simple tasks or a list of Task + objects for advanced usage with metadata, images, or custom + configurations. + + Args: + task_contents (Union[List[str], List[Task]]): List of task content + strings or Task objects for parallel execution. + + Returns: + Workforce: Self for method chaining. + + Example: + >>> # Simple usage with strings + >>> workforce.pipeline_add("Collect Data").pipeline_fork([ + ... "Technical Analysis", "Fundamental Analysis" + ... 
]).pipeline_join("Generate Report") + >>> + >>> # Advanced usage with Task objects + >>> tasks = [ + ... Task(content="Parse JSON", additional_info={"format": "json"}), + ... Task(content="Parse XML", additional_info={"format": "xml"}) + ... ] + >>> workforce.pipeline_add("Fetch Data").pipeline_fork(tasks) + """ + self._ensure_pipeline_builder() + + # Convert Task objects to content strings if needed + if task_contents and isinstance(task_contents[0], Task): + # Extract content from Task objects + content_list = [task.content for task in task_contents] + self._pipeline_builder.fork(content_list) + else: + self._pipeline_builder.fork(task_contents) + return self + + def pipeline_join( + self, content: Union[str, Task], task_id: Optional[str] = None + ) -> Workforce: + """Join parallel branches with a synchronization task. + + Accepts either a string for simple tasks or a Task object for + advanced usage with metadata, images, or custom configurations. + + Args: + content (Union[str, Task]): Content of the join/sync task or a + Task object. If a Task object is provided, task_id parameter + is ignored. + task_id (str, optional): ID for the sync task. Only used when + content is a string. (default: :obj:`None`) + + Returns: + Workforce: Self for method chaining. + + Example: + >>> # Simple usage + >>> workforce.pipeline_fork(["Task A", "Task B"]).pipeline_join("Merge Results") + >>> + >>> # Advanced usage with Task object + >>> join_task = Task( + ... content="Aggregate analysis", + ... additional_info={"aggregation_type": "mean"} + ... ) + >>> workforce.pipeline_fork(["A", "B"]).pipeline_join(join_task) + """ + self._ensure_pipeline_builder() + + # Convert Task object to parameters if needed + if isinstance(content, Task): + task_content = content.content + task_id = content.id + else: + task_content = content + + self._pipeline_builder.join(task_content, task_id) + return self + + + def pipeline_build(self) -> Workforce: + """Build the pipeline and set up the tasks for execution. + + Returns: + Workforce: Self for method chaining. + + Example: + >>> workforce.pipeline_add("Step 1").pipeline_fork([ + ... "Task A", "Task B" + ... ]).pipeline_join("Merge").pipeline_build() + """ + if self._pipeline_builder is None: + raise ValueError("No pipeline tasks defined") + + tasks = self._pipeline_builder.build() + self.set_pipeline_tasks(tasks) + + return self + + def get_pipeline_builder(self) -> PipelineTaskBuilder: + """Get the underlying PipelineTaskBuilder for advanced usage. + + Returns: + PipelineTaskBuilder: The pipeline builder instance. + + Example: + >>> builder = workforce.get_pipeline_builder() + >>> builder.add("Complex Task").fork(["A", "B"]).join("Merge") + >>> tasks = builder.build() + >>> workforce.set_pipeline_tasks(tasks) + """ + self._ensure_pipeline_builder() + return self._pipeline_builder + + def set_pipeline_tasks(self, tasks: List[Task]) -> None: + """Set predefined pipeline tasks for PIPELINE mode. + + Args: + tasks (List[Task]): List of tasks with dependencies already set. + The dependencies should be Task objects in the Task.dependencies + attribute. + + Raises: + ValueError: If tasks are invalid. 
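+
+        Example:
+            >>> # A minimal sketch; assumes Task accepts a ``dependencies``
+            >>> # kwarg, as used by PipelineTaskBuilder.
+            >>> t1 = Task(content="Fetch data", id="fetch")
+            >>> t2 = Task(content="Report", id="report", dependencies=[t1])
+            >>> workforce.set_pipeline_tasks([t1, t2])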
+ """ + if not tasks: + raise ValueError("Cannot set empty task list for pipeline") + + # Auto-switch to pipeline mode if not already + if self.mode != WorkforceMode.PIPELINE: + self.mode = WorkforceMode.PIPELINE + + # Clear existing tasks and dependencies + self._pending_tasks.clear() + self._task_dependencies.clear() + self._assignees.clear() + + # Add tasks and set up dependencies + for task in tasks: + self._pending_tasks.append(task) + if task.dependencies: + self._task_dependencies[task.id] = [dep.id for dep in task.dependencies] + else: + self._task_dependencies[task.id] = [] + + # Mark that pipeline tasks need assignment + self._pipeline_tasks_need_assignment = True + def _collect_shared_memory(self) -> Dict[str, List]: r"""Collect memory from all SingleAgentWorker instances for sharing. @@ -859,10 +1228,18 @@ def _decompose_task( r"""Decompose the task into subtasks. This method will also set the relationship between the task and its subtasks. + Args: + task (Task): The task to decompose. + Returns: Union[List[Task], Generator[List[Task], None, None]]: - The subtasks or generator of subtasks. + The subtasks or generator of subtasks. Returns empty list for + PIPELINE mode. """ + # In PIPELINE mode, don't decompose - use predefined tasks + if self.mode == WorkforceMode.PIPELINE: + return [] + decompose_prompt = str( TASK_DECOMPOSE_PROMPT.format( content=task.content, @@ -1718,11 +2095,7 @@ async def handle_decompose_append_task( ) return [task] - if reset and self._state != WorkforceState.RUNNING: - self.reset() - logger.info("Workforce reset before handling task.") - - # Focus on the new task + self.reset() self._task = task task.state = TaskState.FAILED @@ -1797,27 +2170,120 @@ async def process_task_async( if interactive: return await self._process_task_with_snapshot(task) - subtasks = await self.handle_decompose_append_task(task) + # Handle different execution modes + if self.mode == WorkforceMode.PIPELINE: + return await self._process_task_with_pipeline(task) + else: + # AUTO_DECOMPOSE mode (default) + subtasks = await self.handle_decompose_append_task(task) - self.set_channel(TaskChannel()) + self.set_channel(TaskChannel()) - await self.start() + await self.start() - if subtasks: - task.result = "\n\n".join( - f"--- Subtask {sub.id} Result ---\n{sub.result}" - for sub in task.subtasks - if sub.result - ) - if task.subtasks and all( - sub.state == TaskState.DONE for sub in task.subtasks - ): - task.state = TaskState.DONE - else: - task.state = TaskState.FAILED + if subtasks: + task.result = "\n\n".join( + f"--- Subtask {sub.id} Result ---\n{sub.result}" + for sub in task.subtasks + if sub.result + ) + if task.subtasks and all( + sub.state == TaskState.DONE for sub in task.subtasks + ): + task.state = TaskState.DONE + else: + task.state = TaskState.FAILED + + return task + async def _process_task_with_pipeline(self, task: Task) -> Task: + """Process task using predefined pipeline tasks.""" + if not self._pending_tasks: + raise ValueError( + "No pipeline tasks defined. Use set_pipeline_tasks() first." 
+ ) + + # Don't reset here - keep the predefined tasks + self._task = task + + # Log main task creation event through callbacks (following source code pattern) + task_created_event = TaskCreatedEvent( + task_id=task.id, + description=task.content, + parent_task_id=None, + task_type=task.type, + metadata=task.additional_info, + ) + for cb in self._callbacks: + cb.log_task_created(task_created_event) + + task.state = TaskState.FAILED + self.set_channel(TaskChannel()) + await self.start() + + # Collect results from all pipeline tasks + task.result = self._collect_pipeline_results() + task.state = ( + TaskState.DONE if self._all_pipeline_tasks_successful() + else TaskState.FAILED + ) + + # Log completion and mode info + logger.info( + f"Pipeline execution completed. Current mode: {self.mode.value}. " + f"Use workforce.reset() to reset mode to {self._initial_mode.value}, " + f"or workforce.set_mode() to manually change mode." + ) + return task + def _collect_pipeline_results(self) -> str: + """Collect results from all completed pipeline tasks.""" + results = [] + for task in self._completed_tasks: + if task.result: + results.append(f"--- Task {task.id} Result ---\n{task.result}") + return "\n\n".join(results) if results else "Pipeline completed" + + def _all_pipeline_tasks_successful(self) -> bool: + """Check if all pipeline tasks completed successfully. + + This method determines the FINAL STATE of the entire pipeline but does + NOT affect task execution flow. In PIPELINE mode: + + - Failed tasks still pass their results (including errors) to dependent + tasks, allowing join tasks to execute even when upstream tasks fail. + - This is handled in _post_ready_tasks() where dependencies are checked. + + This method only runs AFTER all tasks have been processed to determine + whether the overall pipeline should be marked as DONE or FAILED. + + Returns: + bool: True if all tasks completed successfully (DONE state), + False if any tasks failed or are still pending. + + Example: + Fork-Join pattern with one failed branch: + - Task A (search) → DONE + - Task B (parallel summary 1) → DONE + - Task C (parallel summary 2) → FAILED + - Task D (join/synthesis) → DONE (receives B's result + C's error) + + Result: _all_pipeline_tasks_successful() returns False + Main pipeline task marked as FAILED + But Task D still executed and got all information + """ + # 1. If there are still pending tasks, pipeline is not complete + if self._pending_tasks: + return False + + # 2. If no tasks were completed, consider it a failure + if not self._completed_tasks: + return False + + # 3. Check if all completed tasks succeeded + return all(task.state == TaskState.DONE for task in self._completed_tasks) + def process_task(self, task: Task) -> Task: r"""Synchronous wrapper for process_task that handles async operations internally. 
@@ -2215,9 +2681,18 @@ def reset(self) -> None: self._completed_tasks = [] self._assignees.clear() self._in_flight_tasks = 0 + self._pipeline_tasks_need_assignment = False self.coordinator_agent.reset() self.task_agent.reset() self._task_start_times.clear() + + # Reset pipeline building state + self._pipeline_builder = None + + # Reset mode to initial value + self.mode = self._initial_mode + logger.debug(f"Workforce mode reset to {self._initial_mode.value}") + for child in self._children: child.reset() @@ -3325,19 +3800,28 @@ async def _post_ready_tasks(self) -> None: tasks whose dependencies have been met.""" # Step 1: Identify and assign any new tasks in the pending queue - tasks_to_assign = [ - task - for task in self._pending_tasks - if ( - task.id not in self._task_dependencies - and ( - task.additional_info is None - or not task.additional_info.get( - "_needs_decomposition", False + # In PIPELINE mode, tasks already have dependencies set but need worker assignment + # In other modes, tasks without dependencies entry are new and need both + if self.mode == WorkforceMode.PIPELINE: + tasks_to_assign = [ + task + for task in self._pending_tasks + if task.id not in self._assignees + ] + else: + tasks_to_assign = [ + task + for task in self._pending_tasks + if ( + task.id not in self._task_dependencies + and ( + task.additional_info is None + or not task.additional_info.get( + "_needs_decomposition", False + ) ) ) - ) - ] + ] if tasks_to_assign: logger.debug( f"Found {len(tasks_to_assign)} new tasks. " @@ -3349,9 +3833,12 @@ async def _post_ready_tasks(self) -> None: f"{json.dumps(batch_result.model_dump(), indent=2)}" ) for assignment in batch_result.assignments: - self._task_dependencies[assignment.task_id] = ( - assignment.dependencies - ) + # For pipeline mode, dependencies are already set, only update assignees + # For other modes, update both dependencies and assignees + if self.mode != WorkforceMode.PIPELINE: + self._task_dependencies[assignment.task_id] = ( + assignment.dependencies + ) self._assignees[assignment.task_id] = assignment.assignee_id task_assigned_event = TaskAssignedEvent( @@ -3404,21 +3891,26 @@ async def _post_ready_tasks(self) -> None: # Only proceed with dependency checks if all deps are completed if all_deps_completed: - # Check if all dependencies succeeded (state is DONE) - all_deps_done = all( - completed_tasks_info[dep_id] == TaskState.DONE - for dep_id in dependencies - ) - - # Check if any dependency failed - any_dep_failed = any( - completed_tasks_info[dep_id] == TaskState.FAILED - for dep_id in dependencies - ) + # Determine if task should be posted based on mode + should_post_task = False + + if self.mode == WorkforceMode.PIPELINE: + # PIPELINE mode: Dependencies completed (success or failure) + should_post_task = True + logger.debug( + f"Task {task.id} ready in PIPELINE mode. " + f"All dependencies completed." 
+ ) + else: + # AUTO_DECOMPOSE mode: All dependencies must succeed + all_deps_done = all( + completed_tasks_info[dep_id] == TaskState.DONE + for dep_id in dependencies + ) + should_post_task = all_deps_done - if all_deps_done: - # All dependencies completed successfully - post the - # task + if should_post_task: + # Post the task assignee_id = self._assignees[task.id] logger.debug( f"Posting task {task.id} to " @@ -3427,13 +3919,19 @@ async def _post_ready_tasks(self) -> None: ) await self._post_task(task, assignee_id) posted_tasks.append(task) - elif any_dep_failed: - # Check if any failed dependencies can still be retried - failed_deps = [ - dep_id + elif self.mode == WorkforceMode.AUTO_DECOMPOSE: + # AUTO_DECOMPOSE mode: Handle dependency failures + any_dep_failed = any( + completed_tasks_info[dep_id] == TaskState.FAILED for dep_id in dependencies - if completed_tasks_info[dep_id] == TaskState.FAILED - ] + ) + if any_dep_failed: + # Check if any failed dependencies can still be retried + failed_deps = [ + dep_id + for dep_id in dependencies + if completed_tasks_info[dep_id] == TaskState.FAILED + ] # Check if any failed dependency is still retryable failed_tasks_with_retry_potential = [] @@ -3559,6 +4057,24 @@ async def _handle_failed_task(self, task: Task) -> bool: # Check for immediate halt conditions if task.failure_count >= MAX_TASK_RETRIES: + # PIPELINE mode: Allow workflow to continue with failed task + if self.mode == WorkforceMode.PIPELINE: + logger.warning( + f"Task {task.id} failed after {MAX_TASK_RETRIES} " + f"retries in PIPELINE mode. Marking as failed and " + f"allowing workflow to continue. Error: {failure_reason}" + ) + task.state = TaskState.FAILED + self._cleanup_task_tracking(task.id) + self._completed_tasks.append(task) + if task.id in self._assignees: + await self._channel.archive_task(task.id) + + # Check if any pending tasks are now ready + await self._post_ready_tasks() + return False # Don't halt workforce + + # AUTO_DECOMPOSE mode: Halt on max retries logger.error( f"Task {task.id} has exceeded maximum retry attempts " f"({MAX_TASK_RETRIES}). Final failure reason: " @@ -3583,7 +4099,19 @@ async def _handle_failed_task(self, task: Task) -> bool: await self._channel.archive_task(task.id) return True - # Use intelligent failure analysis to decide recovery strategy + # PIPELINE mode: Simple retry without intelligent recovery + if self.mode == WorkforceMode.PIPELINE: + logger.info( + f"Task {task.id} failed in PIPELINE mode. " + f"Will retry (attempt {task.failure_count}/{MAX_TASK_RETRIES})" + ) + # Simply reset to pending for retry + task.state = TaskState.PENDING + self._pending_tasks.append(task) + await self._post_ready_tasks() + return False + + # AUTO_DECOMPOSE mode: Use intelligent failure analysis recovery_decision = self._analyze_task( task, for_failure=True, error_message=detailed_error ) @@ -4439,6 +4967,7 @@ def clone(self, with_memory: bool = False) -> 'Workforce': share_memory=self.share_memory, use_structured_output_handler=self.use_structured_output_handler, task_timeout_seconds=self.task_timeout_seconds, + mode=self.mode, ) for child in self._children: diff --git a/examples/workforce/pipeline_workflow_example.py b/examples/workforce/pipeline_workflow_example.py new file mode 100644 index 0000000000..7d2ebf3d28 --- /dev/null +++ b/examples/workforce/pipeline_workflow_example.py @@ -0,0 +1,378 @@ +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. 
========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= + +""" +Pipeline Workflow Example - Demonstration of CAMEL Workforce Pipeline functionality. + +This example demonstrates the fork-join pattern in CAMEL Workforce Pipeline mode: +- Single source task (literature search) +- Fork to 5 parallel summarization tasks +- Join results into a comprehensive synthesis + +The example shows how to handle parallel task processing where multiple agents +work on different parts of the same data source with automatic synchronization. +""" + +import asyncio +import time + +from colorama import Fore, Style, init + +from camel.configs import ChatGPTConfig +from camel.agents import ChatAgent +from camel.models import ModelFactory +from camel.societies.workforce import Workforce, WorkforceMode +from camel.tasks import Task +from camel.types import ModelPlatformType, ModelType +from camel.messages import BaseMessage + +# Initialize colorama for colored output +init(autoreset=True) + +# Create model once for all agents +# model = ModelFactory.create( +# model_platform=ModelPlatformType.DEFAULT, +# model_type=ModelType.DEFAULT, +# ) + +model = ModelFactory.create( + model_platform=ModelPlatformType.OPENAI, + model_type=ModelType.GPT_4_1, + model_config_dict=ChatGPTConfig().as_dict(), +) + +def print_section(title: str): + """Print a colored section header.""" + print(f"\n{Fore.CYAN}{'='*60}") + print(f"{Fore.CYAN}{title}") + print(f"{Fore.CYAN}{'='*60}{Style.RESET_ALL}") + +def print_result(task_id: str, result: str): + """Print task result with formatting.""" + print(f"{Fore.GREEN}Task {task_id} completed:") + print(f"{Fore.WHITE}Result: {result[:200]}{'...' if len(result) > 200 else ''}") + print() + + +async def example_1_literature_analysis_pipeline(): + """Example 1: Literature analysis with parallel summarization.""" + print_section("Example 1: Literature Analysis with Parallel Processing") + + # Create coordinator agent + coordinator_agent = ChatAgent( + BaseMessage.make_assistant_message( + role_name="Literature Analysis Coordinator", + content="You are a coordinator responsible for managing literature analysis tasks and ensuring quality output." + ), + model=model + ) + + # Create task agent using the same model + task_agent = ChatAgent( + BaseMessage.make_assistant_message( + role_name="Task Planning Agent", + content="You are a task planning agent responsible for analyzing and coordinating complex tasks." + ), + model=model + ) + + # Create workforce for literature analysis + workforce = Workforce( + "Literature Analysis Team", + coordinator_agent=coordinator_agent, + task_agent=task_agent, + mode=WorkforceMode.PIPELINE + ) + + # Add search agent with search tools (using mock data for demonstration) + search_system_message = BaseMessage.make_assistant_message( + role_name="Literature Researcher", + content="You are a literature researcher. Provide 5 recent AI/ML papers with titles, authors, and brief descriptions. 
Format them as [Paper 1], [Paper 2], etc. Use representative examples from recent AI/ML research." + ) + search_agent = ChatAgent( + system_message=search_system_message, + model=model, + tools=[] + ) + + # Add multiple summary agents for parallel processing + for i in range(5): + summary_system_message = BaseMessage.make_assistant_message( + role_name=f"Summary Specialist {i+1}", + content="You are a literature summary specialist. Focus on extracting key insights, methodologies, and contributions from research papers." + ) + summary_agent = ChatAgent( + system_message=summary_system_message, + model=model, + tools=[] + ) + workforce.add_single_agent_worker(f"Summary Specialist {i+1}", summary_agent) + + # Add synthesis agent + synthesis_system_message = BaseMessage.make_assistant_message( + role_name="Research Synthesizer", + content="You are a research synthesizer. Combine multiple literature summaries into comprehensive analysis and identify research trends." + ) + synthesis_agent = ChatAgent( + system_message=synthesis_system_message, + model=model, + tools=[] + ) + + workforce.add_single_agent_worker("Literature Researcher", search_agent) + workforce.add_single_agent_worker("Research Synthesizer", synthesis_agent) + + # Build literature analysis pipeline + workforce.pipeline_add("Generate 5 representative recent AI/ML papers with titles, authors, and brief descriptions. Format as [Paper 1] to [Paper 5]") \ + .pipeline_fork([ + "Summarize [Paper 1] core insights, methodology and contributions", + "Summarize [Paper 2] core insights, methodology and contributions", + "Summarize [Paper 3] core insights, methodology and contributions", + "Summarize [Paper 4] core insights, methodology and contributions", + "Summarize [Paper 5] core insights, methodology and contributions" + ]) \ + .pipeline_join("Analyze AI/ML research trends based on the 5 paper summaries") \ + .pipeline_build() + + print(f"{Fore.YELLOW}Literature analysis pipeline built: 1 search → 5 parallel summaries → 1 synthesis") + print(f"{Fore.YELLOW}Mode: {workforce.mode}") + + # Execute the pipeline + main_task = Task( + content="AI/ML Literature Review and Trend Analysis", + id="literature_analysis" + ) + + start_time = time.time() + result = await workforce.process_task_async(main_task) + end_time = time.time() + + print_result(result.id, result.result or "Literature analysis completed successfully") + print(f"{Fore.BLUE}[TIME] Execution time: {end_time - start_time:.2f} seconds") + + return result + +def print_summary(): + """Print example summary.""" + print_section("Pipeline Example Summary") + + print(f"{Fore.GREEN}Pipeline pattern demonstrated:") + patterns = [ + "✓ FORK-JOIN: Single task → 5 parallel processing → 1 synthesis", + "✓ PARALLEL SCALING: Easy adjustment of parallel worker count", + "✓ DEPENDENCY CHAINS: Automatic synchronization and dependency resolution", + "✓ STRUCTURED OUTPUT: Using [markers] for smart task distribution" + ] + + for pattern in patterns: + print(f"{Fore.WHITE} {pattern}") + + print(f"\n{Fore.CYAN}Technical features:") + tech_features = [ + "• Pipeline mode with pipeline_fork() and pipeline_join()", + "• Automatic worker assignment and task routing", + "• Multi-agent parallel coordination", + "• Structured task dependencies" + ] + + for feature in tech_features: + print(f"{Fore.WHITE} {feature}") + +async def main(): + """Main function to run pipeline example.""" + print_section("CAMEL Workforce Pipeline Example") + print(f"{Fore.YELLOW}Testing Pipeline mode with fork-join pattern and 
parallel processing.") + + result = None + + try: + print(f"\n{Fore.BLUE}Running literature analysis pipeline example...") + + result = await example_1_literature_analysis_pipeline() + + print_summary() + + print(f"\n{Fore.GREEN}Pipeline example completed successfully!") + + except Exception as e: + print(f"\n{Fore.RED}Error: {e}") + import traceback + traceback.print_exc() + raise + + return result + +if __name__ == "__main__": + # Run the examples + asyncio.run(main()) + +""" +============================================================ +CAMEL Workforce Pipeline Example +============================================================ +Testing Pipeline mode with fork-join pattern and parallel processing. + +Running literature analysis pipeline example... + +============================================================ +Example 1: Literature Analysis with Parallel Processing +============================================================ +Literature analysis pipeline built: 1 search → 5 parallel summaries → 1 synthesis +Mode: WorkforceMode.PIPELINE +Worker node a17f1759-8729-4986-9842-8de8646ff1f0 (Literature Researcher) get task pipeline_task_0: Generate 5 representative recent AI/ML papers with titles, authors, and brief descriptions. Format as [Paper 1] to [Paper 5] +====== +Response from Worker node a17f1759-8729-4986-9842-8de8646ff1f0 (Literature Researcher): + +[Paper 1] +Title: "Scaling Laws for Generative Language Models Revisited" +Authors: Ethan Dyer, Xuezhi Wang, Jeffrey Wu, et al. +Description: This paper provides an updated empirical analysis of scaling laws for large transformer-based language models, exploring how model performance improves with increased data and parameters, and offering new insights for efficient model training. + +[Paper 2] +Title: "Aligning Language Models to Follow Instructions" +Authors: Long Ouyang, Jeffrey Wu, Xu Jiang, et al. +Description: The authors propose novel methods for instruction tuning large language models using reinforcement learning from human feedback (RLHF), improving alignment and helpfulness in conversational AI systems. + +[Paper 3] +Title: "EfficientViT: Memory Efficient Vision Transformers with Cascaded Group Attention" +Authors: Ziyu Guo, Zhiqiang Shen, Zhijian Liu, et al. +Description: EfficientViT introduces a new vision transformer architecture employing cascaded group attention, significantly reducing memory usage and computational cost while maintaining competitive performance on image classification benchmarks. + +[Paper 4] +Title: "GraphGPT: Enhancing Graph Neural Networks with Generative Pre-training" +Authors: Yuanhao Xiong, Junchi Yan, Xiaokang Yang, et al. +Description: GraphGPT integrates generative pre-training strategies into graph neural networks, enabling better performance on graph-level tasks and transfer learning scenarios commonly found in molecular property prediction and social network analysis. + +[Paper 5] +Title: "Self-Rewarding Language Models" +Authors: Daniel Fried, John Thickstun, Tatsunori Hashimoto, et al. +Description: This work introduces a self-supervised learning strategy where language models assign their own rewards during training, effectively improving instruction following and generalization without extensive human-annotated data. +====== +Task pipeline_task_0 completed successfully (quality score: 100). 
+Worker node 8a8e68ae-1cd9-480f-9dd2-c5d59d04ce0b (Summary Specialist 2) get task parallel_1_1: Summarize [Paper 2] core insights, methodology and contributions +Worker node 25cd87e6-253f-46de-be00-1e5c4cb0c6a7 (Summary Specialist 5) get task parallel_4_1: Summarize [Paper 5] core insights, methodology and contributions +Worker node c3c5898e-690e-4c8a-b94c-4e9fe706058b (Summary Specialist 4) get task parallel_3_1: Summarize [Paper 4] core insights, methodology and contributions +Worker node ba2d3ca5-14e2-49ac-a0d0-36a937c58fdc (Summary Specialist 3) get task parallel_2_1: Summarize [Paper 3] core insights, methodology and contributions +Worker node 85b0aaba-1df5-47fa-8997-5f1a0e30e127 (Summary Specialist 1) get task parallel_0_1: Summarize [Paper 1] core insights, methodology and contributions +====== +Response from Worker node 8a8e68ae-1cd9-480f-9dd2-c5d59d04ce0b (Summary Specialist 2): + +Core Insights: "Aligning Language Models to Follow Instructions" demonstrates that large language models can be substantially improved to follow natural language instructions by incorporating reinforcement learning from human feedback (RLHF). The paper reveals that instruction tuning with human preference data significantly enhances the alignment, helpfulness, and safety of conversational AI systems. + +Methodology: The authors propose a two-stage approach. First, they fine-tune pre-trained language models on datasets of instruction-following demonstrations created by human annotators. Second, they employ RLHF: they collect human preferences over model outputs, train a reward model to predict these preferences, and optimize the language model using reinforcement learning (often Proximal Policy Optimization) to maximize expected human-derived rewards. + +Contributions: (1) Introduced a scalable method for aligning large language models with human intention and values, (2) conducted thorough empirical studies demonstrating improvements in model helpfulness and safety via RLHF, (3) released training protocols and insights that advance practical instruction-following conversational agents. +====== +Task parallel_1_1 completed successfully (quality score: 98). +====== +Response from Worker node 25cd87e6-253f-46de-be00-1e5c4cb0c6a7 (Summary Specialist 5): + +Core Insights: 'Self-Rewarding Language Models' introduces a novel self-supervised learning approach whereby language models (LMs) generate their own reward signals during training, rather than relying heavily on human-annotated data or external reward models. The key insight is that enabling LMs to internally assess and reward their generated responses can lead to improved instruction following and better generalization to new tasks. + +Methodology: The authors develop a framework where the LM evaluates its own outputs and assigns self-generated rewards, which are then used to optimize model behavior through reinforcement learning. This involves designing mechanisms for the language model to estimate reward signals based on internal metrics or performance proxies, without requiring direct human feedback or large-scale manual labeling. + +Contributions: +1. Proposes a self-rewarding training paradigm for language models, reducing reliance on costly human feedback. +2. Demonstrates empirically that self-rewarding can improve instruction-following capabilities and generalization to unseen instructions. +3. 
Shows that self-rewarding strategies yield competitive or improved performance compared to methods relying on human-annotated rewards, supporting more scalable and autonomous model development. +====== +====== +Response from Worker node c3c5898e-690e-4c8a-b94c-4e9fe706058b (Summary Specialist 4): + +Core Insights: 'GraphGPT: Enhancing Graph Neural Networks with Generative Pre-training' demonstrates that leveraging generative pre-training strategies—previously successful in language domains—can significantly improve the performance and transferability of graph neural networks (GNNs). The paper finds that pre-trained GNNs excel in graph-level tasks, notably in fields like molecular property prediction and social network analysis, by better capturing underlying graph structures and patterns. + +Methodology: The authors adapt generative pre-training approaches for application to graph data, constructing a large-scale graph corpus for pre-training. The model is first trained to perform generative tasks on unlabeled graphs, such as graph completion or node feature prediction. After this pre-training phase, the GNN is fine-tuned on downstream, task-specific, and often smaller datasets. Extensive experiments compare GraphGPT to standard GNNs and other transfer learning methods on multiple benchmarks. + +Contributions: +1. Proposes a novel framework—GraphGPT—that embeds generative pre-training into GNNs. +2. Demonstrates significant improvements in transfer learning and generalization across diverse graph-based tasks compared to conventional supervised training. +3. Provides empirical results showing state-of-the-art performance in key applications such as molecular property prediction, highlighting the power and flexibility of generative graph pre-training. +====== +====== +Response from Worker node ba2d3ca5-14e2-49ac-a0d0-36a937c58fdc (Summary Specialist 3): + +Core Insights: 'EfficientViT: Memory Efficient Vision Transformers with Cascaded Group Attention' presents an innovative architecture for vision transformers that drastically reduces memory consumption and computational requirements, while preserving high accuracy on image classification tasks. The key insight is the introduction of 'cascaded group attention', which hierarchically structures attention computation to lessen redundancy and enhance efficiency. + +Methodology: The authors design a vision transformer where the standard global self-attention is replaced with cascaded group attention mechanisms. This approach divides the attention computation into multi-stage groups, allowing the model to focus attention efficiently within local and progressively broader regions before aggregating global context. The architecture is evaluated on standard image classification datasets, and its performance is compared against conventional vision transformer baselines in terms of accuracy, memory usage, and speed. + +Contributions: +1. Proposing the cascaded group attention mechanism, enabling significant memory and computation reduction in vision transformers. +2. Delivering a transformer model (EfficientViT) that consistently matches or outperforms existing models while using less memory and computational resources. +3. Providing comprehensive empirical analysis demonstrating competitive benchmark performance alongside resource savings, making vision transformers more practical for resource-constrained environments. +====== +Task parallel_4_1 completed successfully (quality score: 97). +Task parallel_3_1 completed successfully (quality score: 95). 
+====== +Response from Worker node 85b0aaba-1df5-47fa-8997-5f1a0e30e127 (Summary Specialist 1): + +Paper 1, titled "Scaling Laws for Generative Language Models Revisited" by Ethan Dyer, Xuezhi Wang, Jeffrey Wu, et al., revisits and updates empirical scaling laws for large transformer-based language models. + +Core Insights: The paper identifies how model performance continues to improve predictably with greater model parameters and data sizes. It provides refined understanding of the functional relationships between model scale, data scale, and achieved model loss, confirming and updating previously observed power-law patterns. The findings highlight diminishing returns at extreme scales and underscore the importance of balancing data and parameter count for efficient training. + +Methodology: The authors conduct extensive empirical experiments by training transformer-based generative language models at various scales, systematically varying both the amount of data and the number of model parameters. They then measure performance metrics (such as loss and downstream task performance) and fit empirical models to quantify and analyze the scaling relationships. The study also includes comparisons across different datasets and training regimens to test the robustness of the observed scaling laws. + +Contributions: This work provides updated, large-scale empirical evidence for scaling laws in generative language models, offering practical guidelines on model size and data requirements for efficient training. The analysis informs strategies for resource allocation and helps guide future training of even larger models. Additionally, the refined scaling law equations and adjusted empirical constants contribute to the research community's ability to anticipate and plan the development of new, larger language models. +====== +Task parallel_2_1 completed successfully (quality score: 95). +Task parallel_0_1 completed successfully (quality score: 98). +Worker node f60b0634-4e23-4f4f-8240-c8c63b4c98aa (Research Synthesizer) get task pipeline_task_6: Analyze AI/ML research trends based on the 5 paper summaries +====== +Response from Worker node f60b0634-4e23-4f4f-8240-c8c63b4c98aa (Research Synthesizer): + +Analyzing the summaries of the five AI/ML research papers reveals several major trends and directions currently shaping the field: + +1. Model Scaling and Efficiency: +Papers 1 and 3 focus on the scaling of models (both language and vision) and efficient architecture design. Paper 1 refines empirical scaling laws in large language models, highlighting predictable performance improvements with increased model/data size and providing practical guidelines for resource allocation. Paper 3 introduces memory-efficient vision transformers with cascaded group attention, demonstrating a strong trend toward creating architectures that offer both high performance and lower computational requirements for broader accessibility and deployment. + +2. Advancing Model Alignment and Autonomy: +Paper 2 emphasizes aligning language models to better follow human instructions using RLHF, while Paper 5 proposes a shift towards autonomy, enabling language models to generate and optimize their own rewards (self-rewarding LMs). This represents a key trajectory in AI safety, alignment, and scalability—first leveraging human feedback for grounded alignment, and then exploring self-guided reward mechanisms to reduce dependency on costly human annotation and enable more scalable training. + +3. 
Transfer Learning and Pre-training Across Modalities: +Paper 4 extends generative pre-training strategies from the NLP domain to graph neural networks (GNNs), showing significant improvements in generalizability and performance on graph-structured data. This highlights a maturing trend of adapting successful pre-training/transfer learning paradigms across diverse data modalities beyond text or images, such as graphs, for specialized tasks and improved efficiency. + +4. Diminishing Reliance on Human Supervision: +Both Paper 2 (with RLHF) and Paper 5 (self-rewarding LMs) tackle the cost and scalability limits of human-in-the-loop systems, the latter pushing further toward models that can autonomously shape their learning objectives. This shift has implications for economic scalability, as well as potential risks around model alignment and oversight. + +5. Empirical Rigor and Benchmarks: +All papers emphasize large-scale empirical evaluation, thorough benchmarking against existing baselines, and practical implications (e.g., resource tradeoffs, real-world tasks). The field is moving toward more grounded, reproducible research that informs not only theoretical development but decisions around model deployment and development lifecycles. + +In summary, contemporary AI/ML research is converging on scalable model development, architectural efficiency, improved alignment (both human-guided and self-supervised), and bridging successful learning frameworks between modalities. There is an ongoing move toward reducing training costs—both computational and human—while maintaining or advancing model performance, safety, and generalizability. +====== +Task pipeline_task_6 completed successfully (quality score: 97). +Task literature_analysis completed: +Result: --- Task pipeline_task_0 Result --- +[Paper 1] +Title: "Scaling Laws for Generative Language Models Revisited" +Authors: Ethan Dyer, Xuezhi Wang, Jeffrey Wu, et al. +Description: This paper provides an up... + +[TIME] Execution time: 66.42 seconds + +============================================================ +Pipeline Example Summary +============================================================ +Pipeline pattern demonstrated: + ✓ FORK-JOIN: Single task → 5 parallel processing → 1 synthesis + ✓ PARALLEL SCALING: Easy adjustment of parallel worker count + ✓ DEPENDENCY CHAINS: Automatic synchronization and dependency resolution + ✓ STRUCTURED OUTPUT: Using [markers] for smart task distribution + +Technical features: + • Pipeline mode with pipeline_fork() and pipeline_join() + • Automatic worker assignment and task routing + • Multi-agent parallel coordination + • Structured task dependencies + +Pipeline example completed successfully! +""" \ No newline at end of file
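
For quick reference, a minimal end-to-end sketch of the fork-join chaining API
this patch introduces. Agent construction is elided: coordinator_agent,
task_agent, and the worker agents are assumed to be ChatAgent instances built
as in the example file above.

    from camel.societies.workforce import Workforce, WorkforceMode
    from camel.tasks import Task

    # PIPELINE mode skips task decomposition and runs the predefined graph.
    workforce = Workforce(
        "Demo Team",
        coordinator_agent=coordinator_agent,
        task_agent=task_agent,
        mode=WorkforceMode.PIPELINE,
    )
    workforce.add_single_agent_worker("Collector", collector_agent)
    workforce.add_single_agent_worker("Analyst A", analyst_a)
    workforce.add_single_agent_worker("Analyst B", analyst_b)
    workforce.add_single_agent_worker("Writer", writer_agent)

    # One source task, two parallel branches, one join.
    workforce.pipeline_add("Collect data") \
        .pipeline_fork(["Analyze trend A", "Analyze trend B"]) \
        .pipeline_join("Merge the two analyses") \
        .pipeline_build()

    result = workforce.process_task(Task(content="Demo run", id="demo"))
    print(result.result)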