Skip to content

Commit 3e10862

Browse files
[LiveKit] Fixes race condition and import issues (#90)
- Adds a check for the `agent_session._started`​ flag and waits for it to start up - Fixes import errors if packages are not present for google - since we do process operations sequentially, we would process data only when session is available
1 parent 226f48c commit 3e10862

File tree

7 files changed

+57
-20
lines changed

7 files changed

+57
-20
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ See [cookbook/agno_agent.py](cookbook/agno_agent.py) for an example of tracing a
6060

6161
## Version changelog
6262

63+
### 3.11.4
64+
- fix: Fixes race condition in LiveKit realtime tracing
65+
- fix: Fixes import errors for Gemini and Google realtime session imports
66+
6367
### 3.11.3
6468
- fix: Fixed Nested Spans issue for Google ADK
6569

maxim/logger/livekit/agent_session.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import inspect
33
import traceback
44
import time
5+
from typing import Optional
56
import uuid
67
import weakref
78
from datetime import datetime, timezone
@@ -41,7 +42,15 @@ def intercept_session_start(self: AgentSession, room, room_name, agent: Agent):
4142
The session info along with room_id, agent_id, etc is stored in the thread-local store.
4243
"""
4344
maxim_logger = get_maxim_logger()
44-
scribe().debug(f"[Internal][{self.__class__.__name__}] Session started")
45+
46+
# Wait for start signal (max ~5s) before proceeding
47+
for _ in range(500):
48+
if getattr(self, "_started", False):
49+
scribe().debug(f"[Internal][{self.__class__.__name__}] Session started")
50+
break
51+
time.sleep(0.01)
52+
else:
53+
scribe().debug(f"[Internal][{self.__class__.__name__}] start not signaled within timeout; continuing")
4554
# getting the room_id
4655
if isinstance(room, str):
4756
room_id = room
@@ -100,10 +109,10 @@ def intercept_session_start(self: AgentSession, room, room_name, agent: Agent):
100109
)
101110

102111
current_turn_id = str(uuid.uuid4())
103-
if self.stt is not None or self._agent.stt is not NOT_GIVEN:
112+
if self.stt is not None or agent.stt is not NOT_GIVEN:
104113
# Only add generation if we are not in realtime session
105-
llm_opts: _LLMOptions = self.llm._opts if self.llm is not None else self._agent.llm._opts
106-
model = self.llm.model if self.llm is not None else self._agent.llm.model
114+
llm_opts: _LLMOptions = self.llm._opts if self.llm is not None else agent.llm._opts
115+
model = self.llm.model if self.llm is not None else agent.llm.model
107116
if llm_opts is not None:
108117
model_parameters = extract_llm_model_parameters(llm_opts)
109118
else:

maxim/logger/livekit/gemini/gemini_realtime_session.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,21 @@
3232
from io import BytesIO
3333
from typing import Union
3434

35-
from google.genai.types import (
36-
LiveConnectConfig,
37-
LiveServerContent,
38-
LiveServerToolCall,
39-
UsageMetadata,
40-
Content,
41-
)
35+
try:
36+
from google.genai.types import (
37+
LiveConnectConfig,
38+
LiveServerContent,
39+
LiveServerToolCall,
40+
UsageMetadata,
41+
Content,
42+
)
43+
except ImportError:
44+
pass
4245
from livekit.agents.llm import InputTranscriptionCompleted
43-
from livekit.plugins.google.beta.realtime.realtime_api import RealtimeSession
46+
try:
47+
from livekit.plugins.google.realtime.realtime_api import RealtimeSession
48+
except ImportError:
49+
from livekit.plugins.google.beta.realtime.realtime_api import RealtimeSession
4450
from livekit.rtc import AudioFrame
4551

4652
from maxim.scribe import scribe

maxim/logger/livekit/gemini/instrumenter.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,18 @@
1313
still capturing detailed observability and logging data for each session.
1414
"""
1515

16-
from maxim.logger.livekit.gemini.gemini_realtime_session import instrument_gemini_session
16+
# Import instrument_gemini_session conditionally to avoid dependency issues
1717

1818

1919
def instrument_gemini():
2020
"""Monkey-patch Gemini's `RealtimeSession` methods with instrumentation.
2121
"""
2222
try:
23-
from livekit.plugins.google.beta.realtime.realtime_api import RealtimeSession
23+
try:
24+
from livekit.plugins.google.realtime.realtime_api import RealtimeSession
25+
except ImportError:
26+
from livekit.plugins.google.beta.realtime.realtime_api import RealtimeSession
27+
from maxim.logger.livekit.gemini.gemini_realtime_session import instrument_gemini_session
2428

2529
for name, orig in [
2630
(n, getattr(RealtimeSession, n))
@@ -29,5 +33,6 @@ def instrument_gemini():
2933
]:
3034
if name != "__class__" and not name.startswith("__"):
3135
setattr(RealtimeSession, name, instrument_gemini_session(orig, name))
32-
except ImportError:
36+
except (ImportError, NameError):
37+
# Gemini dependencies not available, skip instrumentation
3338
pass

maxim/logger/livekit/instrumenter.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from ...logger import Logger
66
from .agent_activity import instrument_agent_activity
77
from .agent_session import instrument_agent_session
8-
from .gemini.instrumenter import instrument_gemini
8+
# Import instrument_gemini conditionally to avoid dependency issues
99
from .llm import instrument_llm_init
1010
from .realtime_session import instrument_realtime_session
1111
from .stt import instrument_stt_init
@@ -115,4 +115,9 @@ def instrument_livekit(logger: Logger, callback: MaximLiveKitCallback = None):
115115
setattr(TTS, name, instrument_tts(orig, name))
116116

117117
# Instrument gemini models if present
118-
instrument_gemini()
118+
try:
119+
from .gemini.instrumenter import instrument_gemini
120+
instrument_gemini()
121+
except (ImportError, NameError):
122+
# Gemini dependencies not available, skip instrumentation
123+
pass

maxim/logger/livekit/realtime_session.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,14 @@
3030
from ...scribe import scribe
3131
from ..components import FileDataAttachment
3232
from ..utils import pcm16_to_wav_bytes
33-
from .gemini.gemini_realtime_session import handle_google_input_transcription_completed
33+
# Import Gemini handler conditionally to avoid dependency issues
34+
try:
35+
from .gemini.gemini_realtime_session import handle_google_input_transcription_completed
36+
GEMINI_HANDLER_AVAILABLE = True
37+
except (ImportError, NameError):
38+
# Gemini dependencies not available
39+
handle_google_input_transcription_completed = None
40+
GEMINI_HANDLER_AVAILABLE = False
3441
from .openai.realtime.handler import (
3542
handle_openai_client_event_queued,
3643
handle_openai_input_transcription_completed,
@@ -73,7 +80,8 @@ def intercept_realtime_session_emit(self: RealtimeSession, event, data):
7380
if session_info.provider == "openai-realtime":
7481
handle_openai_input_transcription_completed(session_info, data)
7582
elif session_info.provider == "google-realtime":
76-
handle_google_input_transcription_completed(session_info, data)
83+
if GEMINI_HANDLER_AVAILABLE and handle_google_input_transcription_completed:
84+
handle_google_input_transcription_completed(session_info, data)
7785
elif event == "error":
7886
scribe().debug(f"[Internal][{self.__class__.__name__}] error;")
7987
if data is not None and isinstance(data, RealtimeModelError):

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "maxim-py"
7-
version = "3.11.3"
7+
version = "3.11.4"
88
description = "A package that allows you to use the Maxim Python Library to interact with the Maxim Platform."
99
readme = "README.md"
1010
requires-python = ">=3.9.20"

0 commit comments

Comments
 (0)