From 4bbde1a26cf053634ea795425f5a0f86fec48b58 Mon Sep 17 00:00:00 2001 From: dan-ince-aai Date: Wed, 5 Nov 2025 10:19:17 +0000 Subject: [PATCH 01/11] chore: return language code + dynamic keyterms --- .../livekit/plugins/assemblyai/stt.py | 43 +++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py b/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py index 239389ec8e..9491fd8d4e 100644 --- a/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py +++ b/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py @@ -147,6 +147,7 @@ def update_options( end_of_turn_confidence_threshold: NotGivenOr[float] = NOT_GIVEN, min_end_of_turn_silence_when_confident: NotGivenOr[int] = NOT_GIVEN, max_turn_silence: NotGivenOr[int] = NOT_GIVEN, + keyterms_prompt: NotGivenOr[list[str]] = NOT_GIVEN, ) -> None: if is_given(buffer_size_seconds): self._opts.buffer_size_seconds = buffer_size_seconds @@ -158,6 +159,8 @@ def update_options( ) if is_given(max_turn_silence): self._opts.max_turn_silence = max_turn_silence + if is_given(keyterms_prompt): + self._opts.keyterms_prompt = keyterms_prompt for stream in self._streams: stream.update_options( @@ -165,6 +168,7 @@ def update_options( end_of_turn_confidence_threshold=end_of_turn_confidence_threshold, min_end_of_turn_silence_when_confident=min_end_of_turn_silence_when_confident, max_turn_silence=max_turn_silence, + keyterms_prompt=keyterms_prompt, ) @@ -188,6 +192,7 @@ def __init__( self._session = http_session self._speech_duration: float = 0 self._reconnect_event = asyncio.Event() + self._ws: aiohttp.ClientWebSocketResponse | None = None def update_options( self, @@ -196,19 +201,45 @@ def update_options( end_of_turn_confidence_threshold: NotGivenOr[float] = NOT_GIVEN, min_end_of_turn_silence_when_confident: NotGivenOr[int] = NOT_GIVEN, max_turn_silence: NotGivenOr[int] = NOT_GIVEN, + keyterms_prompt: NotGivenOr[list[str]] = NOT_GIVEN, ) -> None: + # Build UpdateConfiguration message for dynamic updates + update_config = {"type": "UpdateConfiguration"} + needs_update = False + if is_given(buffer_size_seconds): self._opts.buffer_size_seconds = buffer_size_seconds if is_given(end_of_turn_confidence_threshold): self._opts.end_of_turn_confidence_threshold = end_of_turn_confidence_threshold + update_config["end_of_turn_confidence_threshold"] = end_of_turn_confidence_threshold + needs_update = True if is_given(min_end_of_turn_silence_when_confident): self._opts.min_end_of_turn_silence_when_confident = ( min_end_of_turn_silence_when_confident ) + update_config["min_end_of_turn_silence_when_confident"] = ( + min_end_of_turn_silence_when_confident + ) + needs_update = True if is_given(max_turn_silence): self._opts.max_turn_silence = max_turn_silence - - self._reconnect_event.set() + update_config["max_turn_silence"] = max_turn_silence + needs_update = True + if is_given(keyterms_prompt): + self._opts.keyterms_prompt = keyterms_prompt + update_config["keyterms_prompt"] = keyterms_prompt + needs_update = True + + # Send UpdateConfiguration message to active websocket if available + if needs_update and self._ws is not None and not self._ws.closed: + update_msg = json.dumps(update_config) + asyncio.create_task(self._ws.send_str(update_msg)) + logger.debug(f"Sent UpdateConfiguration: {update_msg}") + return # Don't trigger reconnection for dynamic updates + + # Only trigger reconnection if buffer_size_seconds changed (requires reconnect) + if is_given(buffer_size_seconds): + self._reconnect_event.set() async def _run(self) -> None: """ @@ -280,6 +311,7 @@ async def recv_task(ws: aiohttp.ClientWebSocketResponse) -> None: while True: try: ws = await self._connect_ws() + self._ws = ws # Store reference for dynamic updates tasks = [ asyncio.create_task(send_task(ws)), asyncio.create_task(recv_task(ws)), @@ -304,11 +336,13 @@ async def recv_task(ws: aiohttp.ClientWebSocketResponse) -> None: finally: if ws is not None: await ws.close() + self._ws = None # Clear reference async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse: live_config = { "sample_rate": self._opts.sample_rate, "encoding": self._opts.encoding, + "language_detection": True, "speech_model": self._opts.speech_model, "format_turns": self._opts.format_turns if is_given(self._opts.format_turns) else None, "end_of_turn_confidence_threshold": self._opts.end_of_turn_confidence_threshold @@ -349,6 +383,7 @@ def _process_stream_event(self, data: dict) -> None: turn_is_formatted = data.get("turn_is_formatted", False) utterance = data.get("utterance", "") transcript = data.get("transcript", "") + language_code = data.get("language_code", "en") if words: interim_text = " ".join(word.get("text", "") for word in words) @@ -361,7 +396,7 @@ def _process_stream_event(self, data: dict) -> None: if utterance: final_event = stt.SpeechEvent( type=stt.SpeechEventType.PREFLIGHT_TRANSCRIPT, - alternatives=[stt.SpeechData(language="en", text=utterance)], + alternatives=[stt.SpeechData(language=language_code, text=utterance)], ) self._event_ch.send_nowait(final_event) @@ -371,7 +406,7 @@ def _process_stream_event(self, data: dict) -> None: ): final_event = stt.SpeechEvent( type=stt.SpeechEventType.FINAL_TRANSCRIPT, - alternatives=[stt.SpeechData(language="en", text=transcript)], + alternatives=[stt.SpeechData(language=language_code, text=transcript)], ) self._event_ch.send_nowait(final_event) From 719fded1d8bf42db9f485032a6127182f87a8ffa Mon Sep 17 00:00:00 2001 From: dan-ince-aai Date: Wed, 5 Nov 2025 10:21:47 +0000 Subject: [PATCH 02/11] chore: normalize preemptive generation text before validation --- livekit-agents/livekit/agents/voice/agent_activity.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/livekit-agents/livekit/agents/voice/agent_activity.py b/livekit-agents/livekit/agents/voice/agent_activity.py index 182eec7849..92c522adf8 100644 --- a/livekit-agents/livekit/agents/voice/agent_activity.py +++ b/livekit-agents/livekit/agents/voice/agent_activity.py @@ -4,6 +4,7 @@ import contextvars import heapq import json +import string import time from collections.abc import AsyncIterable, Coroutine, Sequence from dataclasses import dataclass @@ -1391,8 +1392,13 @@ async def _user_turn_completed_task( if preemptive := self._preemptive_generation: # make sure the on_user_turn_completed didn't change some request parameters # otherwise invalidate the preemptive generation + + # Normalize text by lowercasing and removing punctuation for comparison + def normalize_text(text: str) -> str: + return text.lower().translate(str.maketrans('', '', string.punctuation)) + if ( - preemptive.info.new_transcript == user_message.text_content + normalize_text(preemptive.info.new_transcript) == normalize_text(user_message.text_content) and preemptive.chat_ctx.is_equivalent(temp_mutable_chat_ctx) and preemptive.tools == self.tools and preemptive.tool_choice == self._tool_choice From 82d83597ba09303e1454f72bef683ee4ffca0d66 Mon Sep 17 00:00:00 2001 From: dan-ince-aai Date: Wed, 5 Nov 2025 10:36:38 +0000 Subject: [PATCH 03/11] chore: store language code from utterance for final transcript --- .../livekit/plugins/assemblyai/stt.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py b/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py index 9491fd8d4e..a002bb653e 100644 --- a/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py +++ b/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py @@ -193,6 +193,7 @@ def __init__( self._speech_duration: float = 0 self._reconnect_event = asyncio.Event() self._ws: aiohttp.ClientWebSocketResponse | None = None + self._current_language_code: str = "en" # Track language code from utterances def update_options( self, @@ -383,7 +384,9 @@ def _process_stream_event(self, data: dict) -> None: turn_is_formatted = data.get("turn_is_formatted", False) utterance = data.get("utterance", "") transcript = data.get("transcript", "") - language_code = data.get("language_code", "en") + # language_code is only returned with utterances, so track it for final transcript + if "language_code" in data: + self._current_language_code = data["language_code"] if words: interim_text = " ".join(word.get("text", "") for word in words) @@ -396,7 +399,9 @@ def _process_stream_event(self, data: dict) -> None: if utterance: final_event = stt.SpeechEvent( type=stt.SpeechEventType.PREFLIGHT_TRANSCRIPT, - alternatives=[stt.SpeechData(language=language_code, text=utterance)], + alternatives=[ + stt.SpeechData(language=self._current_language_code, text=utterance) + ], ) self._event_ch.send_nowait(final_event) @@ -406,7 +411,9 @@ def _process_stream_event(self, data: dict) -> None: ): final_event = stt.SpeechEvent( type=stt.SpeechEventType.FINAL_TRANSCRIPT, - alternatives=[stt.SpeechData(language=language_code, text=transcript)], + alternatives=[ + stt.SpeechData(language=self._current_language_code, text=transcript) + ], ) self._event_ch.send_nowait(final_event) From 425404ecf164b92e3fa63c9354356d58b1a771f2 Mon Sep 17 00:00:00 2001 From: dan-ince-aai Date: Wed, 5 Nov 2025 10:39:31 +0000 Subject: [PATCH 04/11] ruff --- livekit-agents/livekit/agents/voice/agent_activity.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/livekit-agents/livekit/agents/voice/agent_activity.py b/livekit-agents/livekit/agents/voice/agent_activity.py index 92c522adf8..97fa4b6f10 100644 --- a/livekit-agents/livekit/agents/voice/agent_activity.py +++ b/livekit-agents/livekit/agents/voice/agent_activity.py @@ -1392,13 +1392,14 @@ async def _user_turn_completed_task( if preemptive := self._preemptive_generation: # make sure the on_user_turn_completed didn't change some request parameters # otherwise invalidate the preemptive generation - + # Normalize text by lowercasing and removing punctuation for comparison def normalize_text(text: str) -> str: - return text.lower().translate(str.maketrans('', '', string.punctuation)) - + return text.lower().translate(str.maketrans("", "", string.punctuation)) + if ( - normalize_text(preemptive.info.new_transcript) == normalize_text(user_message.text_content) + normalize_text(preemptive.info.new_transcript) + == normalize_text(user_message.text_content) and preemptive.chat_ctx.is_equivalent(temp_mutable_chat_ctx) and preemptive.tools == self.tools and preemptive.tool_choice == self._tool_choice From cc24d1da8214b6fb4042d4d04206fd15582db76d Mon Sep 17 00:00:00 2001 From: dan-ince-aai Date: Wed, 5 Nov 2025 10:46:40 +0000 Subject: [PATCH 05/11] type checks --- .../livekit/plugins/assemblyai/stt.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py b/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py index a002bb653e..14629d1614 100644 --- a/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py +++ b/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py @@ -21,7 +21,7 @@ import os import weakref from dataclasses import dataclass -from typing import Literal +from typing import Any, Literal from urllib.parse import urlencode import aiohttp @@ -205,7 +205,7 @@ def update_options( keyterms_prompt: NotGivenOr[list[str]] = NOT_GIVEN, ) -> None: # Build UpdateConfiguration message for dynamic updates - update_config = {"type": "UpdateConfiguration"} + update_config: dict[str, Any] = {"type": "UpdateConfiguration"} needs_update = False if is_given(buffer_size_seconds): From 2676b6ca1786d9ae8d3a9fe36819ef3c6bc7af3d Mon Sep 17 00:00:00 2001 From: dan-ince-aai Date: Thu, 6 Nov 2025 11:17:36 +0000 Subject: [PATCH 06/11] type check fix --- livekit-agents/livekit/agents/voice/agent_activity.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/livekit-agents/livekit/agents/voice/agent_activity.py b/livekit-agents/livekit/agents/voice/agent_activity.py index 97fa4b6f10..2020209be1 100644 --- a/livekit-agents/livekit/agents/voice/agent_activity.py +++ b/livekit-agents/livekit/agents/voice/agent_activity.py @@ -1394,7 +1394,9 @@ async def _user_turn_completed_task( # otherwise invalidate the preemptive generation # Normalize text by lowercasing and removing punctuation for comparison - def normalize_text(text: str) -> str: + def normalize_text(text: str | None) -> str: + if text is None: + return "" return text.lower().translate(str.maketrans("", "", string.punctuation)) if ( From 5c7ce7a52021f889453c161d40c02fa95b91bb47 Mon Sep 17 00:00:00 2001 From: dan-ince-aai Date: Wed, 12 Nov 2025 13:19:21 +0000 Subject: [PATCH 07/11] remove punctuation normalization --- livekit-agents/livekit/agents/voice/agent_activity.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/livekit-agents/livekit/agents/voice/agent_activity.py b/livekit-agents/livekit/agents/voice/agent_activity.py index 2020209be1..d3a44c4a2b 100644 --- a/livekit-agents/livekit/agents/voice/agent_activity.py +++ b/livekit-agents/livekit/agents/voice/agent_activity.py @@ -1393,15 +1393,9 @@ async def _user_turn_completed_task( # make sure the on_user_turn_completed didn't change some request parameters # otherwise invalidate the preemptive generation - # Normalize text by lowercasing and removing punctuation for comparison - def normalize_text(text: str | None) -> str: - if text is None: - return "" - return text.lower().translate(str.maketrans("", "", string.punctuation)) - if ( - normalize_text(preemptive.info.new_transcript) - == normalize_text(user_message.text_content) + preemptive.info.new_transcript.lower() + == user_message.text_content.lower() and preemptive.chat_ctx.is_equivalent(temp_mutable_chat_ctx) and preemptive.tools == self.tools and preemptive.tool_choice == self._tool_choice From 4e1ef17ef2db791bc883c21fc9e31e7e8d9e092c Mon Sep 17 00:00:00 2001 From: dan-ince-aai Date: Wed, 12 Nov 2025 13:28:26 +0000 Subject: [PATCH 08/11] ruff fix --- livekit-agents/livekit/agents/voice/agent_activity.py | 1 - 1 file changed, 1 deletion(-) diff --git a/livekit-agents/livekit/agents/voice/agent_activity.py b/livekit-agents/livekit/agents/voice/agent_activity.py index d3a44c4a2b..9d7c8f0183 100644 --- a/livekit-agents/livekit/agents/voice/agent_activity.py +++ b/livekit-agents/livekit/agents/voice/agent_activity.py @@ -4,7 +4,6 @@ import contextvars import heapq import json -import string import time from collections.abc import AsyncIterable, Coroutine, Sequence from dataclasses import dataclass From 3b2915012e25489f1a1e0a48e85d7a32d9af6318 Mon Sep 17 00:00:00 2001 From: dan-ince-aai Date: Wed, 12 Nov 2025 20:03:08 +0000 Subject: [PATCH 09/11] ruff format --- livekit-agents/livekit/agents/voice/agent_activity.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/livekit-agents/livekit/agents/voice/agent_activity.py b/livekit-agents/livekit/agents/voice/agent_activity.py index 9d7c8f0183..3cbe7923e6 100644 --- a/livekit-agents/livekit/agents/voice/agent_activity.py +++ b/livekit-agents/livekit/agents/voice/agent_activity.py @@ -1393,8 +1393,7 @@ async def _user_turn_completed_task( # otherwise invalidate the preemptive generation if ( - preemptive.info.new_transcript.lower() - == user_message.text_content.lower() + preemptive.info.new_transcript.lower() == user_message.text_content.lower() and preemptive.chat_ctx.is_equivalent(temp_mutable_chat_ctx) and preemptive.tools == self.tools and preemptive.tool_choice == self._tool_choice From ca71588f2f8d7cfe5d4acd4c4f1270b035814c23 Mon Sep 17 00:00:00 2001 From: dan-ince-aai Date: Tue, 18 Nov 2025 11:23:45 +0000 Subject: [PATCH 10/11] add confidence values --- livekit-agents/livekit/agents/voice/agent_activity.py | 2 +- .../livekit/plugins/assemblyai/stt.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/livekit-agents/livekit/agents/voice/agent_activity.py b/livekit-agents/livekit/agents/voice/agent_activity.py index 3cbe7923e6..3654713a2f 100644 --- a/livekit-agents/livekit/agents/voice/agent_activity.py +++ b/livekit-agents/livekit/agents/voice/agent_activity.py @@ -1393,7 +1393,7 @@ async def _user_turn_completed_task( # otherwise invalidate the preemptive generation if ( - preemptive.info.new_transcript.lower() == user_message.text_content.lower() + (preemptive.info.new_transcript or "").lower() == (user_message.text_content or "").lower() and preemptive.chat_ctx.is_equivalent(temp_mutable_chat_ctx) and preemptive.tools == self.tools and preemptive.tool_choice == self._tool_choice diff --git a/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py b/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py index 14629d1614..4f47e7af92 100644 --- a/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py +++ b/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py @@ -384,6 +384,8 @@ def _process_stream_event(self, data: dict) -> None: turn_is_formatted = data.get("turn_is_formatted", False) utterance = data.get("utterance", "") transcript = data.get("transcript", "") + confidence = words[-1].get("confidence", 0.0) if words else 0.0 + # language_code is only returned with utterances, so track it for final transcript if "language_code" in data: self._current_language_code = data["language_code"] @@ -392,7 +394,7 @@ def _process_stream_event(self, data: dict) -> None: interim_text = " ".join(word.get("text", "") for word in words) interim_event = stt.SpeechEvent( type=stt.SpeechEventType.INTERIM_TRANSCRIPT, - alternatives=[stt.SpeechData(language="en", text=interim_text)], + alternatives=[stt.SpeechData(language="en", text=interim_text, confidence=confidence)], ) self._event_ch.send_nowait(interim_event) @@ -400,7 +402,7 @@ def _process_stream_event(self, data: dict) -> None: final_event = stt.SpeechEvent( type=stt.SpeechEventType.PREFLIGHT_TRANSCRIPT, alternatives=[ - stt.SpeechData(language=self._current_language_code, text=utterance) + stt.SpeechData(language=self._current_language_code, text=utterance, confidence=confidence) ], ) self._event_ch.send_nowait(final_event) @@ -412,7 +414,7 @@ def _process_stream_event(self, data: dict) -> None: final_event = stt.SpeechEvent( type=stt.SpeechEventType.FINAL_TRANSCRIPT, alternatives=[ - stt.SpeechData(language=self._current_language_code, text=transcript) + stt.SpeechData(language=self._current_language_code, text=transcript, confidence=confidence) ], ) self._event_ch.send_nowait(final_event) From 1c3af49642145063f93aeb3035d5b60a9b2271da Mon Sep 17 00:00:00 2001 From: dan-ince-aai Date: Tue, 18 Nov 2025 11:24:42 +0000 Subject: [PATCH 11/11] ruff format --- .../livekit/agents/voice/agent_activity.py | 3 ++- .../livekit/plugins/assemblyai/stt.py | 18 ++++++++++++++---- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/livekit-agents/livekit/agents/voice/agent_activity.py b/livekit-agents/livekit/agents/voice/agent_activity.py index 3654713a2f..40cb436db8 100644 --- a/livekit-agents/livekit/agents/voice/agent_activity.py +++ b/livekit-agents/livekit/agents/voice/agent_activity.py @@ -1393,7 +1393,8 @@ async def _user_turn_completed_task( # otherwise invalidate the preemptive generation if ( - (preemptive.info.new_transcript or "").lower() == (user_message.text_content or "").lower() + (preemptive.info.new_transcript or "").lower() + == (user_message.text_content or "").lower() and preemptive.chat_ctx.is_equivalent(temp_mutable_chat_ctx) and preemptive.tools == self.tools and preemptive.tool_choice == self._tool_choice diff --git a/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py b/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py index 4f47e7af92..d5bc898941 100644 --- a/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py +++ b/livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py @@ -385,7 +385,7 @@ def _process_stream_event(self, data: dict) -> None: utterance = data.get("utterance", "") transcript = data.get("transcript", "") confidence = words[-1].get("confidence", 0.0) if words else 0.0 - + # language_code is only returned with utterances, so track it for final transcript if "language_code" in data: self._current_language_code = data["language_code"] @@ -394,7 +394,9 @@ def _process_stream_event(self, data: dict) -> None: interim_text = " ".join(word.get("text", "") for word in words) interim_event = stt.SpeechEvent( type=stt.SpeechEventType.INTERIM_TRANSCRIPT, - alternatives=[stt.SpeechData(language="en", text=interim_text, confidence=confidence)], + alternatives=[ + stt.SpeechData(language="en", text=interim_text, confidence=confidence) + ], ) self._event_ch.send_nowait(interim_event) @@ -402,7 +404,11 @@ def _process_stream_event(self, data: dict) -> None: final_event = stt.SpeechEvent( type=stt.SpeechEventType.PREFLIGHT_TRANSCRIPT, alternatives=[ - stt.SpeechData(language=self._current_language_code, text=utterance, confidence=confidence) + stt.SpeechData( + language=self._current_language_code, + text=utterance, + confidence=confidence, + ) ], ) self._event_ch.send_nowait(final_event) @@ -414,7 +420,11 @@ def _process_stream_event(self, data: dict) -> None: final_event = stt.SpeechEvent( type=stt.SpeechEventType.FINAL_TRANSCRIPT, alternatives=[ - stt.SpeechData(language=self._current_language_code, text=transcript, confidence=confidence) + stt.SpeechData( + language=self._current_language_code, + text=transcript, + confidence=confidence, + ) ], ) self._event_ch.send_nowait(final_event)