Merged
Changes from 8 commits
Commits (25)
473623b
support raw cot 1
dsfaccini Nov 25, 2025
ff4d45b
include live gpt-oss streaming test and remove computer-use model
dsfaccini Nov 25, 2025
9c8007f
simplify filter
dsfaccini Nov 25, 2025
88de1ed
note about test flakiness
dsfaccini Nov 25, 2025
364b711
re-add computer use names
dsfaccini Nov 25, 2025
4afd4b7
handle raw cot in parts manager
dsfaccini Nov 27, 2025
50bc7fa
Merge branch 'main' into lm-studio-openai-responses-with-gpt-oss
dsfaccini Nov 27, 2025
ddd7df4
Merge branch 'main' into lm-studio-openai-responses-with-gpt-oss
dsfaccini Nov 27, 2025
65ae9a5
refactor parts manager
dsfaccini Nov 27, 2025
d0c7d77
add defensive handling of potential summary after rawCoT
dsfaccini Nov 28, 2025
8d52d65
Clarify usage of agent factories
dsfaccini Nov 28, 2025
99812f8
migrate to callback
dsfaccini Nov 29, 2025
0a245b6
dont emit empty events
dsfaccini Nov 30, 2025
3128b4a
Merge branch 'pydantic:main' into main
dsfaccini Nov 30, 2025
dc7aa6a
Merge branch 'main' into lm-studio-openai-responses-with-gpt-oss
dsfaccini Dec 1, 2025
3b6013f
complex testcase
dsfaccini Dec 1, 2025
f87896a
improve docstring
dsfaccini Dec 1, 2025
2c3d767
narrow docstring
dsfaccini Dec 1, 2025
e69a7c2
Clarify agent instantiation options in documentation
dsfaccini Dec 2, 2025
bc2e31e
address review points
dsfaccini Dec 4, 2025
d4a6c8b
Merge remote-tracking branch 'origin/main' into lm-studio-openai-resp…
dsfaccini Dec 4, 2025
d5f6503
Merge upstream/main into lm-studio-openai-responses-with-gpt-oss
dsfaccini Dec 4, 2025
c7d43bd
Merge branch 'main' into lm-studio-openai-responses-with-gpt-oss
dsfaccini Dec 4, 2025
85636a8
chain callables or dict mergings
dsfaccini Dec 4, 2025
53579d0
Merge branch 'main' into lm-studio-openai-responses-with-gpt-oss
dsfaccini Dec 4, 2025
3 changes: 3 additions & 0 deletions docs/thinking.md
@@ -34,6 +34,9 @@ agent = Agent(model, model_settings=settings)
...
```

!!! note "Raw reasoning without summaries"
Some OpenAI-compatible APIs (such as LM Studio, vLLM, or OpenRouter with gpt-oss models) may return raw reasoning content without reasoning summaries. In this case, [`ThinkingPart.content`][pydantic_ai.messages.ThinkingPart.content] will be empty, but the raw reasoning is available in `provider_details['raw_content']`.

## Anthropic

To enable thinking, use the [`AnthropicModelSettings.anthropic_thinking`][pydantic_ai.models.anthropic.AnthropicModelSettings.anthropic_thinking] [model setting](agents.md#model-run-settings).
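
As context for the docs note above, here is a minimal sketch of reading that raw reasoning back out of the message history; the LM Studio base URL, API key placeholder, and `gpt-oss` model name are assumptions for illustration, not part of this PR:

```python
from pydantic_ai import Agent
from pydantic_ai.messages import ModelResponse, ThinkingPart
from pydantic_ai.models.openai import OpenAIResponsesModel
from pydantic_ai.providers.openai import OpenAIProvider

# Assumed local LM Studio endpoint serving a gpt-oss model; adjust to your setup.
model = OpenAIResponsesModel(
    'openai/gpt-oss-20b',
    provider=OpenAIProvider(base_url='http://localhost:1234/v1', api_key='lm-studio'),
)
agent = Agent(model)

result = agent.run_sync('What is 2 + 2?')
for message in result.all_messages():
    if isinstance(message, ModelResponse):
        for part in message.parts:
            if isinstance(part, ThinkingPart) and part.provider_details:
                # `content` may be empty here; the raw chain of thought, if the
                # provider sent one, is kept under 'raw_content'.
                print(part.provider_details.get('raw_content'))
```
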
87 changes: 65 additions & 22 deletions pydantic_ai_slim/pydantic_ai/_parts_manager.py
@@ -13,7 +13,7 @@

from __future__ import annotations as _annotations

from collections.abc import Hashable
from collections.abc import Hashable, Iterator
from dataclasses import dataclass, field, replace
from typing import Any

@@ -76,7 +76,7 @@ def handle_text_delta(
provider_details: dict[str, Any] | None = None,
thinking_tags: tuple[str, str] | None = None,
ignore_leading_whitespace: bool = False,
) -> ModelResponseStreamEvent | None:
) -> Iterator[ModelResponseStreamEvent]:
"""Handle incoming text content, creating or updating a TextPart in the manager as appropriate.

When `vendor_part_id` is None, the latest part is updated if it exists and is a TextPart;
@@ -93,10 +93,9 @@
thinking_tags: If provided, will handle content between the thinking tags as thinking parts.
ignore_leading_whitespace: If True, will ignore leading whitespace in the content.

Returns:
- A `PartStartEvent` if a new part was created.
- A `PartDeltaEvent` if an existing part was updated.
- `None` if no new event is emitted (e.g., the first text part was all whitespace).
Yields:
A `PartStartEvent` if a new part was created, or a `PartDeltaEvent` if an existing part was updated.
Yields nothing if no event should be emitted (e.g., the first text part was all whitespace).

Raises:
UnexpectedModelBehavior: If attempting to apply text content to a part that is not a TextPart.
@@ -121,11 +120,12 @@ def handle_text_delta(
if content == thinking_tags[1]:
# When we see the thinking end tag, we're done with the thinking part and the next text delta will need a new part
self._vendor_id_to_part_index.pop(vendor_part_id)
return None
return
else:
return self.handle_thinking_delta(
yield from self.handle_thinking_delta(
vendor_part_id=vendor_part_id, content=content, provider_details=provider_details
)
return
elif isinstance(existing_part, TextPart):
existing_text_part_and_index = existing_part, part_index
else:
@@ -134,29 +134,54 @@ def handle_text_delta(
if thinking_tags and content == thinking_tags[0]:
# When we see a thinking start tag (which is a single token), we'll build a new thinking part instead
self._vendor_id_to_part_index.pop(vendor_part_id, None)
return self.handle_thinking_delta(
yield from self.handle_thinking_delta(
vendor_part_id=vendor_part_id, content='', provider_details=provider_details
)
return

if existing_text_part_and_index is None:
# This is a workaround for models that emit `<think>\n</think>\n\n` or an empty text part ahead of tool calls (e.g. Ollama + Qwen3),
# which we don't want to end up treating as a final result when using `run_stream` with `str` as a valid `output_type`.
if ignore_leading_whitespace and (len(content) == 0 or content.isspace()):
return None
return

# There is no existing text part that should be updated, so create a new one
new_part_index = len(self._parts)
part = TextPart(content=content, id=id, provider_details=provider_details)
if vendor_part_id is not None:
self._vendor_id_to_part_index[vendor_part_id] = new_part_index
self._parts.append(part)
return PartStartEvent(index=new_part_index, part=part)
yield PartStartEvent(index=new_part_index, part=part)
else:
# Update the existing TextPart with the new content delta
existing_text_part, part_index = existing_text_part_and_index
part_delta = TextPartDelta(content_delta=content, provider_details=provider_details)
self._parts[part_index] = part_delta.apply(existing_text_part)
return PartDeltaEvent(index=part_index, delta=part_delta)
yield PartDeltaEvent(index=part_index, delta=part_delta)

def _update_raw_content(
self,
part: ThinkingPart,
part_index: int,
raw_content_delta: str,
raw_content_index: int,
) -> ThinkingPart:
"""Update raw_content in provider_details and return updated part."""
existing_details = dict(part.provider_details or {})
raw_content_list = list(existing_details.get('raw_content', []))
while len(raw_content_list) <= raw_content_index:
raw_content_list.append('')
raw_content_list[raw_content_index] += raw_content_delta
existing_details['raw_content'] = raw_content_list
updated = ThinkingPart(
content=part.content,
id=part.id,
signature=part.signature,
provider_name=part.provider_name,
provider_details=existing_details,
)
self._parts[part_index] = updated
return updated

def handle_thinking_delta(
self,
@@ -167,7 +192,9 @@ def handle_thinking_delta(
signature: str | None = None,
provider_name: str | None = None,
provider_details: dict[str, Any] | None = None,
) -> ModelResponseStreamEvent:
raw_content_delta: str | None = None,
raw_content_index: int = 0,
) -> Iterator[ModelResponseStreamEvent]:
Contributor Author: I got confused writing the callback so I ended up with this instead. lmk what you think

Collaborator: Happy to call about the callback :)

"""Handle incoming thinking content, creating or updating a ThinkingPart in the manager as appropriate.

When `vendor_part_id` is None, the latest part is updated if it exists and is a ThinkingPart;
@@ -183,9 +210,13 @@ def handle_thinking_delta(
signature: An optional signature for the thinking content.
provider_name: An optional provider name for the thinking part.
provider_details: An optional dictionary of provider-specific details for the thinking part.
raw_content_delta: Raw chain-of-thought content delta (stored in provider_details['raw_content'],
not shown to users).
raw_content_index: Index into the raw_content list for the current delta (default 0).

Returns:
Yields:
A `PartStartEvent` if a new part was created, or a `PartDeltaEvent` if an existing part was updated.
Yields nothing if only raw_content was updated (raw content updates don't emit visible events).

Raises:
UnexpectedModelBehavior: If attempting to apply a thinking delta to a part that is not a ThinkingPart.
@@ -209,35 +240,47 @@ def handle_thinking_delta(
existing_thinking_part_and_index = existing_part, part_index

if existing_thinking_part_and_index is None:
if content is not None or signature is not None:
if content is not None or signature is not None or raw_content_delta is not None:
# There is no existing thinking part that should be updated, so create a new one
new_part_index = len(self._parts)
new_provider_details = dict(provider_details) if provider_details else {}
if raw_content_delta is not None:
raw_content_list: list[str] = [''] * (raw_content_index + 1)
raw_content_list[raw_content_index] = raw_content_delta
new_provider_details['raw_content'] = raw_content_list
part = ThinkingPart(
content=content or '',
id=id,
signature=signature,
provider_name=provider_name,
provider_details=provider_details,
provider_details=new_provider_details or None,
)
if vendor_part_id is not None: # pragma: no branch
self._vendor_id_to_part_index[vendor_part_id] = new_part_index
self._parts.append(part)
return PartStartEvent(index=new_part_index, part=part)
yield PartStartEvent(index=new_part_index, part=part)
else:
raise UnexpectedModelBehavior('Cannot create a ThinkingPart with no content or signature')
raise UnexpectedModelBehavior('Cannot create a ThinkingPart with no content, signature, or raw_content')
else:
existing_thinking_part, part_index = existing_thinking_part_and_index

if raw_content_delta is not None:
existing_thinking_part = self._update_raw_content(
existing_thinking_part, part_index, raw_content_delta, raw_content_index
)
if content is None and signature is None:
return

if content is not None or signature is not None:
# Update the existing ThinkingPart with the new content and/or signature delta
existing_thinking_part, part_index = existing_thinking_part_and_index
part_delta = ThinkingPartDelta(
content_delta=content,
signature_delta=signature,
provider_name=provider_name,
provider_details=provider_details,
)
self._parts[part_index] = part_delta.apply(existing_thinking_part)
return PartDeltaEvent(index=part_index, delta=part_delta)
else:
yield PartDeltaEvent(index=part_index, delta=part_delta)
elif raw_content_delta is None: # pragma: no branch
raise UnexpectedModelBehavior('Cannot update a ThinkingPart with no content or signature')

def handle_tool_call_delta(
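
For reference, a rough sketch of how the new generator-based handlers behave when fed raw chain-of-thought, then a summary, then regular text. `ModelResponsePartsManager` is internal, so this is illustrative only; the event sequence is inferred from the diff above:

```python
from pydantic_ai._parts_manager import ModelResponsePartsManager

manager = ModelResponsePartsManager()
events = []

# Raw chain-of-thought deltas open a ThinkingPart whose visible `content` stays
# empty; the raw text accumulates in provider_details['raw_content'].
events += manager.handle_thinking_delta(vendor_part_id='think', raw_content_delta='Let me think', provider_name='openai')
events += manager.handle_thinking_delta(vendor_part_id='think', raw_content_delta=' about this.', provider_name='openai')

# A later summary delta (if the provider sends one) updates the same part's visible content.
events += manager.handle_thinking_delta(vendor_part_id='think', content='Considering the question.', provider_name='openai')

# Ordinary text streams into a separate TextPart.
events += manager.handle_text_delta(vendor_part_id='text', content='The answer is 4.')

print([type(e).__name__ for e in events])
# Expected per the diff: ['PartStartEvent', 'PartDeltaEvent', 'PartStartEvent']
# (the second raw-only delta updates raw_content without emitting a visible event).
```
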
34 changes: 18 additions & 16 deletions pydantic_ai_slim/pydantic_ai/models/anthropic.py
@@ -1106,25 +1106,26 @@ async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
elif isinstance(event, BetaRawContentBlockStartEvent):
current_block = event.content_block
if isinstance(current_block, BetaTextBlock) and current_block.text:
maybe_event = self._parts_manager.handle_text_delta(
for event_ in self._parts_manager.handle_text_delta(
vendor_part_id=event.index, content=current_block.text
)
if maybe_event is not None: # pragma: no branch
yield maybe_event
):
yield event_
elif isinstance(current_block, BetaThinkingBlock):
yield self._parts_manager.handle_thinking_delta(
for event_ in self._parts_manager.handle_thinking_delta(
vendor_part_id=event.index,
content=current_block.thinking,
signature=current_block.signature,
provider_name=self.provider_name,
)
):
yield event_
elif isinstance(current_block, BetaRedactedThinkingBlock):
yield self._parts_manager.handle_thinking_delta(
for event_ in self._parts_manager.handle_thinking_delta(
vendor_part_id=event.index,
id='redacted_thinking',
signature=current_block.data,
provider_name=self.provider_name,
)
):
yield event_
elif isinstance(current_block, BetaToolUseBlock):
maybe_event = self._parts_manager.handle_tool_call_delta(
vendor_part_id=event.index,
@@ -1185,23 +1186,24 @@ async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:

elif isinstance(event, BetaRawContentBlockDeltaEvent):
if isinstance(event.delta, BetaTextDelta):
maybe_event = self._parts_manager.handle_text_delta(
for event_ in self._parts_manager.handle_text_delta(
vendor_part_id=event.index, content=event.delta.text
)
if maybe_event is not None: # pragma: no branch
yield maybe_event
):
yield event_
elif isinstance(event.delta, BetaThinkingDelta):
yield self._parts_manager.handle_thinking_delta(
for event_ in self._parts_manager.handle_thinking_delta(
vendor_part_id=event.index,
content=event.delta.thinking,
provider_name=self.provider_name,
)
):
yield event_
elif isinstance(event.delta, BetaSignatureDelta):
yield self._parts_manager.handle_thinking_delta(
for event_ in self._parts_manager.handle_thinking_delta(
vendor_part_id=event.index,
signature=event.delta.signature,
provider_name=self.provider_name,
)
):
yield event_
elif isinstance(event.delta, BetaInputJSONDelta):
maybe_event = self._parts_manager.handle_tool_call_delta(
vendor_part_id=event.index,
15 changes: 8 additions & 7 deletions pydantic_ai_slim/pydantic_ai/models/bedrock.py
@@ -751,24 +751,25 @@ async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
delta = content_block_delta['delta']
if 'reasoningContent' in delta:
if redacted_content := delta['reasoningContent'].get('redactedContent'):
yield self._parts_manager.handle_thinking_delta(
for event in self._parts_manager.handle_thinking_delta(
vendor_part_id=index,
id='redacted_content',
signature=redacted_content.decode('utf-8'),
provider_name=self.provider_name,
)
):
yield event
else:
signature = delta['reasoningContent'].get('signature')
yield self._parts_manager.handle_thinking_delta(
for event in self._parts_manager.handle_thinking_delta(
vendor_part_id=index,
content=delta['reasoningContent'].get('text'),
signature=signature,
provider_name=self.provider_name if signature else None,
)
):
yield event
if text := delta.get('text'):
maybe_event = self._parts_manager.handle_text_delta(vendor_part_id=index, content=text)
if maybe_event is not None: # pragma: no branch
yield maybe_event
for event in self._parts_manager.handle_text_delta(vendor_part_id=index, content=text):
yield event
if 'toolUse' in delta:
tool_use = delta['toolUse']
maybe_event = self._parts_manager.handle_tool_call_delta(
12 changes: 6 additions & 6 deletions pydantic_ai_slim/pydantic_ai/models/function.py
@@ -292,26 +292,26 @@ class FunctionStreamedResponse(StreamedResponse):
def __post_init__(self):
self._usage += _estimate_usage([])

async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]: # noqa: C901
async for item in self._iter:
if isinstance(item, str):
response_tokens = _estimate_string_tokens(item)
self._usage += usage.RequestUsage(output_tokens=response_tokens)
maybe_event = self._parts_manager.handle_text_delta(vendor_part_id='content', content=item)
if maybe_event is not None: # pragma: no branch
yield maybe_event
for event in self._parts_manager.handle_text_delta(vendor_part_id='content', content=item):
yield event
elif isinstance(item, dict) and item:
for dtc_index, delta in item.items():
if isinstance(delta, DeltaThinkingPart):
if delta.content: # pragma: no branch
response_tokens = _estimate_string_tokens(delta.content)
self._usage += usage.RequestUsage(output_tokens=response_tokens)
yield self._parts_manager.handle_thinking_delta(
for event in self._parts_manager.handle_thinking_delta(
vendor_part_id=dtc_index,
content=delta.content,
signature=delta.signature,
provider_name='function' if delta.signature else None,
)
):
yield event
elif isinstance(delta, DeltaToolCall):
if delta.json_args:
response_tokens = _estimate_string_tokens(delta.json_args)
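
And a short sketch of exercising the FunctionModel streaming path above, where a stream function mixes DeltaThinkingPart deltas with plain text. The prompt and outputs are made up, and the StreamedRunResult method name is assumed from the current public API rather than this PR:

```python
import asyncio
from collections.abc import AsyncIterator

from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessage
from pydantic_ai.models.function import AgentInfo, DeltaThinkingPart, FunctionModel


async def stream_func(messages: list[ModelMessage], info: AgentInfo) -> AsyncIterator:
    # A couple of thinking deltas on the same index, then the final text.
    yield {0: DeltaThinkingPart(content='Thinking')}
    yield {0: DeltaThinkingPart(content=' some more...')}
    yield 'The answer is 4.'


agent = Agent(FunctionModel(stream_function=stream_func))


async def main() -> None:
    async with agent.run_stream('What is 2 + 2?') as result:
        print(await result.get_output())


if __name__ == '__main__':
    asyncio.run(main())
```
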
7 changes: 3 additions & 4 deletions pydantic_ai_slim/pydantic_ai/models/gemini.py
@@ -465,11 +465,10 @@ async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
if 'text' in gemini_part:
# Using vendor_part_id=None means we can produce multiple text parts if their deltas are sprinkled
# amongst the tool call deltas
maybe_event = self._parts_manager.handle_text_delta(
for event in self._parts_manager.handle_text_delta(
vendor_part_id=None, content=gemini_part['text']
)
if maybe_event is not None: # pragma: no branch
yield maybe_event
):
yield event

elif 'function_call' in gemini_part:
# Here, we assume all function_call parts are complete and don't have deltas.