Contributor

@JackYPCOnline JackYPCOnline commented Sep 19, 2025

Description

This PR introduces the interfaces, hooks, and utilities needed for multi-agent session persistence.

Key Changes:

  1. Hooks and Events:
    Add multi-agent events and register them within SessionManager. All newly added events are now in use.

  2. Graph and Swarm:
    Add session persistence to Graph and Swarm. Both can be restored from a saved session.

  • 2.1 If the status is completed, Graph is executed with next_nodes_to_execute = [] and finishes execution immediately.
  • 2.2 Swarm does not execute if the status is completed, because its execution logic is different; instead it calls _build_result() with the saved context.
  • 2.3 Change the context flatten function to pass node results across the execution for easier reuse.
  • 2.4 Add session and hook parameters to the constructor and builder.

  3. Session:
  • 3.1 Introduce SessionType.
  • 3.2 Add abstract functions to read/write the multi-agent JSON under a lock.
  • 3.3 Adapt the file/S3 session managers as necessary.

New APIs:

class SessionManager(HookProvider, ABC):
    def __init__(self, session_type: SessionType = SessionType.AGENT) -> None: ...
    @abstractmethod
    def write_multi_agent_json(self, state: dict[str, Any]) -> None: ...
    @abstractmethod
    def read_multi_agent_json(self) -> dict[str, Any]: ...

@dataclass
class NodeResult:
    def get_agent_results(self) -> list[AgentResult]: ...
    def to_dict(self) -> dict[str, Any]: ...
    def from_dict(self) -> None: ...

@dataclass
class MultiAgentResult:
    def to_dict(self) -> dict[str, Any]: ...
    def from_dict(self) -> None: ...

class MultiAgentBase(ABC):
    @abstractmethod
    def serialize_state(self) -> dict: ...
    @abstractmethod
    def deserialize_state(self, payload: dict) -> None: ...

    def serialize_node_result_for_persist(self, raw: NodeResult) -> dict[str, Any]: ...

class Graph(MultiAgentBase):
    def __init__(self, ..., session_manager=None, hooks)
    def serialize_state(self) -> dict: ...
    def deserialize_state(self, payload: dict) -> None: ...

class GraphBuilder:
    def set_session_manager() -> Builder

class Swarm(MultiAgentBase):
    def __init__(self, ..., session_manager=None, hooks)
    def serialize_state(self) -> dict: ...
    def deserialize_state(self, payload: dict) -> None: ...
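
As a rough illustration of the SessionManager additions listed above, the sketch below shows how a file-backed manager might read and write the multi-agent JSON under a lock. It is not the PR's implementation: `SessionManagerSketch`, `LocalJsonSessionManager`, the `SessionType` values, and the `<session_id>_multi_agent_state.json` file name are all assumptions made for this example, and `HookProvider` is left out so the block stays self-contained.

```python
import json
import os
import tempfile
import threading
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any


class SessionType(Enum):
    """Stand-in for the SessionType introduced by the PR (values assumed)."""

    AGENT = "AGENT"
    MULTI_AGENT = "MULTI_AGENT"


class SessionManagerSketch(ABC):
    """Stand-in for the abstract interface listed above (HookProvider omitted)."""

    def __init__(self, session_type: SessionType = SessionType.AGENT) -> None:
        self.session_type = session_type

    @abstractmethod
    def write_multi_agent_json(self, state: dict[str, Any]) -> None: ...

    @abstractmethod
    def read_multi_agent_json(self) -> dict[str, Any]: ...


class LocalJsonSessionManager(SessionManagerSketch):
    """Hypothetical file-backed manager: one JSON file per session."""

    def __init__(self, session_id: str, storage_dir: str) -> None:
        super().__init__(SessionType.MULTI_AGENT)
        self._path = os.path.join(storage_dir, f"{session_id}_multi_agent_state.json")
        self._lock = threading.Lock()

    def write_multi_agent_json(self, state: dict[str, Any]) -> None:
        with self._lock:
            # Write to a temp file and rename it, so a reader never sees a partial file.
            tmp_path = self._path + ".tmp"
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(state, f)
            os.replace(tmp_path, self._path)

    def read_multi_agent_json(self) -> dict[str, Any]:
        with self._lock:
            if not os.path.exists(self._path):
                return {}  # no saved session yet
            with open(self._path, encoding="utf-8") as f:
                return json.load(f)


def demo_round_trip() -> dict[str, Any]:
    """Write a small state dict and read it back from a temporary directory."""
    with tempfile.TemporaryDirectory() as storage_dir:
        sm = LocalJsonSessionManager("swarm_demo", storage_dir)
        sm.write_multi_agent_json({"type": "swarm", "status": "completed"})
        return sm.read_multi_agent_json()
```

The lock here only guards threads within one process; a real file or S3 manager would likely need a different mechanism for cross-process safety.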

   

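Test code / user reference:
S3:

def build_graph(sm):
    # Note: the "researcher" agent used below is not defined in this excerpt.
    analyst = Agent(name="analyst", system_prompt="You are a data analysis specialist...")
    # Besides that, the system prompt might change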
    fact_checker = Agent(name="fact_checker", system_prompt="You are a fact checking specialist...")
    report_writer = Agent(name="report_writer", system_prompt="You are a report writing specialist...")
    refine_writer = Agent(name="refine_writer", system_prompt="You are a report refining specialist...")
    # Build the graph
    builder = GraphBuilder()

    # Add nodes
    builder.add_node(researcher, "research")
    builder.add_node(analyst, "analysis")
    builder.add_node(fact_checker, "fact_check")
    builder.add_node(report_writer, "report")
    builder.add_node(refine_writer, "refine")

    # Add edges (dependencies)
    builder.add_edge("research", "analysis")
    builder.add_edge("research", "fact_check")
    builder.add_edge("analysis", "report")
    builder.add_edge("fact_check", "report")
    builder.add_edge("report", "refine")

    # Set entry points (optional - will be auto-detected if not specified)
    builder.set_entry_point("research")

    # Optional: Configure execution limits for safety
    builder.set_execution_timeout(600)  # 10 minute timeout

    # Set session manager on builder
    builder.set_session_manager(sm)

    # Build the graph
    graph = builder.build()

    return graph

def main():
    logging.basicConfig(level=logging.DEBUG)
    session_id = "personal"  # your stable id

    sm = S3SessionManager(session_id=session_id, bucket="ncclegg-session-test", session_type=SessionType.MULTI_AGENT)
    graph = build_graph(sm)

File:

def build_swarm(sm):
    researcher = Agent(name="researcher", system_prompt="Research concisely, then handoff to 'analyst'.")
    analyst    = Agent(name="analyst",    system_prompt="Analyze, then handoff to 'writer'.")
    writer     = Agent(name="writer",     system_prompt="Write final report.")


    return Swarm(
        [researcher, analyst, writer],
        session_manager=sm,  
        max_iterations=20,
        max_handoffs=20,
    )

def main():
    session_id  = "swarm_demo"
    storage_dir = "/Volumes/workplace/Strands/personal/sessions"
    os.makedirs(storage_dir, exist_ok=True)
    sm = FileSessionManager(session_id=session_id, storage_dir=storage_dir, session_type=SessionType.MULTI_AGENT)

    swarm = build_swarm(sm)
    result = swarm("Research AI in healthcare and produce a concise report.")

Example of a saved multiagent_state_json:

{
  "type": "swarm",
  "status": "completed",
  "completed_nodes": [
    "researcher",
    "analyst",
    "writer"
  ],
  "node_results": {
    "researcher": {
      "result": {
        "type": "agent_result",
        "stop_reason": "end_turn",
        "message": {
          "role": "assistant",
          "content": [
            {
              "text": "I've completed the research phase and handed off to the analyst to create the requested 100-word report on AI in healthcare. The analyst has all the key findings including applications (diagnostic imaging, drug discovery, personalized medicine, etc.), benefits (improved accuracy, reduced costs, faster diagnosis), and challenges (data privacy, regulatory approval, system integration)."
            }
          ]
        }
      },
      "execution_time": 9778,
      "status": "completed",
      "accumulated_usage": {
        "inputTokens": 1759,
        "outputTokens": 570,
        "totalTokens": 2329
      },
      "accumulated_metrics": {
        "latencyMs": 9476
      },
      "execution_count": 1
    },
    "analyst": {
      "result": {
        "type": "agent_result",
        "stop_reason": "end_turn",
        "message": {
          "role": "assistant",
          "content": [
            {
              "text": "I've handed off the task to the writer agent who will now create the concise 100-word report on AI in healthcare using all the research findings that were previously gathered. The writer has all the necessary information about key applications, benefits, challenges, and the specific requirements for the report."
            }
          ]
        }
      },
      "execution_time": 8644,
      "status": "completed",
      "accumulated_usage": {
        "inputTokens": 1951,
        "outputTokens": 405,
        "totalTokens": 2356
      },
      "accumulated_metrics": {
        "latencyMs": 8339
      },
      "execution_count": 1
    },
    "writer": {
      "result": {
        "type": "agent_result",
        "stop_reason": "end_turn",
        "message": {
          "role": "assistant",
          "content": [
            {
              "text": "Based on the context provided, it appears that both the researcher and analyst agents have already worked on this task. Since I'm being asked to write the final report and the previous agents have presumably gathered and analyzed the necessary information, I'll proceed with creating a concise 100-word report on AI in healthcare.\n\n**AI in Healthcare: Executive Summary**\n\nArtificial Intelligence is revolutionizing healthcare through enhanced diagnostics, personalized treatment plans, and operational efficiency. Machine learning algorithms analyze medical imaging with unprecedented accuracy, enabling early disease detection. AI-powered drug discovery accelerates pharmaceutical development, reducing costs and timeframes. Predictive analytics help prevent hospital readmissions and optimize resource allocation. Virtual health assistants improve patient engagement and medication adherence. Robotic surgery systems enhance precision and minimize invasive procedures. However, challenges include data privacy concerns, regulatory compliance, and integration with existing systems. Despite obstacles, AI's potential to improve patient outcomes while reducing healthcare costs makes it indispensable for modern medical practice advancement.\n\nThe task is now complete with a comprehensive 100-word report on AI in healthcare."
            }
          ]
        }
      },
      "execution_time": 4921,
      "status": "completed",
      "accumulated_usage": {
        "inputTokens": 622,
        "outputTokens": 231,
        "totalTokens": 853
      },
      "accumulated_metrics": {
        "latencyMs": 4716
      },
      "execution_count": 1
    }
  },
  "next_node_to_execute": [],
  "current_task": "Research AI in healthcare and produce a concise report in 100 words.",
  "execution_order": [
    "researcher",
    "analyst",
    "writer"
  ],
  "context": {
    "shared_context": {},
    "handoff_message": null
  }
}
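
To show how a saved state like the one above could be inspected, here is a small sketch that totals token usage and execution time across nodes. The helper name `summarize_state` is hypothetical, and the sample is a trimmed copy of the JSON above that keeps only the fields the helper reads.

```python
import json
from typing import Any


def summarize_state(state: dict[str, Any]) -> dict[str, Any]:
    """Summarize a persisted multi-agent state dict (hypothetical helper)."""
    node_results = state.get("node_results", {})
    total_tokens = sum(
        r.get("accumulated_usage", {}).get("totalTokens", 0)
        for r in node_results.values()
    )
    # execution_time is stored in milliseconds in the example above.
    total_ms = sum(r.get("execution_time", 0) for r in node_results.values())
    return {
        "status": state.get("status"),
        "execution_order": state.get("execution_order", []),
        "total_tokens": total_tokens,
        "total_execution_ms": total_ms,
    }


# Trimmed copy of the saved state shown above.
SAMPLE = json.loads(
    """
    {
      "type": "swarm",
      "status": "completed",
      "node_results": {
        "researcher": {"execution_time": 9778, "accumulated_usage": {"totalTokens": 2329}},
        "analyst": {"execution_time": 8644, "accumulated_usage": {"totalTokens": 2356}},
        "writer": {"execution_time": 4921, "accumulated_usage": {"totalTokens": 853}}
      },
      "execution_order": ["researcher", "analyst", "writer"]
    }
    """
)

summary = summarize_state(SAMPLE)
print(summary["total_tokens"], summary["total_execution_ms"])  # 5538 23343
```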

Related Issues

#867
#500

Documentation PR

Type of Change

Bug fix
New feature
Breaking change
Documentation update
Other (please describe):

Testing

How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli

  1. Added unit tests.
  2. Tested with a real agent on both sequential and concurrent Graph, and on sequential Swarm.
  3. Tested file/S3 storage and restore with Graph/Swarm execution.
  • [x] I ran hatch run prepare

Checklist

  • [x] I have read the CONTRIBUTING document
  • [x] I have added any necessary tests that prove my fix is effective or my feature works
  • [ ] I have updated the documentation accordingly
  • [x] I have added an appropriate example to the documentation to outline the feature, or no new docs are needed
  • [x] My changes generate no new warnings
  • [x] Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@JackYPCOnline JackYPCOnline marked this pull request as ready for review September 25, 2025 14:39
@JackYPCOnline JackYPCOnline changed the title Multi agent session Multi agent session interface Sep 25, 2025
@JackYPCOnline JackYPCOnline changed the title Multi agent session interface Enable multi agent session persistence Sep 29, 2025
Member

@Unshure Unshure left a comment

I see you doing getattr throughout the code. I would like to avoid doing this, and instead check if objects are instances of classes. Two classes could have the same attribute, leading to undesirable behavior.
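
A minimal sketch of the concern raised here, using two made-up classes that happen to share an attribute name. `AgentResultSketch` and `ToolOutputSketch` are hypothetical stand-ins, not types from the PR.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AgentResultSketch:
    message: str


@dataclass
class ToolOutputSketch:
    # Unrelated class that happens to expose the same attribute name.
    message: str


def extract_with_getattr(obj: object) -> Optional[str]:
    # Duck-typed lookup: accepts anything that has a "message" attribute.
    return getattr(obj, "message", None)


def extract_with_isinstance(obj: object) -> Optional[str]:
    # Explicit type check: only AgentResultSketch instances are accepted.
    if isinstance(obj, AgentResultSketch):
        return obj.message
    return None


unrelated = ToolOutputSketch(message="not an agent result")
print(extract_with_getattr(unrelated))     # not an agent result
print(extract_with_isinstance(unrelated))  # None
```

With `getattr`, the unrelated object is silently treated as a result; with `isinstance`, it is rejected.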

@pgrayy pgrayy self-requested a review October 20, 2025 19:22
@pgrayy pgrayy self-requested a review October 20, 2025 20:32
Member

@pgrayy pgrayy left a comment

See comments.

"""Return a JSON-serializable snapshot of the orchestrator state."""
raise NotImplementedError

def deserialize_state(self, payload: dict[str, Any]) -> None:
Member

I still don't see deserialize as being a mutative action. If this is actually doing

"""Restore orchestrator state from a session dict and prepare for execution.

why not call it restore_from_state or session?

self.state.results[node_id] = node_result

# Persist failure here
self.hooks.invoke_callbacks(AfterNodeCallEvent(self, node_id=node_id, invocation_state=invocation_state))
Member

Why can't we just use a finally here to cover both the happy-path and sad-path cases?

Contributor Author

The idea is that we should fire this event right after a node fails. For a successful node, however, we should fire it only after the node has been added to self.state.node_history, because that is the point at which the state has actually been updated.
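
The ordering described in this reply can be sketched as follows. This is a simplified stand-in, not the PR's code: `run_node` is a hypothetical function, and the `fired` list takes the place of invoking the after-node hook, recording how long the node history was when the hook fired.

```python
from typing import Callable


def run_node(
    node_id: str,
    execute: Callable[[], str],
    node_history: list[str],
    fired: list[str],
) -> str:
    """Run one node and record when the (stand-in) after-node hook fires."""
    try:
        result = execute()
    except Exception:
        # Failure path: fire right away so the failed status can be persisted.
        fired.append(f"after:{node_id}:failed:history={len(node_history)}")
        raise
    # Success path: update the state first, then fire, so the hook sees the update.
    node_history.append(node_id)
    fired.append(f"after:{node_id}:completed:history={len(node_history)}")
    return result


def failing_node() -> str:
    raise RuntimeError("node failed")


history: list[str] = []
events: list[str] = []
run_node("researcher", lambda: "ok", history, events)
try:
    run_node("analyst", failing_node, history, events)
except RuntimeError:
    pass
print(events)
# ['after:researcher:completed:history=1', 'after:analyst:failed:history=1']
```

A single `finally` would need to wrap the history update as well to preserve this ordering on the success path, which is the trade-off the thread is debating.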

return result

except Exception as e:
execution_time = round((time.time() - start_time) * 1000)
Member

same comment about execution time

Contributor Author

see above.

payload: Dictionary containing persisted state data including status,
completed nodes, results, and next nodes to execute.
"""
if not payload.get("next_nodes_to_execute"):
Member

For graph, is this actually expected behavior? Ignoring persisted state, if I had a graph locally only, the graph completed, and then I re-executed, would we reset? This seems like strange logic to have ONLY for session persistence.

Contributor Author

If a graph has failed on a blocking node or has completed, next_nodes_to_execute will be []; otherwise there will always be some [node_ids] here.
The graph completes when its `while ready_nodes` loop runs out of nodes, and ready_nodes is a list of nodes, so it is actually the same logic.
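
One way to picture the point made in this reply is a toy scheduler whose loop condition and persisted field are the same list. This is only a sketch under assumptions: `run_graph` and its snapshot format are hypothetical, failure handling is left out, and the edges are borrowed from the graph example in the PR description.

```python
from typing import Optional

# Edges from the graph example in the PR description (source -> dependents).
EDGES: dict[str, list[str]] = {
    "research": ["analysis", "fact_check"],
    "analysis": ["report"],
    "fact_check": ["report"],
    "report": ["refine"],
}


def run_graph(
    edges: dict[str, list[str]],
    ready_nodes: list[str],
    completed: Optional[list[str]] = None,
) -> list[dict[str, list[str]]]:
    """Toy scheduler: returns the snapshot that would be persisted after each batch."""
    done = list(completed or [])
    snapshots = []
    while ready_nodes:
        batch, ready_nodes = ready_nodes, []
        for node in batch:
            done.append(node)  # stand-in for actually executing the node
            for child in edges.get(node, []):
                deps = [src for src, dsts in edges.items() if child in dsts]
                if all(d in done for d in deps) and child not in ready_nodes:
                    ready_nodes.append(child)
        snapshots.append(
            {"completed_nodes": list(done), "next_nodes_to_execute": list(ready_nodes)}
        )
    return snapshots


snaps = run_graph(EDGES, ["research"])
print([s["next_nodes_to_execute"] for s in snaps])
# [['analysis', 'fact_check'], ['report'], ['refine'], []]
```

In this toy version, resuming from a snapshot whose `next_nodes_to_execute` is `[]` runs zero iterations, while resuming from an earlier snapshot continues with the remaining nodes.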

from typing import Any, Callable, Optional, Tuple

from opentelemetry import trace as trace_api

Member

General comment, I am having a difficult time reviewing all of this code. It is complex. Can we please merge the events and base (potentially in a branch) and then split graph and swarm as well as the session manager implementations.

I think this review is taking so long because every time I take a look I am focussing on different things because the cognitive load is so high in each review session.

Contributor Author

alright


6 participants