diff --git a/camel/agents/chat_agent.py b/camel/agents/chat_agent.py
index 037d19aa45..dc79095723 100644
--- a/camel/agents/chat_agent.py
+++ b/camel/agents/chat_agent.py
@@ -166,6 +166,8 @@ def __init__(self):
         self.base_content = ""  # Content before tool calls
         self.current_content = []  # Accumulated streaming fragments
         self.tool_status_messages = []  # Accumulated tool status messages
+        self.reasoning_content = []  # Accumulated reasoning content
+        self.is_reasoning_phase = True  # Track if we're in reasoning phase
 
     def set_base_content(self, content: str):
         r"""Set the base content (usually empty or pre-tool content)."""
@@ -174,6 +176,13 @@ def set_base_content(self, content: str):
     def add_streaming_content(self, new_content: str):
         r"""Add new streaming content."""
         self.current_content.append(new_content)
+        self.is_reasoning_phase = (
+            False  # Once we get content, we're past reasoning
+        )
+
+    def add_reasoning_content(self, new_reasoning: str):
+        r"""Add new reasoning content."""
+        self.reasoning_content.append(new_reasoning)
 
     def add_tool_status(self, status_message: str):
         r"""Add a tool status message."""
@@ -185,6 +194,10 @@ def get_full_content(self) -> str:
         current = "".join(self.current_content)
         return self.base_content + tool_messages + current
 
+    def get_full_reasoning_content(self) -> str:
+        r"""Get the complete accumulated reasoning content."""
+        return "".join(self.reasoning_content)
+
     def get_content_with_new_status(self, status_message: str) -> str:
         r"""Get content with a new status message appended."""
         tool_messages = "".join([*self.tool_status_messages, status_message])
@@ -194,6 +207,8 @@ def get_content_with_new_status(self, status_message: str) -> str:
     def reset_streaming_content(self):
         r"""Reset only the streaming content, keep base and tool status."""
         self.current_content = []
+        self.reasoning_content = []
+        self.is_reasoning_phase = True
 
 
 class StreamingChatAgentResponse:
@@ -3357,6 +3372,27 @@ def _process_stream_chunks_with_accumulator(
                 choice = chunk.choices[0]
                 delta = choice.delta
 
+                # Handle reasoning content streaming (for DeepSeek reasoner)
+                if (
+                    hasattr(delta, 'reasoning_content')
+                    and delta.reasoning_content
+                ):
+                    content_accumulator.add_reasoning_content(
+                        delta.reasoning_content
+                    )
+                    # Yield partial response with reasoning content
+                    partial_response = (
+                        self._create_streaming_response_with_accumulator(
+                            content_accumulator,
+                            "",  # No regular content yet
+                            step_token_usage,
+                            getattr(chunk, 'id', ''),
+                            tool_call_records.copy(),
+                            reasoning_delta=delta.reasoning_content,
+                        )
+                    )
+                    yield partial_response
+
                 # Handle content streaming
                 if delta.content:
                     # Use accumulator for proper content management
@@ -4146,6 +4182,27 @@ async def _aprocess_stream_chunks_with_accumulator(
                 choice = chunk.choices[0]
                 delta = choice.delta
 
+                # Handle reasoning content streaming (for DeepSeek reasoner)
+                if (
+                    hasattr(delta, 'reasoning_content')
+                    and delta.reasoning_content
+                ):
+                    content_accumulator.add_reasoning_content(
+                        delta.reasoning_content
+                    )
+                    # Yield partial response with reasoning content
+                    partial_response = (
+                        self._create_streaming_response_with_accumulator(
+                            content_accumulator,
+                            "",  # No regular content yet
+                            step_token_usage,
+                            getattr(chunk, 'id', ''),
+                            tool_call_records.copy(),
+                            reasoning_delta=delta.reasoning_content,
+                        )
+                    )
+                    yield partial_response
+
                 # Handle content streaming
                 if delta.content:
                     # Use accumulator for proper content management
@@ -4339,20 +4396,36 @@ def _create_streaming_response_with_accumulator(
         step_token_usage: Dict[str, int],
         response_id: str = "",
         tool_call_records: Optional[List[ToolCallingRecord]] = None,
+        reasoning_delta: Optional[str] = None,
     ) -> ChatAgentResponse:
         r"""Create a streaming response using content accumulator."""
         # Add new content; only build full content when needed
-        accumulator.add_streaming_content(new_content)
+        if new_content:
+            accumulator.add_streaming_content(new_content)
+
         if self.stream_accumulate:
             message_content = accumulator.get_full_content()
         else:
             message_content = new_content
 
+        # Build meta_dict with reasoning information
+        meta_dict: Dict[str, Any] = {}
+
+        # Add reasoning content info
+        full_reasoning = accumulator.get_full_reasoning_content()
+        if full_reasoning:
+            meta_dict["reasoning_content"] = (
+                full_reasoning
+                if self.stream_accumulate
+                else reasoning_delta or ""
+            )
+        meta_dict["is_reasoning"] = accumulator.is_reasoning_phase
+
         message = BaseMessage(
             role_name=self.role_name,
             role_type=self.role_type,
-            meta_dict={},
+            meta_dict=meta_dict,
             content=message_content,
         )
diff --git a/camel/configs/deepseek_config.py b/camel/configs/deepseek_config.py
index 447bef0347..9375f82e1f 100644
--- a/camel/configs/deepseek_config.py
+++ b/camel/configs/deepseek_config.py
@@ -96,13 +96,12 @@ class DeepSeekConfig(BaseConfig):
     tool_choice: Optional[Union[dict[str, str], str]] = None
     logprobs: Optional[bool] = None
     top_logprobs: Optional[int] = None
+    stream_options: Optional[dict[str, bool]] = None
 
     def __init__(self, include_usage: bool = True, **kwargs):
+        if kwargs.get("stream") and "stream_options" not in kwargs:
+            kwargs["stream_options"] = {"include_usage": include_usage}
         super().__init__(**kwargs)
-        # Only set stream_options when stream is True
-        # Otherwise, it will raise error when calling the API
-        if self.stream:
-            self.stream_options = {"include_usage": include_usage}
 
 
 DEEPSEEK_API_PARAMS = {param for param in DeepSeekConfig.model_fields.keys()}
diff --git a/camel/models/deepseek_model.py b/camel/models/deepseek_model.py
index 882289a904..dc2d345d5a 100644
--- a/camel/models/deepseek_model.py
+++ b/camel/models/deepseek_model.py
@@ -169,6 +169,8 @@ def _post_handle_response(
         self, response: ChatCompletion
     ) -> ChatCompletion:
         r"""Handle reasoning content with <think> tags at the beginning."""
+        if isinstance(response, (Stream, AsyncStream)):
+            return response
         if (
             self.model_type in [ModelType.DEEPSEEK_REASONER]
             and os.environ.get("GET_REASONING_CONTENT", "false").lower()
diff --git a/examples/agents/chatagent_stream.py b/examples/agents/chatagent_stream.py
index a8dbb2ab30..a71e8a5aa8 100644
--- a/examples/agents/chatagent_stream.py
+++ b/examples/agents/chatagent_stream.py
@@ -15,7 +15,7 @@
 from camel.models import ModelFactory
 from camel.types import ModelPlatformType, ModelType
 
-# Create a streaming model
+# Create a streaming-capable model backend
 streaming_model = ModelFactory.create(
     model_platform=ModelPlatformType.DEFAULT,
     model_type=ModelType.GPT_4O_MINI,
@@ -32,33 +32,44 @@
 )
 
 # Example user message
-user_message = "Tell me about the benefits of renewable energy and how "
-"it impacts the environment."
+user_message = "How many Rs are there in the word 'strawberry'?"
 
-# Get streaming response
+# Accumulated streaming mode (default)
 streaming_response = agent_accumulated.step(user_message)
 
-# Stream the response chunks
 for chunk_response in streaming_response:
-    # Each chunk_response is a ChatAgentResponse with incremental content
-    chunk_content = chunk_response.msgs[0].content
-    print(chunk_content, end="", flush=True)
+    message = chunk_response.msgs[0]
+    meta = message.meta_dict or {}
+
+    reasoning_text = meta.get("reasoning_content")
+    if reasoning_text:
+        print(reasoning_text, end="", flush=True)
+
+    content_text = message.content
+    if content_text:
+        print(content_text, end="", flush=True)
 
 print("\n\n---\nDelta streaming mode (stream_accumulate=False):\n")
 
-# Create an agent that yields delta chunks instead of accumulated content
+# Delta streaming mode (only new content per chunk)
agent_delta = ChatAgent(
     system_message="You are a helpful assistant that provides concise "
     "and informative responses.",
     model=streaming_model,
-    stream_accumulate=False,  # Only yield the delta part per chunk
+    stream_accumulate=False,
 )
 
-# Get streaming response (delta chunks)
 streaming_response_delta = agent_delta.step(user_message)
 
-# Stream only the delta content per chunk; printing reconstructs the full text
 for chunk_response in streaming_response_delta:
-    delta_content = chunk_response.msgs[0].content
-    print(delta_content, end="", flush=True)
+    message = chunk_response.msgs[0]
+    meta = message.meta_dict or {}
+
+    reasoning_delta = meta.get("reasoning_content") or ""
+    if reasoning_delta:
+        print(reasoning_delta, end="", flush=True)
+
+    if message.content:
+        print(message.content, end="", flush=True)
+print()
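Note: the example above runs on ModelPlatformType.DEFAULT with GPT_4O_MINI,
which never emits reasoning deltas, so its reasoning branch is a no-op there.
Below is a minimal sketch of exercising the new path end to end against a
DeepSeek reasoner backend. ModelPlatformType.DEEPSEEK and BaseConfig.as_dict()
are assumed from the existing CAMEL API rather than introduced by this diff,
and a DEEPSEEK_API_KEY environment variable is assumed to be set.

    from camel.agents import ChatAgent
    from camel.configs import DeepSeekConfig
    from camel.models import ModelFactory
    from camel.types import ModelPlatformType, ModelType

    # With this diff, stream=True makes DeepSeekConfig inject
    # stream_options={"include_usage": True} before validation.
    reasoner_model = ModelFactory.create(
        model_platform=ModelPlatformType.DEEPSEEK,  # assumed platform enum
        model_type=ModelType.DEEPSEEK_REASONER,
        model_config_dict=DeepSeekConfig(stream=True).as_dict(),
    )

    agent = ChatAgent(
        system_message="You are a helpful assistant.",
        model=reasoner_model,
        stream_accumulate=False,  # per-chunk deltas, as in the example above
    )

    for chunk in agent.step("How many Rs are there in the word 'strawberry'?"):
        msg = chunk.msgs[0]
        meta = msg.meta_dict or {}
        # meta["is_reasoning"] stays True until the first answer token arrives.
        if meta.get("reasoning_content"):
            print(meta["reasoning_content"], end="", flush=True)
        if msg.content:
            print(msg.content, end="", flush=True)

Delta mode is used here because in accumulated mode each chunk carries the
full reasoning and content so far, which would print repeated text if written
out chunk by chunk.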