77 changes: 75 additions & 2 deletions camel/agents/chat_agent.py
@@ -166,6 +166,8 @@ def __init__(self):
self.base_content = "" # Content before tool calls
self.current_content = [] # Accumulated streaming fragments
self.tool_status_messages = [] # Accumulated tool status messages
self.reasoning_content = [] # Accumulated reasoning content
self.is_reasoning_phase = True # Track if we're in reasoning phase

def set_base_content(self, content: str):
r"""Set the base content (usually empty or pre-tool content)."""
@@ -174,6 +176,13 @@ def set_base_content(self, content: str):
def add_streaming_content(self, new_content: str):
r"""Add new streaming content."""
self.current_content.append(new_content)
self.is_reasoning_phase = (
False # Once we get content, we're past reasoning
)

def add_reasoning_content(self, new_reasoning: str):
r"""Add new reasoning content."""
self.reasoning_content.append(new_reasoning)

def add_tool_status(self, status_message: str):
r"""Add a tool status message."""
@@ -185,6 +194,10 @@ def get_full_content(self) -> str:
current = "".join(self.current_content)
return self.base_content + tool_messages + current

def get_full_reasoning_content(self) -> str:
r"""Get the complete accumulated reasoning content."""
return "".join(self.reasoning_content)

def get_content_with_new_status(self, status_message: str) -> str:
r"""Get content with a new status message appended."""
tool_messages = "".join([*self.tool_status_messages, status_message])
@@ -194,6 +207,8 @@ def get_content_with_new_status(self, status_message: str) -> str:
def reset_streaming_content(self):
r"""Reset only the streaming content, keep base and tool status."""
self.current_content = []
self.reasoning_content = []
self.is_reasoning_phase = True
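
A minimal lifecycle sketch of the new fields, assuming the simplified shape above (base-content and tool-status bookkeeping dropped; this is not the library class itself):

```python
class MiniAccumulator:
    # Simplified mirror of ContentAccumulator's new reasoning fields.
    def __init__(self):
        self.current_content = []
        self.reasoning_content = []
        self.is_reasoning_phase = True

    def add_reasoning_content(self, new_reasoning: str):
        self.reasoning_content.append(new_reasoning)

    def add_streaming_content(self, new_content: str):
        self.current_content.append(new_content)
        self.is_reasoning_phase = False  # first answer token ends reasoning

    def reset_streaming_content(self):
        # A new round (e.g., after a tool call) may stream reasoning
        # again, so the phase flag is reset along with the buffers.
        self.current_content = []
        self.reasoning_content = []
        self.is_reasoning_phase = True


acc = MiniAccumulator()
acc.add_reasoning_content("count the letters...")
assert acc.is_reasoning_phase
acc.add_streaming_content("Three.")
assert not acc.is_reasoning_phase
acc.reset_streaming_content()
assert acc.is_reasoning_phase
```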


class StreamingChatAgentResponse:
@@ -3357,6 +3372,27 @@ def _process_stream_chunks_with_accumulator(
choice = chunk.choices[0]
delta = choice.delta

# Handle reasoning content streaming (for DeepSeek reasoner)
if (
hasattr(delta, 'reasoning_content')
and delta.reasoning_content
):
content_accumulator.add_reasoning_content(
delta.reasoning_content
)
# Yield partial response with reasoning content
partial_response = (
self._create_streaming_response_with_accumulator(
content_accumulator,
"", # No regular content yet
step_token_usage,
getattr(chunk, 'id', ''),
tool_call_records.copy(),
reasoning_delta=delta.reasoning_content,
)
)
yield partial_response
Comment on lines +3375 to +3394

Member:
Why was this part of the code added to ChatAgent? It's not good design to put model-specific logic in ChatAgent.

Collaborator (Author):
This logic applies to all reasoning models. The comment mentioning DeepSeek was left over from before; it has now been deleted.
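
For readers following the thread: the branch keys on an optional `reasoning_content` attribute of each streamed delta, so it applies to any OpenAI-compatible reasoning backend. A sketch with fabricated chunk objects (illustrative only; real chunks come from the model client):

```python
from types import SimpleNamespace

# Fabricated chunks imitating a reasoning stream: reasoning deltas
# arrive first, then regular content deltas.
chunks = [
    SimpleNamespace(delta=SimpleNamespace(
        reasoning_content="Counting Rs in 'strawberry'...", content=None)),
    SimpleNamespace(delta=SimpleNamespace(
        reasoning_content=None, content="There are three Rs.")),
]

reasoning_parts, content_parts = [], []
for chunk in chunks:
    delta = chunk.delta
    # Mirror of the hasattr check above: reasoning is accumulated
    # separately and surfaced to the caller as it arrives.
    if getattr(delta, "reasoning_content", None):
        reasoning_parts.append(delta.reasoning_content)
    if delta.content:
        content_parts.append(delta.content)

print("".join(reasoning_parts))  # Counting Rs in 'strawberry'...
print("".join(content_parts))    # There are three Rs.
```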


# Handle content streaming
if delta.content:
# Use accumulator for proper content management
@@ -4146,6 +4182,27 @@ async def _aprocess_stream_chunks_with_accumulator(
choice = chunk.choices[0]
delta = choice.delta

# Handle reasoning content streaming (for DeepSeek reasoner)
if (
hasattr(delta, 'reasoning_content')
and delta.reasoning_content
):
content_accumulator.add_reasoning_content(
delta.reasoning_content
)
# Yield partial response with reasoning content
partial_response = (
self._create_streaming_response_with_accumulator(
content_accumulator,
"", # No regular content yet
step_token_usage,
getattr(chunk, 'id', ''),
tool_call_records.copy(),
reasoning_delta=delta.reasoning_content,
)
)
yield partial_response

# Handle content streaming
if delta.content:
# Use accumulator for proper content management
@@ -4339,20 +4396,36 @@ def _create_streaming_response_with_accumulator(
step_token_usage: Dict[str, int],
response_id: str = "",
tool_call_records: Optional[List[ToolCallingRecord]] = None,
reasoning_delta: Optional[str] = None,
) -> ChatAgentResponse:
r"""Create a streaming response using content accumulator."""

# Add new content; only build full content when needed
accumulator.add_streaming_content(new_content)
if new_content:
accumulator.add_streaming_content(new_content)

if self.stream_accumulate:
message_content = accumulator.get_full_content()
else:
message_content = new_content

# Build meta_dict with reasoning information
meta_dict: Dict[str, Any] = {}

# Add reasoning content info
full_reasoning = accumulator.get_full_reasoning_content()
if full_reasoning:
meta_dict["reasoning_content"] = (
full_reasoning
if self.stream_accumulate
else reasoning_delta or ""
)
meta_dict["is_reasoning"] = accumulator.is_reasoning_phase

message = BaseMessage(
role_name=self.role_name,
role_type=self.role_type,
meta_dict={},
meta_dict=meta_dict,
content=message_content,
)

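As a reading aid, the `meta_dict` branch above can be worked through by hand; a sketch with made-up values, not library code:

```python
# Hand-worked illustration of the meta_dict logic above.
full_reasoning = "step 1. step 2."  # accumulator.get_full_reasoning_content()
reasoning_delta = " step 2."        # reasoning carried by the current chunk
is_reasoning_phase = True           # no regular content has arrived yet

for stream_accumulate in (True, False):
    meta_dict = {}
    if full_reasoning:
        meta_dict["reasoning_content"] = (
            full_reasoning if stream_accumulate else reasoning_delta or ""
        )
        meta_dict["is_reasoning"] = is_reasoning_phase
    print(stream_accumulate, meta_dict)
# True  -> the full reasoning so far; False -> only this chunk's delta
```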
7 changes: 3 additions & 4 deletions camel/configs/deepseek_config.py
@@ -96,13 +96,12 @@ class DeepSeekConfig(BaseConfig):
tool_choice: Optional[Union[dict[str, str], str]] = None
logprobs: Optional[bool] = None
top_logprobs: Optional[int] = None
stream_options: Optional[dict[str, bool]] = None

def __init__(self, include_usage: bool = True, **kwargs):
if kwargs.get("stream") and "stream_options" not in kwargs:
kwargs["stream_options"] = {"include_usage": include_usage}
super().__init__(**kwargs)
# Only set stream_options when stream is True
# Otherwise, it will raise error when calling the API
if self.stream:
self.stream_options = {"include_usage": include_usage}


DEEPSEEK_API_PARAMS = {param for param in DeepSeekConfig.model_fields.keys()}
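
A usage sketch of the revised constructor; the behavior is read off the three added lines, and the `camel.configs` import path is assumed from the repo layout:

```python
from camel.configs import DeepSeekConfig

# stream=True: stream_options is injected into kwargs before validation,
# so usage statistics are requested along with the stream.
cfg = DeepSeekConfig(stream=True)
print(cfg.stream_options)  # {'include_usage': True}

# A caller-supplied stream_options is left untouched.
cfg = DeepSeekConfig(stream=True, stream_options={"include_usage": False})
print(cfg.stream_options)  # {'include_usage': False}

# Without stream=True nothing is injected, avoiding the API error the
# old post-init assignment worked around.
print(DeepSeekConfig().stream_options)  # None
```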
2 changes: 2 additions & 0 deletions camel/models/deepseek_model.py
@@ -169,6 +169,8 @@ def _post_handle_response(
self, response: ChatCompletion
) -> ChatCompletion:
r"""Handle reasoning content with <think> tags at the beginning."""
if isinstance(response, (Stream, AsyncStream)):
return response
if (
self.model_type in [ModelType.DEEPSEEK_REASONER]
and os.environ.get("GET_REASONING_CONTENT", "false").lower()
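The guard's intent as a standalone sketch (openai SDK types; the non-streaming `<think>` handling that follows in the real method is elided):

```python
from typing import Union

from openai import AsyncStream, Stream
from openai.types.chat import ChatCompletion


def post_handle(response: Union[ChatCompletion, Stream, AsyncStream]):
    # Streaming objects yield chunks lazily, so there is no complete
    # message to scan for <think> tags yet; pass them through unchanged.
    if isinstance(response, (Stream, AsyncStream)):
        return response
    # ...non-streaming path: split <think>...</think> reasoning out of
    # the message content, as _post_handle_response does above...
    return response
```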
39 changes: 25 additions & 14 deletions examples/agents/chatagent_stream.py
@@ -15,7 +15,7 @@
from camel.models import ModelFactory
from camel.types import ModelPlatformType, ModelType

# Create a streaming model
# Create a streaming-capable model backend
streaming_model = ModelFactory.create(
model_platform=ModelPlatformType.DEFAULT,
model_type=ModelType.GPT_4O_MINI,
@@ -32,33 +32,44 @@
)

# Example user message
user_message = "Tell me about the benefits of renewable energy and how "
"it impacts the environment."
user_message = "How many Rs are there in the word 'strawberry'?"

# Get streaming response
# Accumulated streaming mode (default)
streaming_response = agent_accumulated.step(user_message)

# Stream the response chunks
for chunk_response in streaming_response:
# Each chunk_response is a ChatAgentResponse with incremental content
chunk_content = chunk_response.msgs[0].content
print(chunk_content, end="", flush=True)
message = chunk_response.msgs[0]
meta = message.meta_dict or {}

reasoning_text = meta.get("reasoning_content")
if reasoning_text:
print(reasoning_text, end="", flush=True)

content_text = message.content
if content_text:
print(content_text, end="", flush=True)

print("\n\n---\nDelta streaming mode (stream_accumulate=False):\n")

# Create an agent that yields delta chunks instead of accumulated content
# Delta streaming mode (only new content per chunk)
agent_delta = ChatAgent(
system_message="You are a helpful assistant that provides concise "
"and informative responses.",
model=streaming_model,
stream_accumulate=False, # Only yield the delta part per chunk
stream_accumulate=False,
)

# Get streaming response (delta chunks)
streaming_response_delta = agent_delta.step(user_message)

# Stream only the delta content per chunk; printing reconstructs the full text
for chunk_response in streaming_response_delta:
delta_content = chunk_response.msgs[0].content
print(delta_content, end="", flush=True)
message = chunk_response.msgs[0]
meta = message.meta_dict or {}

reasoning_delta = meta.get("reasoning_content") or ""
if reasoning_delta:
print(reasoning_delta, end="", flush=True)

if message.content:
print(message.content, end="", flush=True)

print()
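
To exercise the reasoning path, the same script can target a reasoning backend. A hypothetical swap for `streaming_model` above — `ModelType.DEEPSEEK_REASONER` is referenced in this PR's deepseek_model.py change, while `ModelPlatformType.DEEPSEEK` is assumed from camel's naming convention (imports are already at the top of the example):

```python
# Hypothetical: substitute for streaming_model above to see reasoning
# deltas; assumes DEEPSEEK_API_KEY is set in the environment.
reasoning_model = ModelFactory.create(
    model_platform=ModelPlatformType.DEEPSEEK,  # assumed platform name
    model_type=ModelType.DEEPSEEK_REASONER,     # referenced in this PR
    model_config_dict={"stream": True},
)
```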