Skip to content

Lorenze/imp/streaming-conversational#6360

Open
lorenzejay wants to merge 3 commits into
mainfrom
lorenze/imp/streaming
Open

Lorenze/imp/streaming-conversational#6360
lorenzejay wants to merge 3 commits into
mainfrom
lorenze/imp/streaming

Conversation

@lorenzejay

@lorenzejay lorenzejay commented Jun 26, 2026

Copy link
Copy Markdown
Collaborator

Note

Medium Risk
Changes nested kickoff and experimental conversational streaming paths; behavior is covered by new tests but affects core flow execution timing and LLM stream flags.

Overview
Adds streaming support for conversational Flow turns when stream=True, including LLM chunk emission during built-in converse handling and correct behavior when handle_turn returns a FlowStreamingOutput.

The flow runtime now tracks an active streaming run (_streaming_run / _should_stream_llm_calls) so inner kickoff can stream without relying on toggling self.stream alone. For streaming kickoff, pending conversational kickoff context (user message, intents, intent LLM) is captured before the background run, restored inside the worker, and cleared in finally—fixing lost user messages when handle_turn clears pending state before the deferred thread executes.

The CLI crew-run TUI drains streaming turn results via _consume_conversation_streaming_result before reading .result, avoiding premature access errors.

Reviewed by Cursor Bugbot for commit 4fd8327. Bugbot is set up for automated code reviews on this repo. Configure here.

Summary by CodeRabbit

  • New Features

    • Conversational flows now support streamed responses during turns, with streaming automatically enabled for the duration of the turn and restored afterward.
    • Pending conversational state is preserved across kickoff and restored when execution resumes.
  • Bug Fixes

    • Streaming conversation outputs are fully consumed before returning the final response.
    • Streamed turn handling now retains pending user messages correctly, keeping conversation history consistent.
  • Tests

    • Added coverage for streaming consumption behavior and kickoff/pending state preservation in conversational flow scenarios.

- Introduced  function to determine if a result is a streaming output.
- Added  method to handle streaming results before accessing them.
- Updated  method to utilize the new streaming result handling.
- Implemented context management for LLM streaming in the conversational mixin.
- Added tests to verify streaming behavior and ensure proper handling of user messages during streaming.

@corridor-security corridor-security Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary: This PR changes conversational flow streaming behavior and TUI handling of streaming results, without adding new public endpoints, authentication paths, file/SQL/network inputs, or privilege boundaries. No exploitable security vulnerabilities were identified.

Risk: Low risk. The changes operate on internal flow/runtime state and local CLI result consumption, with no attacker-controlled security-sensitive data handling introduced.

@coderabbitai

coderabbitai Bot commented Jun 26, 2026

Copy link
Copy Markdown

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: cbb8f64e-243d-424a-883b-257fbdb1b7ee

📥 Commits

Reviewing files that changed from the base of the PR and between 6827131 and 4fd8327.

📒 Files selected for processing (1)
  • lib/cli/src/crewai_cli/crew_run_tui.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • lib/cli/src/crewai_cli/crew_run_tui.py

📝 Walkthrough

Walkthrough

The PR adds CLI streaming-output handling and updates conversational flow runtime code to enable LLM streaming during turns, capture and restore pending kickoff context, and wrap sync and async kickoff execution with streaming-run state.

Changes

Streaming conversation and kickoff flow

Layer / File(s) Summary
Detect streaming results
lib/cli/src/crewai_cli/crew_run_tui.py
Adds imports and _is_streaming_output(value) to identify iterable outputs that expose get_full_text and result.
Consume conversation streams
lib/cli/src/crewai_cli/crew_run_tui.py, lib/cli/tests/test_crew_run_tui.py
_run_conversation_turn_worker now routes streaming results through _consume_conversation_streaming_result(result), and the new test covers chunk consumption with FlowStreamingOutput and StreamChunk.
Streaming control for turns
lib/crewai/src/crewai/experimental/conversational_mixin.py, lib/crewai/src/crewai/flow/runtime/__init__.py, lib/crewai/tests/test_flow_conversation.py
Adds _streaming_conversation_llm, _should_stream_llm_calls, _streaming_run, and related state so converse_turn() runs the LLM call in a temporary streaming context; tests cover the stream toggle.
Pending kickoff context and execution
lib/crewai/src/crewai/experimental/conversational_mixin.py, lib/crewai/src/crewai/flow/runtime/__init__.py, lib/crewai/tests/test_flow_conversation.py
Adds pending kickoff context capture/restore/clear hooks and wraps sync and async kickoff execution with context restoration, streaming-run state, and cleanup; tests cover handle_turn() preserving the pending user message.

