Skip to content

Commit f1330e4

Browse files
committed
added sample conversation/session manager on top of kurrentdb to integrations
1 parent e1e7917 commit f1330e4

File tree

7 files changed

+1313
-3
lines changed

7 files changed

+1313
-3
lines changed

03-integrations/README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
# Strands Agent Integrations
22

3-
| Integration | Features showcased |
4-
| ------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
5-
| [A2A Protocol](./A2A-protocol/) | Demonstrates agent-to-agent communication protocol for collaborative problem-solving between specialized AI agents. |
3+
| Integration | Features showcased |
4+
| ------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
5+
| [A2A Protocol](./A2A-protocol/) | Demonstrates agent-to-agent communication protocol for collaborative problem-solving between specialized AI agents. |
66
| [Aurora DSQL](./aurora-DSQL) | Demonstrates the Strands Agent integration with Amazon Aurora DSQL. |
77
| [Nova Act](./nova-act) | Nova Act integration with Strands. Amazon Nova Act is an AI model trained to perform actions within a web browser. |
88
| [Tavily](./tavily/) | This agent uses Tavily's web search, extract and crawl APIs to gather information from reliable sources, extract key insights, and save comprehensive research reports in Markdown format. |
9+
| [Persistent Session With KurrentDB](./persistent-session-kurrentdb/) | A conversation and session manager which allows you to store and restore state of your agents to resume execution. It also has unique temporal windowing capabilities, in addition to fix window sizing.|
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3.12
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
# KurrentDB Conversation Manager for Strands Agent
2+
3+
A persistent conversation manager implementation for Strands Agent that uses KurrentDB as the storage backend. This manager enables conversation history persistence, state management, and recovery capabilities for AI agents.
4+
5+
## Overview
6+
7+
The `KurrentDBConversationManager` extends the Strands framework's `ConversationManager` to provide:
8+
9+
- **Persistent Message Storage**: All conversation messages are stored as events in KurrentDB streams
10+
- **State Checkpointing**: Save and restore agent state at any point in the conversation
11+
- **Conversation History Management**: Configure retention policies with maximum age or count limits
12+
- **Recovery Capabilities**: Restore agent state and conversation history after restarts
13+
14+
## Installation
15+
16+
### Prerequisites
17+
18+
- Python 3.7+
19+
- KurrentDB instance running (default: `localhost:2113`)
20+
- Required Python packages:
21+
```bash
22+
pip install strands kurrentdbclient
23+
```
24+
## Quick Start
25+
```json
26+
pip install strands-agents[anthropic]
27+
```
28+
Setup an instance of KurrentDB: https://console.kurrent.cloud/signup or https://aws.amazon.com/marketplace/pp/prodview-kxo6grvoovk2y?sr=0-1&ref_=beagle&applicationId=AWSMPContessa
29+
```python
30+
from strands import Agent
31+
from strands.models.anthropic import AnthropicModel
32+
from kurrentdb_session_manager import KurrentDBConversationManager
33+
34+
unique_run_id = "run-01"
35+
kurrentdb_conversation_manager = (
36+
KurrentDBConversationManager(unique_run_id, "connection string here")
37+
) # replace with your actual connection string
38+
39+
# kurrentdb_conversation_manager.set_max_window_age(60) # Set max window age to 60 seconds
40+
model = AnthropicModel(
41+
client_args={
42+
"api_key": "Your API KEY here", # Replace with your actual API key
43+
},
44+
# **model_config
45+
max_tokens= 4096,
46+
model_id="claude-3-5-haiku-latest",
47+
params={
48+
"temperature": 0.7,
49+
}
50+
)
51+
52+
poet_agent = Agent(
53+
system_prompt="You are a hungry poet who loves to write haikus about everything.",
54+
model=model,
55+
conversation_manager=kurrentdb_conversation_manager, # Assuming no specific conversation manager is needed
56+
)
57+
poet_agent("Write a haiku about the beauty of nature.")
58+
kurrentdb_conversation_manager.save_agent_state(unique_run_id=unique_run_id,
59+
state={"messages": poet_agent.messages,
60+
"system_prompt": poet_agent.system_prompt})
61+
poet_agent("Based on the previous haiku, write another one about the changing seasons.")
62+
poet_agent = kurrentdb_conversation_manager.restore_agent_state(agent=poet_agent,unique_run_id=unique_run_id)
63+
poet_agent("What did we just talk about?")
64+
65+
66+
67+
68+
```
69+
70+
## Features
71+
72+
### 1. Persistent Message Storage
73+
74+
Every message in the conversation is automatically stored as an event in KurrentDB:
75+
- Each message is stored with its role (user/assistant/system) as the event type
76+
- Messages are stored in order with stream positions for accurate replay
77+
78+
### 2. State Management
79+
80+
Save and restore complete agent state:
81+
82+
```python
83+
# Save current state
84+
conversation_manager.save_agent_state(
85+
unique_run_id="run-01",
86+
state={
87+
"messages": agent.messages,
88+
"system_prompt": agent.system_prompt,
89+
"custom_data": "any additional state"
90+
}
91+
)
92+
93+
# Restore state later
94+
agent = conversation_manager.restore_agent_state(
95+
agent=agent,
96+
unique_run_id="run-01"
97+
)
98+
```
99+
100+
### 3. Conversation Retention Policies
101+
102+
Configure how long conversations are retained:
103+
104+
```python
105+
# Set maximum age (in seconds)
106+
conversation_manager.set_max_window_age(3600) # Keep messages for 1 hour
107+
108+
# Set maximum message count
109+
conversation_manager.set_max_window_size(100) # Keep last 100 messages
110+
```
111+
112+
### 4. Window Size Management
113+
114+
Control how many messages are loaded into memory:
115+
116+
```python
117+
conversation_manager = KurrentDBConversationManager(
118+
unique_run_id="run-01",
119+
connection_string="esdb://localhost:2113?Tls=false",
120+
window_size=40 # Load last 40 messages by default
121+
)
122+
```
123+
124+
## API Reference
125+
126+
### Constructor
127+
128+
```python
129+
KurrentDBConversationManager(
130+
unique_run_id: str,
131+
connection_string: str = "esdb://localhost:2113?Tls=false",
132+
window_size: int = 40,
133+
reducer_function = lambda x: x
134+
)
135+
```
136+
137+
**Parameters:**
138+
- `unique_run_id`: Unique identifier for the conversation stream
139+
- `connection_string`: KurrentDB connection string
140+
- `window_size`: Maximum number of messages to keep in memory
141+
- `reducer_function`: Function to reduce messages if context limit is exceeded
142+
143+
### Methods
144+
145+
#### `apply_management(messages: Messages) -> None`
146+
Applies management strategies to the messages list and persists new messages to KurrentDB.
147+
148+
#### `reduce_context(messages: Messages, e: Optional[Exception] = None) -> Optional[Messages]`
149+
Reduces the context window size when it exceeds limits using the configured reducer function.
150+
151+
#### `set_max_window_age(max_age: int) -> None`
152+
Sets the maximum age for messages in the conversation (KurrentDB stream metadata).
153+
154+
#### `set_max_window_size(max_count: int) -> None`
155+
Sets the maximum number of messages to retain in the stream.
156+
157+
#### `save_agent_state(unique_run_id: str, state: dict) -> None`
158+
Saves the current agent state to a checkpoint stream.
159+
160+
#### `restore_agent_state(agent: Agent, unique_run_id: str) -> Agent`
161+
Restores agent state from the checkpoint stream.
162+
163+
## How It Works
164+
165+
### Stream Structure
166+
167+
The manager uses two types of streams in KurrentDB:
168+
169+
1. **Conversation Stream** (`{unique_run_id}`):
170+
- Contains all conversation messages as events
171+
- Event types: "user", "assistant", "system", "StateRestored"
172+
- Messages stored in chronological order
173+
174+
2. **Checkpoint Stream** (`strands_checkpoint-{unique_run_id}`):
175+
- Contains agent state snapshots
176+
- Used for recovery and state restoration
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
from strands.agent.conversation_manager import ConversationManager
2+
from strands.agent import Agent
3+
from strands.types.content import Messages
4+
from typing import Optional
5+
from kurrentdbclient import KurrentDBClient, NewEvent, StreamState
6+
from kurrentdbclient.exceptions import NotFoundError
7+
import json
8+
9+
"""
10+
Example usage:
11+
from strands import Agent
12+
from strands.models.anthropic import AnthropicModel
13+
from kurrentdb_session_manager import KurrentDBConversationManager
14+
15+
unique_run_id = "run-01"
16+
kurrentdb_conversation_manager = (
17+
KurrentDBConversationManager(unique_run_id, "esdb://localhost:2113?Tls=false")
18+
) # replace with your actual connection string
19+
20+
# kurrentdb_conversation_manager.set_max_window_age(60) # Set max window age to 60 seconds
21+
model = AnthropicModel(
22+
client_args={
23+
"api_key": "Your API KEY here", # Replace with your actual API key
24+
},
25+
# **model_config
26+
max_tokens= 4096,
27+
model_id="claude-3-5-haiku-latest",
28+
params={
29+
"temperature": 0.7,
30+
}
31+
)
32+
33+
poet_agent = Agent(
34+
system_prompt="You are a hungry poet who loves to write haikus about everything.",
35+
model=model,
36+
conversation_manager=kurrentdb_conversation_manager, # Assuming no specific conversation manager is needed
37+
)
38+
poet_agent("Write a haiku about the beauty of nature.")
39+
kurrentdb_conversation_manager.save_agent_state(unique_run_id=unique_run_id,
40+
state={"messages": poet_agent.messages,
41+
"system_prompt": poet_agent.system_prompt})
42+
poet_agent("Based on the previous haiku, write another one about the changing seasons.")
43+
poet_agent = kurrentdb_conversation_manager.restore_agent_state(agent=poet_agent,unique_run_id=unique_run_id)
44+
poet_agent("What did we just talk about?")
45+
"""
46+
class KurrentDBConversationManager(ConversationManager):
47+
client: KurrentDBClient
48+
def __init__(self, unique_run_id:str,
49+
connection_string: str = "esdb://localhost:2113?Tls=false",
50+
window_size: int = 40,
51+
reducer_function = lambda x: x) -> None:
52+
"""
53+
Initializes the KurrentDB conversation manager with a connection string.
54+
:param connection_string: The connection string for KurrentDB.
55+
"""
56+
self.client = KurrentDBClient(connection_string)
57+
self.stream_id = unique_run_id
58+
self.checkpoint = -1 # Default checkpoint value, no messages processed yet
59+
self.window_size = window_size # Maximum number of messages to keep in the conversation
60+
self.reducer_function = reducer_function # Function to reduce messages if needed
61+
62+
def apply_management(self, messages: Messages) -> None:
63+
"""Apply management strategies to the messages list."""
64+
justRestored = False
65+
try:
66+
events = self.client.get_stream(
67+
stream_name=self.stream_id,
68+
resolve_links=True,
69+
backwards=True,
70+
limit=1
71+
) # Get the last event in the stream
72+
if len(events) == 1 and events[0].type == "StateRestored":
73+
# then we don't need to remove any message
74+
justRestored = True
75+
self.checkpoint = events[0].stream_position
76+
77+
except NotFoundError as e:
78+
#this means that the stream does not exist yet
79+
if self.checkpoint != -1:
80+
# Handle inconsistency in the outside the conversation manager
81+
raise Exception("Inconsistent state: Stream not found but checkpoint exists.")
82+
if self.checkpoint != -1 and justRestored == False:
83+
# remove already added messages from the messages list
84+
messages = messages[self.checkpoint + 1:] # Keep only new messages
85+
events = []
86+
for message in messages:
87+
metadata = {}
88+
event = NewEvent(type=message["role"], data=bytes(json.dumps(message), 'utf-8'),
89+
content_type='application/json',
90+
metadata=bytes(json.dumps(metadata), 'utf-8'))
91+
events.append(event)
92+
self.client.append_to_stream(
93+
stream_name=self.stream_id,
94+
events=events,
95+
current_version=StreamState.ANY # TODO: tighten this up if needed if agent is called in parallel and order is important(is that possible?)
96+
)
97+
self.checkpoint += len(events) # Update checkpoint after appending messages
98+
99+
100+
def reduce_context(self, messages: Messages, e: Optional[Exception] = None) -> Optional[Messages]:
101+
"""Function to reduce the context window size when it exceeds the model's limit.
102+
"""
103+
return self.reducer_function(messages)
104+
105+
def set_max_window_age(self, max_age: int) -> None:
106+
"""Set the maximum age for messages in the conversation inside KurrentDB."""
107+
self.client.set_stream_metadata(self.stream_id,
108+
metadata={"$maxAge": max_age},
109+
current_version=StreamState.ANY
110+
)
111+
112+
def set_max_window_size(self, max_count: int) -> None:
113+
"""Set the maximum size for the conversation history inside KurrentDB."""
114+
self.client.set_stream_metadata(self.stream_id,
115+
metadata={"$maxCount": max_count},
116+
current_version=StreamState.ANY
117+
)
118+
119+
def save_agent_state(self, unique_run_id: str, state: dict) -> None:
120+
"""
121+
Saves the agent state variables to a checkpoint stream in KurrentDB.
122+
This event contains which position in the stream the agent is at and other state variables.
123+
"""
124+
del state["messages"] # We already keep messages in the stream, so we don't need to save them again.
125+
state["kurrentdb_checkpoint"] = self.checkpoint
126+
state["kurrentdb_checkpoint_stream_id"] = unique_run_id
127+
event = NewEvent(type="agent_state", data=bytes(json.dumps(state), 'utf-8'),
128+
content_type='application/json')
129+
self.client.append_to_stream(
130+
stream_name="strands_checkpoint-" + unique_run_id,
131+
events=[event],
132+
current_version=StreamState.ANY)
133+
134+
135+
def restore_agent_state(self, agent: Agent, unique_run_id: str) -> Agent:
136+
"""
137+
Builds the agent state messages from a stream in KurrentDB.
138+
"""
139+
try:
140+
checkpoint_event = self.client.get_stream(
141+
stream_name="strands_checkpoint-" + unique_run_id,
142+
resolve_links=True,
143+
backwards=True,
144+
limit=1
145+
)
146+
if not checkpoint_event or len(checkpoint_event) == 0:
147+
return None # No state found
148+
149+
state = json.loads(checkpoint_event[0].data.decode('utf-8'))
150+
self.stream_id = state["kurrentdb_checkpoint_stream_id"]
151+
self.checkpoint = state["kurrentdb_checkpoint"]
152+
153+
messages = []
154+
message_events = self.client.get_stream(
155+
stream_name=unique_run_id,
156+
resolve_links=True,
157+
backwards=True,
158+
stream_position=self.checkpoint,
159+
limit=self.window_size
160+
)
161+
for event in message_events:
162+
if event.type == "StateRestored":
163+
break #reached of this state
164+
message = json.loads(event.data.decode('utf-8'))
165+
messages.insert(0,message)
166+
state["messages"] = messages
167+
agent.messages = messages
168+
169+
#append an event to know restore state was called
170+
system_event = NewEvent(
171+
type="StateRestored",
172+
data=bytes("{}", 'utf-8'),
173+
content_type='application/json',
174+
metadata=bytes("{}", 'utf-8')
175+
)
176+
self.client.append_to_stream(
177+
stream_name=unique_run_id,
178+
events=[system_event],
179+
current_version=StreamState.ANY
180+
)
181+
return agent
182+
except NotFoundError as e:
183+
return agent #unchanged agent, no state to restore

0 commit comments

Comments
 (0)