-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathredis_assistant.py
78 lines (62 loc) · 2.47 KB
/
redis_assistant.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import asyncio
from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent
from agents.mcp import MCPServerStdio
from collections import deque
# Set up and create the agent
async def build_agent():
    """Connect to the Redis MCP server over stdio and build the assistant agent.

    The MCP server is launched with ``uv --directory ../src/ run main.py`` and
    receives its Redis connection settings through environment variables. The
    connection is established before the agent is created.

    Returns:
        The ``Agent`` named "Redis Assistant", configured with the connected
        MCP server as its only entry in ``mcp_servers``.
    """
    # Redis connection settings, handed to the MCP server process as its "env".
    redis_env = {
        "REDIS_HOST": "127.0.0.1",
        "REDIS_PORT": "6379",
        "REDIS_USERNAME": "default",
        "REDIS_PWD": "",
    }

    # How to start the MCP server.
    launch_params = {
        "command": "uv",
        "args": [
            "--directory", "../src/",  # change with the path to the MCP server
            "run", "main.py",
        ],
        "env": redis_env,
    }

    redis_mcp = MCPServerStdio(params=launch_params)
    await redis_mcp.connect()

    return Agent(
        name="Redis Assistant",
        instructions="You are a helpful assistant capable of reading and writing to Redis. Store every question and answer in the Redis Stream app:logger",
        mcp_servers=[redis_mcp],
    )
# CLI interaction
async def cli(agent, max_history=30):
    """Run an interactive question/answer loop against ``agent`` on the terminal.

    Each non-empty line read from stdin is sent to the agent together with the
    remembered conversation, the streamed reply is echoed as it arrives, and
    both sides of the exchange are stored for later prompts. The loop ends
    when the user types ``exit`` or ``quit`` (any case) or when stdin reaches
    end-of-file.

    Args:
        agent: The agent passed to ``Runner.run_streamed``.
        max_history: Maximum number of remembered messages. A question and its
            answer count as two messages; the oldest are dropped first.
    """
    print("🔧 Redis Assistant CLI — Ask me something (type 'exit' to quit):\n")
    conversation_history = deque(maxlen=max_history)

    while True:
        try:
            question = input("❓> ").strip()
        except EOFError:
            # Ctrl-D or exhausted piped input: leave quietly instead of
            # crashing with a traceback. The bare print ends the prompt line.
            print()
            break

        if question.lower() in {"exit", "quit"}:
            break
        if not question:
            # Nothing to ask; prompt again.
            continue

        prompt = _format_context(conversation_history, question)
        result = Runner.run_streamed(agent, prompt)

        # Echo text deltas as they arrive and keep them for the history.
        chunks = []
        async for event in result.stream_events():
            if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                print(event.data.delta, end="", flush=True)
                chunks.append(event.data.delta)
        print("\n")

        # Add the user's message and the assistant's reply to the history.
        conversation_history.append({"role": "user", "content": question})
        conversation_history.append({"role": "assistant", "content": "".join(chunks)})


def _format_context(conversation_history, question):
    """Render the stored turns plus the new question as one prompt string."""
    lines = []
    for turn in conversation_history:
        speaker = "User" if turn["role"] == "user" else "Assistant"
        lines.append(f"{speaker}: {turn['content']}")
    history = "\n".join(lines).strip()
    # Real newlines separate the sections (the original emitted a literal "/n").
    return f"Conversation history:\n{history}\nNew question:\n{question}"
# Main entry point
async def main():
    """Build the agent, run the interactive CLI, then shut down its MCP servers."""
    agent = await build_agent()
    try:
        await cli(agent)
    finally:
        # build_agent() connects the MCP server but nothing ever disconnected
        # it; release every server attached to the agent, even when the CLI
        # exits through an exception.
        for server in agent.mcp_servers:
            await server.cleanup()
# Start the assistant only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    asyncio.run(main())