Sequence Diagram(s)

sequenceDiagram
  participant Flow
  participant _ConversationalMixin
  participant llm_instance
  Flow->>Flow: _capture_pending_kickoff_context()
  Flow->>Flow: _should_stream_llm_calls()
  Flow->>_ConversationalMixin: _restore_pending_kickoff_context(context)
  Flow->>Flow: enter _streaming_run()
  Flow->>llm_instance: call(...)
  llm_instance-->>Flow: response
  Flow->>_ConversationalMixin: _clear_pending_kickoff_context()
Loading

Suggested reviewers

  • vinibrsl
  • joaomdmoura
🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 28.57% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Title check ❓ Inconclusive The title is a branch-like label and does not clearly describe the PR’s main change. Replace it with a concise, descriptive sentence naming the primary change, such as streaming conversational flow updates.
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch lorenze/imp/streaming

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@lib/crewai/src/crewai/experimental/conversational_mixin.py`:
- Around line 66-78: The _streaming_conversation_llm context manager currently
mutates llm.stream on a shared BaseLLM instance, which can race across
overlapping turns and restore the wrong state. Update the conversational flow so
each turn uses a per-call/per-session LLM instance or another concurrency-safe
streaming mechanism instead of modifying shared state in
_streaming_conversation_llm, and keep the streaming enablement scoped to
call()/acall() usage in the conversational mixin.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 4a28d76e-edba-458e-b824-74ba467b2ab7

📥 Commits

Reviewing files that changed from the base of the PR and between a149a30 and 6827131.

📒 Files selected for processing (5)
  • lib/cli/src/crewai_cli/crew_run_tui.py
  • lib/cli/tests/test_crew_run_tui.py
  • lib/crewai/src/crewai/experimental/conversational_mixin.py
  • lib/crewai/src/crewai/flow/runtime/__init__.py
  • lib/crewai/tests/test_flow_conversation.py

Comment on lines +66 to +78
@contextmanager
def _streaming_conversation_llm(llm: BaseLLM, *, enabled: bool) -> Any:
"""Temporarily enable LLM streaming for Flow streaming turns."""
if not enabled:
yield
return

previous_stream = llm.stream
llm.stream = True
try:
yield
finally:
llm.stream = previous_stream

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major | 🏗️ Heavy lift

🧩 Analysis chain

🌐 Web query:

Does the crewAI BaseLLM / LLM.call API accept a per-call streaming parameter, or is streaming controlled only via the instance-level llm.stream attribute?

💡 Result:

In crewAI, streaming is controlled at the instance level through the stream attribute of the LLM object, rather than through a per-call parameter in the call() or acall() API [1][2]. When initializing an LLM instance, you enable streaming by setting stream=True [1][2]: llm = LLM( model="openai/gpt-4o", stream=True) The call() and acall() methods do not accept a streaming flag [3]. Instead, when stream is set to True on the instance, crewAI automatically handles the streaming process by emitting LLMStreamChunkEvent objects via the crewai_event_bus for every token received [4][3][5][6]. To consume these streamed tokens, you must register an event handler with the event bus to listen for these LLMStreamChunkEvent occurrences [4][5][2]. Top results: [1][3][5][2][6]

Citations:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the relevant files and symbols with line numbers.
git ls-files lib/crewai/src/crewai/experimental/conversational_mixin.py lib/crewai/src/crewai/llms/base_llm.py lib/crewai/src/crewai/llms/llm.py 2>/dev/null || true

printf '\n--- conversational_mixin.py outline ---\n'
ast-grep outline lib/crewai/src/crewai/experimental/conversational_mixin.py --view expanded

printf '\n--- base_llm.py outline ---\n'
ast-grep outline lib/crewai/src/crewai/llms/base_llm.py --view expanded

printf '\n--- search for stream usage and call signatures ---\n'
rg -n "def (call|acall)\(|stream\s*=|stream:" lib/crewai/src/crewai/llms lib/crewai/src/crewai/experimental -g '*.py'

Repository: crewAIInc/crewAI

Length of output: 15588


🏁 Script executed:

#!/bin/bash
set -euo pipefail

printf 'FILE: conversational_mixin.py\n'
sed -n '1,120p' lib/crewai/src/crewai/experimental/conversational_mixin.py | cat -n

printf '\nFILE: base_llm.py\n'
sed -n '1,260p' lib/crewai/src/crewai/llms/base_llm.py | cat -n

printf '\nFILE: llm.py\n'
sed -n '1,260p' lib/crewai/src/crewai/llms/llm.py | cat -n

Repository: crewAIInc/crewAI

Length of output: 15468


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Map the relevant files first, then read only the needed sections.
printf '--- conversational_mixin.py outline ---\n'
ast-grep outline lib/crewai/src/crewai/experimental/conversational_mixin.py --view expanded

printf '\n--- conversational_mixin.py lines 1-220 ---\n'
sed -n '1,220p' lib/crewai/src/crewai/experimental/conversational_mixin.py | cat -n

printf '\n--- conversational_mixin.py lines 220-820 ---\n'
sed -n '220,820p' lib/crewai/src/crewai/experimental/conversational_mixin.py | cat -n

printf '\n--- base_llm.py outline ---\n'
ast-grep outline lib/crewai/src/crewai/llms/base_llm.py --view expanded

printf '\n--- base_llm.py lines 1-260 ---\n'
sed -n '1,260p' lib/crewai/src/crewai/llms/base_llm.py | cat -n

Repository: crewAIInc/crewAI

Length of output: 245


🏁 Script executed:

#!/bin/bash
set -euo pipefail

set +e
printf '%s\n' '--- conversational_mixin.py around _default_conversation_llm ---'
sed -n '1010,1050p' lib/crewai/src/crewai/experimental/conversational_mixin.py | cat -n

printf '\n%s\n' '--- conversational_mixin.py around converse_turn ---'
sed -n '220,260p' lib/crewai/src/crewai/experimental/conversational_mixin.py | cat -n

printf '\n%s\n' '--- base_llm.py around call/acall ---'
sed -n '280,360p' lib/crewai/src/crewai/llms/base_llm.py | cat -n

printf '\n%s\n' '--- base_llm.py around stream handling helper ---'
sed -n '480,550p' lib/crewai/src/crewai/llms/base_llm.py | cat -n

Repository: crewAIInc/crewAI

Length of output: 11071


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find where conversational_config / llm are assigned and used.
rg -n "conversational_config|_default_conversation_llm\(|llm =" lib/crewai/src/crewai/experimental/conversational_mixin.py lib/crewai/src/crewai/experimental -g '*.py'

# Inspect the ConversationConfig definition to see whether llm is mutable/shared by design.
rg -n "class ConversationConfig|llm:|router_llm:|config.llm" lib/crewai/src/crewai/experimental -g '*.py'

Repository: crewAIInc/crewAI

Length of output: 5604


🏁 Script executed:

#!/bin/bash
set -euo pipefail

printf '%s\n' '--- conversational_mixin.py around _default_conversation_llm ---'
sed -n '1010,1050p' lib/crewai/src/crewai/experimental/conversational_mixin.py | cat -n

printf '\n%s\n' '--- conversational_mixin.py around converse_turn ---'
sed -n '220,260p' lib/crewai/src/crewai/experimental/conversational_mixin.py | cat -n

printf '\n%s\n' '--- base_llm.py around call/acall ---'
sed -n '280,360p' lib/crewai/src/crewai/llms/base_llm.py | cat -n

printf '\n%s\n' '--- base_llm.py around stream handling helper ---'
sed -n '480,550p' lib/crewai/src/crewai/llms/base_llm.py | cat -n

printf '\n%s\n' '--- conversational config and llm assignment/search ---'
rg -n "class ConversationConfig|conversational_config|_default_conversation_llm\(|config\.llm|llm =" lib/crewai/src/crewai/experimental -g '*.py'

Repository: crewAIInc/crewAI

Length of output: 13283


Avoid mutating llm.stream on a shared instance

config.llm comes from the class-level conversational_config, so overlapping turns/sessions can race on this save/restore and leave stream in the wrong state. call()/acall() don’t accept a per-call streaming flag, so this needs a per-turn LLM instance or another concurrency-safe streaming hook instead of mutating shared state.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/src/crewai/experimental/conversational_mixin.py` around lines 66 -
78, The _streaming_conversation_llm context manager currently mutates llm.stream
on a shared BaseLLM instance, which can race across overlapping turns and
restore the wrong state. Update the conversational flow so each turn uses a
per-call/per-session LLM instance or another concurrency-safe streaming
mechanism instead of modifying shared state in _streaming_conversation_llm, and
keep the streaming enablement scoped to call()/acall() usage in the
conversational mixin.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants