-
Notifications
You must be signed in to change notification settings - Fork 3.7k
/
Copy pathautogen_conversable_agent.py
316 lines (263 loc) · 12.4 KB
/
autogen_conversable_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
# Copyright (c) Microsoft. All rights reserved.
import logging
import sys
import uuid
from collections.abc import AsyncIterable, Callable
from typing import TYPE_CHECKING, Any
from semantic_kernel.contents.chat_history import ChatHistory
from semantic_kernel.contents.history_reducer.chat_history_reducer import ChatHistoryReducer
from semantic_kernel.utils.feature_stage_decorator import experimental
if sys.version_info >= (3, 12):
from typing import override # pragma: no cover
else:
from typing_extensions import override # pragma: no cover
from autogen import ConversableAgent
from semantic_kernel.agents.agent import Agent, AgentResponseItem, AgentThread
from semantic_kernel.contents.chat_message_content import ChatMessageContent
from semantic_kernel.contents.function_call_content import FunctionCallContent
from semantic_kernel.contents.function_result_content import FunctionResultContent
from semantic_kernel.contents.text_content import TextContent
from semantic_kernel.contents.utils.author_role import AuthorRole
from semantic_kernel.exceptions.agent_exceptions import AgentInvokeException, AgentThreadOperationException
from semantic_kernel.functions.kernel_arguments import KernelArguments
from semantic_kernel.utils.telemetry.agent_diagnostics.decorators import (
trace_agent_get_response,
trace_agent_invocation,
)
if TYPE_CHECKING:
from autogen.cache import AbstractCache
from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent
from semantic_kernel.kernel import Kernel
logger: logging.Logger = logging.getLogger(__name__)
@experimental
class AutoGenConversableAgentThread(AgentThread):
    """AutoGen Conversable Agent thread, backed by an in-memory ChatHistory."""

    def __init__(self, chat_history: ChatHistory | None = None, thread_id: str | None = None) -> None:
        """Initialize the AutoGenConversableAgentThread Thread.

        Args:
            chat_history: The chat history for the thread. If None, a new ChatHistory instance will be created.
            thread_id: The ID of the thread. If None, a new thread will be created.
        """
        super().__init__()
        self._chat_history = chat_history or ChatHistory()
        self._id = thread_id

    @override
    async def _create(self) -> str:
        """Starts the thread and returns its ID."""
        if not self._id:
            # Lazily mint an ID the first time the thread is activated.
            self._id = f"thread_{uuid.uuid4().hex}"
        return self._id

    @override
    async def _delete(self) -> None:
        """Ends the current thread."""
        self._chat_history.clear()

    @override
    async def _on_new_message(self, new_message: str | ChatMessageContent) -> None:
        """Called when a new message has been contributed to the chat.

        Plain strings are promoted to USER-role messages. A message whose
        metadata already carries this thread's ID is skipped to avoid
        recording the same message twice.
        """
        if isinstance(new_message, str):
            new_message = ChatMessageContent(role=AuthorRole.USER, content=new_message)

        if (
            not new_message.metadata
            or "thread_id" not in new_message.metadata
            or new_message.metadata["thread_id"] != self._id
        ):
            self._chat_history.add_message(new_message)

    async def get_messages(self) -> ChatHistory:
        """Retrieve the current chat history.

        Activates the thread first if it has not been created yet.

        Raises:
            AgentThreadOperationException: If the thread has been deleted.
        """
        if self._is_deleted:
            raise AgentThreadOperationException("Cannot retrieve chat history, since the thread has been deleted.")
        if self._id is None:
            await self.create()
        return self._chat_history

    async def reduce(self) -> ChatHistory | None:
        """Reduce the chat history to a smaller size.

        Returns:
            The reduced history, or None when the underlying history is not a
            ChatHistoryReducer (nothing to reduce with).

        Raises:
            AgentThreadOperationException: If the thread is not currently active.
        """
        if self._id is None:
            raise AgentThreadOperationException("Cannot reduce chat history, since the thread is not currently active.")
        if not isinstance(self._chat_history, ChatHistoryReducer):
            return None
        return await self._chat_history.reduce()
@experimental
class AutoGenConversableAgent(Agent):
    """A Semantic Kernel wrapper around an AutoGen 0.2 `ConversableAgent`.

    This allows one to use it as a Semantic Kernel `Agent`. Note: this agent abstraction
    does not currently allow for the use of AgentGroupChat within Semantic Kernel.
    """

    conversable_agent: ConversableAgent

    def __init__(self, conversable_agent: ConversableAgent, **kwargs: Any) -> None:
        """Initialize the AutoGenConversableAgent.

        Args:
            conversable_agent: The existing AutoGen 0.2 ConversableAgent instance
            kwargs: Other Agent base class arguments (e.g. name, id, instructions)
        """
        # Seed the Agent base fields from the wrapped agent; explicit kwargs win.
        args: dict[str, Any] = {
            "name": conversable_agent.name,
            "description": conversable_agent.description,
            "instructions": conversable_agent.system_message,
            "conversable_agent": conversable_agent,
        }

        if kwargs:
            args.update(kwargs)

        super().__init__(**args)

    @trace_agent_get_response
    @override
    async def get_response(
        self,
        messages: str | ChatMessageContent | list[str | ChatMessageContent],
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentResponseItem[ChatMessageContent]:
        """Get a response from the agent.

        Args:
            messages: The input chat message content either as a string, ChatMessageContent or
                a list of strings or ChatMessageContent.
            thread: The thread to use for the conversation. If None, a new thread will be created.
            kwargs: Additional keyword arguments forwarded to `a_generate_reply`.

        Returns:
            An AgentResponseItem of type ChatMessageContent object with the response and the thread.
        """
        thread = await self._ensure_thread_exists_with_messages(
            messages=messages,
            thread=thread,
            construct_thread=lambda: AutoGenConversableAgentThread(),
            expected_type=AutoGenConversableAgentThread,
        )
        assert thread.id is not None  # nosec

        chat_history = await thread.get_messages()

        reply = await self.conversable_agent.a_generate_reply(
            messages=[message.to_dict() for message in chat_history.messages],
            **kwargs,
        )

        logger.info("Called AutoGenConversableAgent.a_generate_reply.")

        return await self._create_reply_content(reply, thread)

    @trace_agent_invocation
    @override
    async def invoke(
        self,
        *,
        messages: str | ChatMessageContent | list[str | ChatMessageContent],
        thread: AgentThread | None = None,
        recipient: "AutoGenConversableAgent | None" = None,
        clear_history: bool = True,
        silent: bool = True,
        cache: "AbstractCache | None" = None,
        max_turns: int | None = None,
        summary_method: str | Callable | None = ConversableAgent.DEFAULT_SUMMARY_METHOD,
        summary_args: dict | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentResponseItem[ChatMessageContent]]:
        """A direct `invoke` method for the ConversableAgent.

        Args:
            messages: The input chat message content either as a string, ChatMessageContent or
                a list of strings or ChatMessageContent.
            thread: The thread to use for the conversation. If None, a new thread will be created.
            recipient: The recipient ConversableAgent to chat with
            clear_history: Whether to clear the chat history before starting. True by default.
            silent: Whether to suppress console output. True by default.
            cache: The cache to use for storing chat history
            max_turns: The maximum number of turns to chat for
            summary_method: The method to use for summarizing the chat
            summary_args: The arguments to pass to the summary method. Defaults to an empty dict.
            kwargs: Additional keyword arguments

        Yields:
            An AgentResponseItem of type ChatMessageContent object with the response and the thread.
        """
        # Previously declared as `summary_args: dict | None = {}` — a shared
        # mutable default. Use None as the default and normalize here instead.
        if summary_args is None:
            summary_args = {}

        thread = await self._ensure_thread_exists_with_messages(
            messages=messages,
            thread=thread,
            construct_thread=lambda: AutoGenConversableAgentThread(),
            expected_type=AutoGenConversableAgentThread,
        )
        assert thread.id is not None  # nosec

        chat_history = await thread.get_messages()

        if recipient is not None:
            if not isinstance(recipient, AutoGenConversableAgent):
                raise AgentInvokeException(
                    f"Invalid recipient type: {type(recipient)}. "
                    "Recipient must be an instance of AutoGenConversableAgent."
                )

            # Two-agent chat: seed it with the most recent message on the thread.
            chat_result = await self.conversable_agent.a_initiate_chat(
                recipient=recipient.conversable_agent,
                clear_history=clear_history,
                silent=silent,
                cache=cache,
                max_turns=max_turns,
                summary_method=summary_method,
                summary_args=summary_args,
                message=chat_history.messages[-1].content,  # type: ignore
                **kwargs,
            )

            logger.info(f"Called AutoGenConversableAgent.a_initiate_chat with recipient: {recipient}.")

            for message in chat_result.chat_history:
                msg = AutoGenConversableAgent._to_chat_message_content(message)  # type: ignore
                await thread.on_new_message(msg)
                yield AgentResponseItem(
                    message=msg,
                    thread=thread,
                )
        else:
            # Single-agent reply over the accumulated thread history.
            # Forward **kwargs for consistency with `get_response`.
            reply = await self.conversable_agent.a_generate_reply(
                messages=[message.to_dict() for message in chat_history.messages],
                **kwargs,
            )

            logger.info("Called AutoGenConversableAgent.a_generate_reply.")

            yield await self._create_reply_content(reply, thread)

    @override
    def invoke_stream(
        self,
        *,
        messages: str | ChatMessageContent | list[str | ChatMessageContent],
        thread: AgentThread | None = None,
        kernel: "Kernel | None" = None,
        arguments: KernelArguments | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentResponseItem["StreamingChatMessageContent"]]:
        """Invoke the agent with a stream of messages.

        Raises:
            NotImplementedError: Always; AutoGen 0.2 agents do not stream.
        """
        raise NotImplementedError("The AutoGenConversableAgent does not support streaming.")

    @staticmethod
    def _to_chat_message_content(message: dict[str, Any]) -> ChatMessageContent:
        """Translate an AutoGen message to a Semantic Kernel ChatMessageContent."""
        items: list[TextContent | FunctionCallContent | FunctionResultContent] = []
        role = AuthorRole(message.get("role"))
        name: str = message.get("name", "")

        content = message.get("content")
        if content is not None:
            text = TextContent(text=content)
            items.append(text)

        if role == AuthorRole.ASSISTANT:
            tool_calls = message.get("tool_calls")
            if tool_calls is not None:
                for tool_call in tool_calls:
                    # OpenAI-style tool calls nest name/arguments under
                    # "function"; keep the old top-level "name" as a fallback.
                    function = tool_call.get("function") or {}
                    items.append(
                        FunctionCallContent(
                            id=tool_call.get("id"),
                            function_name=tool_call.get("name") or function.get("name"),
                            arguments=function.get("arguments"),
                        )
                    )

        if role == AuthorRole.TOOL:
            tool_responses = message.get("tool_responses")
            if tool_responses is not None:
                for tool_response in tool_responses:
                    items.append(
                        FunctionResultContent(
                            id=tool_response.get("tool_call_id"),
                            result=tool_response.get("content"),
                        )
                    )

        return ChatMessageContent(role=role, items=items, name=name)  # type: ignore

    async def _create_reply_content(
        self, reply: str | dict[str, Any], thread: AgentThread
    ) -> AgentResponseItem[ChatMessageContent]:
        """Wrap an `a_generate_reply` result, record it on the thread, and return it.

        Raises:
            AgentInvokeException: If the reply is neither a str nor a dict.
        """
        response: ChatMessageContent
        if isinstance(reply, str):
            response = ChatMessageContent(content=reply, role=AuthorRole.ASSISTANT)
        elif isinstance(reply, dict):
            response = ChatMessageContent(**reply)
        else:
            raise AgentInvokeException(f"Unexpected reply type from `a_generate_reply`: {type(reply)}")

        await thread.on_new_message(response)
        return AgentResponseItem(
            message=response,
            thread=thread,
        )