diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/__init__.py
index 339ccc5ba2..84942b20e3 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/__init__.py
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/__init__.py
@@ -191,10 +191,82 @@ def _instrument(self, **kwargs) -> None:
         provider.set_processors([*existing, processor])
         self._processor = processor
 
+        try:
+            from wrapt import wrap_function_wrapper
+            from opentelemetry import context as context_api
+            from .handler import RealtimeTelemetryHandler
+        except ImportError:
+            logger.debug(
+                "Realtime instrumentation dependencies not available, skipping"
+            )
+        else:
+            # Attribute storing the telemetry handler on the model instance
+            _OTEL_HANDLER_ATTR = "_otel_telemetry_handler"
+
+            def _wrap_init(
+                wrapped: Any, instance: Any, args: Any, kwargs: Any
+            ) -> Any:
+                wrapped(*args, **kwargs)
+                try:
+                    handler = RealtimeTelemetryHandler(
+                        tracer=tracer,
+                    )
+                    setattr(instance.model, _OTEL_HANDLER_ATTR, handler)
+                    logger.debug(
+                        "Attached realtime telemetry handler to model %s",
+                        instance.model,
+                    )
+                except Exception:
+                    logger.warning(
+                        "Failed to auto-attach telemetry handler", exc_info=True
+                    )
+
+            async def _wrap_close(
+                wrapped: Any, instance: Any, args: Any, kwargs: Any
+            ) -> Any:
+                try:
+                    return await wrapped(*args, **kwargs)
+                finally:
+                    # Flush any spans still open for the session, then drop
+                    # the handler reference so the model is not kept alive.
+                    try:
+                        model = getattr(instance, "model", None)
+                        if model is not None:
+                            handler = getattr(model, _OTEL_HANDLER_ATTR, None)
+                            if handler is not None and hasattr(handler, "cleanup"):
+                                handler.cleanup()
+                            setattr(model, _OTEL_HANDLER_ATTR, None)
+                    except Exception:
+                        logger.debug(
+                            "Error during auto telemetry cleanup", exc_info=True
+                        )
+
+            async def _wrap_emit_event(
+                wrapped: Any, instance: Any, args: Any, kwargs: Any
+            ) -> Any:
+                handler = getattr(instance, _OTEL_HANDLER_ATTR, None)
+                if handler is not None:
+                    event = args[0] if args else kwargs.get("event")
+                    if event is not None:
+                        # Let the handler pick the parent context for this
+                        # event, then attach it so spans created by listeners
+                        # parent correctly during the dispatch.
+                        ctx = handler.handle_event(event)
+                        if ctx is not None:
+                            token = context_api.attach(ctx)
+                            try:
+                                return await wrapped(*args, **kwargs)
+                            finally:
+                                context_api.detach(token)
+                return await wrapped(*args, **kwargs)
+
+            wrap_function_wrapper(
+                "agents.realtime.session", "RealtimeSession.__init__", _wrap_init
+            )
+            wrap_function_wrapper(
+                "agents.realtime.session", "RealtimeSession.close", _wrap_close
+            )
+            wrap_function_wrapper(
+                "agents.realtime.openai_realtime",
+                "OpenAIRealtimeWebSocketModel._emit_event",
+                _wrap_emit_event,
+            )
+            self._realtime_patched = True
+
     def _uninstrument(self, **kwargs) -> None:
         if self._processor is None:
             return
 
+        if getattr(self, "_realtime_patched", False):
+            from agents.realtime import session as session_module
+            from agents.realtime import openai_realtime as realtime_model_module
+            from opentelemetry.instrumentation.utils import unwrap
+
+            unwrap(session_module.RealtimeSession, "__init__")
+            unwrap(session_module.RealtimeSession, "close")
+            unwrap(realtime_model_module.OpenAIRealtimeWebSocketModel, "_emit_event")
+            self._realtime_patched = False
+
         tracing = _load_tracing_module()
         provider = tracing.get_trace_provider()
         current = _get_registered_processors(provider)
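
The hooks above combine two mechanics: wrapt's wrap_function_wrapper patches a method by dotted module path, and OpenTelemetry context attach/detach makes the handler-chosen span current while the wrapped call runs. A minimal, self-contained sketch of the same pattern (synchronous for brevity; Emitter and emit are hypothetical stand-ins for the realtime session internals, not part of this PR):

# Toy re-creation of the patch mechanics; only the pattern matches the PR.
from opentelemetry import context as context_api
from opentelemetry import trace
from wrapt import wrap_function_wrapper

tracer = trace.get_tracer("demo")


class Emitter:  # hypothetical stand-in for OpenAIRealtimeWebSocketModel
    def emit(self, event):
        # Runs with the wrapper's context attached, so this span parents
        # under the "session" span started in _wrap_emit.
        with tracer.start_as_current_span("listener-work"):
            print("dispatched", event)


def _wrap_emit(wrapped, instance, args, kwargs):
    span = tracer.start_span("session")  # stands in for handler.handle_event()
    token = context_api.attach(trace.set_span_in_context(span))
    try:
        return wrapped(*args, **kwargs)
    finally:
        context_api.detach(token)  # always restore the previous context
        span.end()


wrap_function_wrapper(__name__, "Emitter.emit", _wrap_emit)
Emitter().emit({"type": "session.created"})
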
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/_constants.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/_constants.py
new file mode 100644
index 0000000000..5de1913a56
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/_constants.py
@@ -0,0 +1,130 @@
+"""Shared constants for the OpenAI Agents instrumentation.
+
+Centralises meter identity, metric instrument names, semantic convention
+attribute keys, and operation name values so that ``span_processor`` and
+``handler`` stay in sync without duplicating strings.
+"""
+
+from opentelemetry.semconv._incubating.attributes import (
+    gen_ai_attributes as GenAIAttributes,
+)
+from opentelemetry.semconv._incubating.attributes import (
+    server_attributes as ServerAttributes,
+)
+from opentelemetry.semconv.attributes import (
+    error_attributes as ErrorAttributes,
+)
+
+METER_NAME = "opentelemetry.instrumentation.openai_agents"
+METER_VERSION = "0.1.0"
+
+
+def _attr(name: str, fallback: str) -> str:
+    return getattr(GenAIAttributes, name, fallback)
+
+
+ERROR_TYPE = ErrorAttributes.ERROR_TYPE
+
+GEN_AI_SYSTEM_KEY = getattr(GenAIAttributes, "GEN_AI_SYSTEM", "gen_ai.system")
+
+GEN_AI_PROVIDER_NAME = _attr("GEN_AI_PROVIDER_NAME", "gen_ai.provider.name")
+GEN_AI_OPERATION_NAME = _attr("GEN_AI_OPERATION_NAME", "gen_ai.operation.name")
+GEN_AI_REQUEST_MODEL = _attr("GEN_AI_REQUEST_MODEL", "gen_ai.request.model")
+GEN_AI_REQUEST_MAX_TOKENS = _attr(
+    "GEN_AI_REQUEST_MAX_TOKENS", "gen_ai.request.max_tokens"
+)
+GEN_AI_REQUEST_TEMPERATURE = _attr(
+    "GEN_AI_REQUEST_TEMPERATURE", "gen_ai.request.temperature"
+)
+GEN_AI_REQUEST_TOP_P = _attr("GEN_AI_REQUEST_TOP_P", "gen_ai.request.top_p")
+GEN_AI_REQUEST_TOP_K = _attr("GEN_AI_REQUEST_TOP_K", "gen_ai.request.top_k")
+GEN_AI_REQUEST_FREQUENCY_PENALTY = _attr(
+    "GEN_AI_REQUEST_FREQUENCY_PENALTY", "gen_ai.request.frequency_penalty"
+)
+GEN_AI_REQUEST_PRESENCE_PENALTY = _attr(
+    "GEN_AI_REQUEST_PRESENCE_PENALTY", "gen_ai.request.presence_penalty"
+)
+GEN_AI_REQUEST_CHOICE_COUNT = _attr(
+    "GEN_AI_REQUEST_CHOICE_COUNT", "gen_ai.request.choice.count"
+)
+GEN_AI_REQUEST_STOP_SEQUENCES = _attr(
+    "GEN_AI_REQUEST_STOP_SEQUENCES", "gen_ai.request.stop_sequences"
+)
+GEN_AI_REQUEST_ENCODING_FORMATS = _attr(
+    "GEN_AI_REQUEST_ENCODING_FORMATS", "gen_ai.request.encoding_formats"
+)
+GEN_AI_REQUEST_SEED = _attr("GEN_AI_REQUEST_SEED", "gen_ai.request.seed")
+GEN_AI_RESPONSE_ID = _attr("GEN_AI_RESPONSE_ID", "gen_ai.response.id")
+GEN_AI_RESPONSE_MODEL = _attr("GEN_AI_RESPONSE_MODEL", "gen_ai.response.model")
+GEN_AI_RESPONSE_FINISH_REASONS = _attr(
+    "GEN_AI_RESPONSE_FINISH_REASONS", "gen_ai.response.finish_reasons"
+)
+GEN_AI_USAGE_INPUT_TOKENS = _attr(
+    "GEN_AI_USAGE_INPUT_TOKENS", "gen_ai.usage.input_tokens"
+)
+GEN_AI_USAGE_OUTPUT_TOKENS = _attr(
+    "GEN_AI_USAGE_OUTPUT_TOKENS", "gen_ai.usage.output_tokens"
+)
+GEN_AI_CONVERSATION_ID = _attr(
+    "GEN_AI_CONVERSATION_ID", "gen_ai.conversation.id"
+)
+GEN_AI_AGENT_ID = _attr("GEN_AI_AGENT_ID", "gen_ai.agent.id")
+GEN_AI_AGENT_NAME = _attr("GEN_AI_AGENT_NAME", "gen_ai.agent.name")
+GEN_AI_AGENT_DESCRIPTION = _attr(
+    "GEN_AI_AGENT_DESCRIPTION", "gen_ai.agent.description"
+)
+GEN_AI_TOOL_NAME = _attr("GEN_AI_TOOL_NAME", "gen_ai.tool.name")
+GEN_AI_TOOL_TYPE = _attr("GEN_AI_TOOL_TYPE", "gen_ai.tool.type")
+GEN_AI_TOOL_CALL_ID = _attr("GEN_AI_TOOL_CALL_ID", "gen_ai.tool.call.id")
+GEN_AI_TOOL_DESCRIPTION = _attr(
+    "GEN_AI_TOOL_DESCRIPTION", "gen_ai.tool.description"
+)
+GEN_AI_OUTPUT_TYPE = _attr("GEN_AI_OUTPUT_TYPE", "gen_ai.output.type")
+GEN_AI_SYSTEM_INSTRUCTIONS = _attr(
+    "GEN_AI_SYSTEM_INSTRUCTIONS", "gen_ai.system_instructions"
+)
+GEN_AI_INPUT_MESSAGES = _attr("GEN_AI_INPUT_MESSAGES", "gen_ai.input.messages")
+GEN_AI_OUTPUT_MESSAGES = _attr(
+    "GEN_AI_OUTPUT_MESSAGES", "gen_ai.output.messages"
+)
+GEN_AI_DATA_SOURCE_ID = _attr(
+    "GEN_AI_DATA_SOURCE_ID", "gen_ai.data_source.id"
+)
+GEN_AI_TOKEN_TYPE = _attr("GEN_AI_TOKEN_TYPE", "gen_ai.token.type")
+
+# The semantic conventions currently expose multiple usage token attributes;
+# we retain the completion/prompt aliases for backwards compatibility.
+GEN_AI_USAGE_PROMPT_TOKENS = _attr(
+    "GEN_AI_USAGE_PROMPT_TOKENS", "gen_ai.usage.prompt_tokens"
+)
+GEN_AI_USAGE_COMPLETION_TOKENS = _attr(
+    "GEN_AI_USAGE_COMPLETION_TOKENS", "gen_ai.usage.completion_tokens"
+)
+
+# Attributes not (yet) defined in the spec retain their literal values.
+GEN_AI_TOOL_CALL_ARGUMENTS = "gen_ai.tool.call.arguments"
+GEN_AI_TOOL_CALL_RESULT = "gen_ai.tool.call.result"
+GEN_AI_TOOL_DEFINITIONS = "gen_ai.tool.definitions"
+GEN_AI_ORCHESTRATOR_AGENT_DEFINITIONS = "gen_ai.orchestrator.agent.definitions"
+GEN_AI_GUARDRAIL_NAME = "gen_ai.guardrail.name"
+GEN_AI_GUARDRAIL_TRIGGERED = "gen_ai.guardrail.triggered"
+GEN_AI_HANDOFF_FROM_AGENT = "gen_ai.handoff.from_agent"
+GEN_AI_HANDOFF_TO_AGENT = "gen_ai.handoff.to_agent"
+GEN_AI_EMBEDDINGS_DIMENSION_COUNT = "gen_ai.embeddings.dimension.count"
+
+GEN_AI_SESSION_ID = "gen_ai.session.id"
+GEN_AI_RESPONSE_STATUS = "gen_ai.response.status"
+
+SERVER_ADDRESS = ServerAttributes.SERVER_ADDRESS
+SERVER_PORT = ServerAttributes.SERVER_PORT
+
+INVOKE_AGENT = GenAIAttributes.GenAiOperationNameValues.INVOKE_AGENT.value
+EXECUTE_TOOL = GenAIAttributes.GenAiOperationNameValues.EXECUTE_TOOL.value
+GENERATE_CONTENT = (
+    GenAIAttributes.GenAiOperationNameValues.GENERATE_CONTENT.value
+)
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/handler.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/handler.py
new file mode 100644
index 0000000000..2001db2438
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/handler.py
@@ -0,0 +1,451 @@
+"""OpenTelemetry handler for OpenAI Realtime API sessions.
+
+Translates raw server events from the OpenAI Realtime API into
+OpenTelemetry spans and metrics following the GenAI semantic conventions.
+Invoked from the ``_emit_event`` patch before listeners are dispatched.
+
+Spans:
+    * Session span -- ``gen_ai.operation.name = invoke_agent``
+    * Response span -- ``gen_ai.operation.name = generate_content``
+    * Tool execution span -- ``gen_ai.operation.name = execute_tool``
+
+Metrics (histograms):
+    * ``gen_ai.client.token.usage``
+    * ``gen_ai.client.operation.duration``
+    * ``gen_ai.server.time_to_first_token``
+"""
+
+from __future__ import annotations
+
+import logging
+import time
+from typing import Any
+
+from agents.realtime.model_events import RealtimeModelEvent
+from agents.realtime.openai_realtime import get_server_event_type_adapter
+from openai.types.realtime import (
+    ConversationItemAdded,
+    InputAudioBufferSpeechStartedEvent,
+    InputAudioBufferSpeechStoppedEvent,
+    RealtimeConversationItemFunctionCallOutput,
+    RealtimeErrorEvent,
+    RealtimeResponseUsage,
+    RealtimeSessionCreateRequest,
+    ResponseCreatedEvent,
+    ResponseDoneEvent,
+    ResponseFunctionCallArgumentsDoneEvent,
+    SessionCreatedEvent,
+)
+
+from opentelemetry import metrics, trace
+from opentelemetry.context import Context
+from opentelemetry.instrumentation.openai_agents._constants import (
+    ERROR_TYPE,
+    EXECUTE_TOOL,
+    GEN_AI_AGENT_NAME,
+    GEN_AI_OPERATION_NAME,
+    GEN_AI_PROVIDER_NAME,
+    GEN_AI_REQUEST_MODEL,
+    GEN_AI_RESPONSE_FINISH_REASONS,
+    GEN_AI_RESPONSE_ID,
+    GEN_AI_RESPONSE_MODEL,
+    GEN_AI_RESPONSE_STATUS,
+    GEN_AI_SESSION_ID,
+    GEN_AI_TOKEN_TYPE,
+    GEN_AI_TOOL_CALL_ID,
+    GEN_AI_TOOL_NAME,
+    GEN_AI_TOOL_TYPE,
+    GEN_AI_USAGE_INPUT_TOKENS,
+    GEN_AI_USAGE_OUTPUT_TOKENS,
+    GENERATE_CONTENT,
+    INVOKE_AGENT,
+    METER_NAME,
+    METER_VERSION,
+    SERVER_ADDRESS,
+    SERVER_PORT,
+)
+from opentelemetry.semconv._incubating.metrics import gen_ai_metrics
+from opentelemetry.trace import Span, SpanKind, StatusCode, get_current_span
+from opentelemetry.trace.propagation import set_span_in_context
+from opentelemetry.util.genai.instruments import (
+    create_duration_histogram,
+    create_token_histogram,
+)
+
+logger = logging.getLogger(__name__)
+
+_UNKNOWN = "unknown"
+
+
+class RealtimeEventType:
+    """Server event ``type`` strings from the OpenAI Realtime API."""
+
+    SESSION_CREATED = "session.created"
+    SESSION_UPDATED = "session.updated"
+    RESPONSE_CREATED = "response.created"
+    RESPONSE_DONE = "response.done"
+    FUNCTION_CALL = "response.function_call_arguments.done"
+    CONVERSATION_ITEM_ADDED = "conversation.item.added"
+    AUDIO_DELTA = "response.output_audio.delta"
+    TRANSCRIPT_DELTA = "response.output_audio_transcript.delta"
+    TRANSCRIPT_DONE = "response.output_audio_transcript.done"
+    TEXT_DELTA = "response.text.delta"
+    SPEECH_STARTED = "input_audio_buffer.speech_started"
+    SPEECH_STOPPED = "input_audio_buffer.speech_stopped"
+    INPUT_TRANSCRIPTION_COMPLETED = (
+        "conversation.item.input_audio_transcription.completed"
+    )
+    INPUT_TRANSCRIPTION_FAILED = (
+        "conversation.item.input_audio_transcription.failed"
+    )
+    ERROR = "error"
+
+
+class SpanName:
+    """Display names used when creating OpenTelemetry spans."""
+
+    SESSION_CREATED = "realtime_session"
+    AGENT_RESPONSE = "agent.response"
+    FUNCTION_CALL = "execute_tool"
+    USER_INPUT = "user.input"
+
+
+class RealtimeTelemetryHandler:
+    """Maps realtime server events onto spans and metric recordings."""
+
+    def __init__(
+        self,
+        *,
+        tracer: trace.Tracer,
+        server_address: str | None = None,
+        server_port: int | None = None,
+        provider_name: str | None = None,
+        agent_name: str | None = None,
+    ) -> None:
+        self._tracer = tracer
+        self._server_address = server_address
+        self._server_port = server_port
+        self.agent_name: str | None = agent_name or "realtime_agent"
+        self.provider_name = provider_name or "openai"
+
+        self._root_context: Context = set_span_in_context(get_current_span())
+        self._spans: dict[str, Span] = {}
+        self._session_id: str | None = None
+        self._model: str | None = None
+        self._response_start_times: dict[str, float] = {}
+        self._first_token_recorded: set[str] = set()
+
+        self._init_metrics()
+
+    def _init_metrics(self) -> None:
+        """Create metric instruments using shared util-genai histogram factories."""
+        meter = metrics.get_meter(METER_NAME, METER_VERSION)
+        self._token_usage_histogram = create_token_histogram(meter)
+        self._operation_duration_histogram = create_duration_histogram(meter)
+        self._ttft_histogram = meter.create_histogram(
+            name=gen_ai_metrics.GEN_AI_SERVER_TIME_TO_FIRST_TOKEN,
+            description="Time to generate first token for successful responses",
+            unit="s",
+        )
+
+    def _context_for(self, key: str | None = None) -> Context:
+        """Return the context for the matching span, falling back to session, then root."""
+        if key is not None:
+            if span := self._spans.get(key):
+                return set_span_in_context(span)
+        if session := self._spans.get("session"):
+            return set_span_in_context(session)
+        return self._root_context
+
+    def _end_span(self, key: str) -> None:
+        span = self._spans.pop(key, None)
+        if span is not None:
+            try:
+                if span.is_recording():
+                    span.end()
+            except Exception:
+                logger.debug("Failed to end span for key %s", key)
+
+    def cleanup(self) -> None:
+        for key in list(self._spans):
+            self._end_span(key)
+
+    def handle_event(self, event: RealtimeModelEvent) -> Context:
+        if event.type != "raw_server_event":
+            match event.type:
+                case "function_call":
+                    return self._context_for(event.call_id)
+                case _:
+                    return self._context_for()
+
+        try:
+            parsed = get_server_event_type_adapter().validate_python(event.data)
+        except Exception:
+            logger.debug("Failed to parse realtime server event", exc_info=True)
+            return self._context_for()
+
+        match parsed.type:
+            case RealtimeEventType.SESSION_CREATED:
+                return self._handle_session_created(parsed)
+            case RealtimeEventType.SPEECH_STARTED:
+                return self._handle_speech_started(parsed)
+            case RealtimeEventType.RESPONSE_CREATED:
+                return self._handle_response_created(parsed)
+            case RealtimeEventType.SPEECH_STOPPED:
+                return self._handle_speech_stopped(parsed)
+            case RealtimeEventType.RESPONSE_DONE:
+                return self._handle_response_done(parsed)
+            case RealtimeEventType.FUNCTION_CALL:
+                return self._handle_function_call_arguments_done(parsed)
+            case RealtimeEventType.CONVERSATION_ITEM_ADDED:
+                return self._handle_conversation_item_added(parsed)
+            case (
+                RealtimeEventType.AUDIO_DELTA
+                | RealtimeEventType.TRANSCRIPT_DELTA
+                | RealtimeEventType.TEXT_DELTA
+            ):
+                self._maybe_record_ttft(parsed.response_id)
+                return self._context_for()
+            case RealtimeEventType.ERROR:
+                return self._handle_error(parsed)
+            case _:
+                return self._context_for()
+
+    def _handle_session_created(self, event: SessionCreatedEvent) -> Context:
+        span = self._tracer.start_span(
+            SpanName.SESSION_CREATED,
+            context=self._root_context,
+            kind=SpanKind.INTERNAL,
+        )
+        self._spans["session"] = span
+
+        session_id = getattr(event.session, "id", None)
+        if session_id:
+            self._session_id = session_id
+            span.set_attribute(GEN_AI_SESSION_ID, session_id)
+
+        span.set_attribute(GEN_AI_OPERATION_NAME, INVOKE_AGENT)
+        span.set_attribute(GEN_AI_PROVIDER_NAME, self.provider_name)
+        if self._server_address is not None:
+            span.set_attribute(SERVER_ADDRESS, self._server_address)
+        if self._server_port is not None:
+            span.set_attribute(SERVER_PORT, self._server_port)
+        if self.agent_name is not None:
+            span.set_attribute(GEN_AI_AGENT_NAME, self.agent_name)
+
+        if isinstance(event.session, RealtimeSessionCreateRequest):
+            if event.session.model is not None:
+                self._model = event.session.model
+                span.set_attribute(GEN_AI_REQUEST_MODEL, self._model)
+        return set_span_in_context(span)
+
+    def _handle_speech_started(
+        self, event: InputAudioBufferSpeechStartedEvent
+    ) -> Context:
+        ctx = self._context_for()
+        span = self._tracer.start_span(
+            SpanName.USER_INPUT, context=ctx, kind=SpanKind.INTERNAL
+        )
+        item_id = event.item_id
+        self._spans[item_id] = span
+
+        span.set_attribute(GEN_AI_OPERATION_NAME, SpanName.USER_INPUT)
+        span.set_attribute(GEN_AI_PROVIDER_NAME, self.provider_name)
+        return set_span_in_context(span)
+
+    def _handle_speech_stopped(
+        self, event: InputAudioBufferSpeechStoppedEvent
+    ) -> Context:
+        self._end_span(event.item_id)
+        return self._context_for()
+
+    def _handle_response_created(self, event: ResponseCreatedEvent) -> Context:
+        ctx = self._context_for()
+        span = self._tracer.start_span(
+            SpanName.AGENT_RESPONSE, context=ctx, kind=SpanKind.INTERNAL
+        )
+        response = event.response
+        response_id = response.id or _UNKNOWN
+        self._spans[response_id] = span
+
+        span.set_attribute(GEN_AI_OPERATION_NAME, SpanName.AGENT_RESPONSE)
+        span.set_attribute(GEN_AI_PROVIDER_NAME, self.provider_name)
+        span.set_attribute(GEN_AI_RESPONSE_ID, response_id)
+        if self._model:
+            span.set_attribute(GEN_AI_REQUEST_MODEL, self._model)
+
+        self._response_start_times[response_id] = time.monotonic()
+        return set_span_in_context(span)
+
+    def _handle_response_done(self, event: ResponseDoneEvent) -> Context:
+        response = event.response
+        response_id = response.id or _UNKNOWN
+        span = self._spans.get(response_id)
+
+        if span:
+            if response.status:
+                span.set_attribute(GEN_AI_RESPONSE_STATUS, response.status)
+            if self._model:
+                span.set_attribute(GEN_AI_RESPONSE_MODEL, self._model)
+
+            if status_details := response.status_details:
+                if status_details.reason:
+                    # Semconv defines finish_reasons as a string array.
+                    span.set_attribute(
+                        GEN_AI_RESPONSE_FINISH_REASONS,
+                        [status_details.reason],
+                    )
+                if status_details.error:
+                    err = status_details.error
+                    span.set_status(
+                        StatusCode.ERROR,
+                        f"{err.type}: {err.code}" if err.code else str(err.type),
+                    )
+                    span.set_attribute(ERROR_TYPE, err.type or _UNKNOWN)
+
+            if response.status in ("failed", "incomplete") and (
+                not response.status_details
+                or not response.status_details.error
+            ):
+                span.set_status(StatusCode.ERROR, f"Response {response.status}")
+
+            if usage := response.usage:
+                span.set_attributes(_extract_token_attributes(usage))
+                self._record_token_usage_metric(usage)
+
+        # Operation duration metric
+        start_time = self._response_start_times.pop(response_id, None)
+        self._first_token_recorded.discard(response_id)
+        if start_time is not None:
+            duration = time.monotonic() - start_time
+            attrs: dict[str, str] = {
+                GEN_AI_OPERATION_NAME: "realtime_session",
+                GEN_AI_PROVIDER_NAME: self.provider_name,
+            }
+            if self._model:
+                attrs[GEN_AI_REQUEST_MODEL] = self._model
+            if response.status in ("failed", "incomplete"):
+                attrs[ERROR_TYPE] = response.status
+            self._operation_duration_histogram.record(duration, attrs)
+
+        self._end_span(response_id)
+        return self._context_for()
+
+    def _handle_function_call_arguments_done(
+        self, event: ResponseFunctionCallArgumentsDoneEvent
+    ) -> Context:
+        ctx = self._context_for(event.response_id)
+        function_name = event.name
+        call_id = event.call_id
+
+        span = self._tracer.start_span(
+            f"{SpanName.FUNCTION_CALL} {function_name}",
+            context=ctx,
+            kind=SpanKind.INTERNAL,
+        )
+        self._spans[call_id] = span
+
+        span.set_attribute(GEN_AI_OPERATION_NAME, EXECUTE_TOOL)
+        span.set_attribute(GEN_AI_PROVIDER_NAME, self.provider_name)
+        span.set_attribute(GEN_AI_TOOL_CALL_ID, call_id)
+        span.set_attribute(GEN_AI_TOOL_NAME, function_name)
+        span.set_attribute(GEN_AI_TOOL_TYPE, "function")
+        return set_span_in_context(span)
+
+    def _handle_conversation_item_added(
+        self, event: ConversationItemAdded
+    ) -> Context:
+        if isinstance(event.item, RealtimeConversationItemFunctionCallOutput):
+            self._end_span(event.item.call_id)
+        return self._context_for()
+
+    def _handle_error(self, event: RealtimeErrorEvent) -> Context:
+        error = event.error
+        logger.error(
+            "Realtime API error: [%s] %s (code=%s, event_id=%s)",
+            error.type,
+            error.message,
+            error.code,
+            error.event_id,
+        )
+        span = self._spans.get("session")
+        if span:
+            span.set_status(StatusCode.ERROR, error.message)
+            span.set_attribute(ERROR_TYPE, error.type or _UNKNOWN)
+            if error.event_id is not None:
+                span.set_attribute("gen_ai.error.event_id", error.event_id)
+        return self._context_for()
+
+    def _record_token_usage_metric(self, usage: RealtimeResponseUsage) -> None:
+        input_tokens = usage.input_tokens
+        output_tokens = usage.output_tokens
+
+        if input_tokens is None and usage.input_token_details:
+            input_tokens = (usage.input_token_details.audio_tokens or 0) + (
+                usage.input_token_details.text_tokens or 0
+            )
+        if output_tokens is None and usage.output_token_details:
+            output_tokens = (usage.output_token_details.audio_tokens or 0) + (
+                usage.output_token_details.text_tokens or 0
+            )
+
+        base_attrs: dict[str, Any] = {
+            GEN_AI_OPERATION_NAME: GENERATE_CONTENT,
+            GEN_AI_PROVIDER_NAME: self.provider_name,
+        }
+        if self._server_address is not None:
+            base_attrs[SERVER_ADDRESS] = self._server_address
+        if self._server_port is not None:
+            base_attrs[SERVER_PORT] = self._server_port
+        if self._model:
+            base_attrs[GEN_AI_REQUEST_MODEL] = self._model
+            base_attrs[GEN_AI_RESPONSE_MODEL] = self._model
+
+        if input_tokens is not None:
+            self._token_usage_histogram.record(
+                input_tokens, {**base_attrs, GEN_AI_TOKEN_TYPE: "input"}
+            )
+        if output_tokens is not None:
+            self._token_usage_histogram.record(
+                output_tokens, {**base_attrs, GEN_AI_TOKEN_TYPE: "output"}
+            )
+
+    def _maybe_record_ttft(self, response_id: str) -> None:
+        """Record time-to-first-token once per response on the first content delta."""
+        if response_id in self._first_token_recorded:
+            return
+        start_time = self._response_start_times.get(response_id)
+        if start_time is None:
+            return
+        self._first_token_recorded.add(response_id)
+        ttft = time.monotonic() - start_time
+        attrs: dict[str, str] = {
+            GEN_AI_OPERATION_NAME: "realtime_session",
+            GEN_AI_PROVIDER_NAME: self.provider_name,
+        }
+        if self._model:
+            attrs[GEN_AI_REQUEST_MODEL] = self._model
+            attrs[GEN_AI_RESPONSE_MODEL] = self._model
+        self._ttft_histogram.record(ttft, attrs)
+
+
+def _extract_token_attributes(usage: RealtimeResponseUsage) -> dict[str, Any]:
+    """Extract token usage attributes for span recording."""
+    attrs: dict[str, Any] = {}
+
+    if usage.input_tokens is not None:
+        attrs[GEN_AI_USAGE_INPUT_TOKENS] = usage.input_tokens
+    elif usage.input_token_details:
+        attrs[GEN_AI_USAGE_INPUT_TOKENS] = (
+            usage.input_token_details.audio_tokens or 0
+        ) + (usage.input_token_details.text_tokens or 0)
+
+    if usage.output_tokens is not None:
+        attrs[GEN_AI_USAGE_OUTPUT_TOKENS] = usage.output_tokens
+    elif usage.output_token_details:
+        attrs[GEN_AI_USAGE_OUTPUT_TOKENS] = (
+            usage.output_token_details.audio_tokens or 0
+        ) + (usage.output_token_details.text_tokens or 0)
+
+    return attrs
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/span_processor.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/span_processor.py
index 55539a4931..0a04c5cb1f 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/span_processor.py
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/span_processor.py
@@ -55,6 +55,52 @@
 )  # type: ignore[assignment]
 
 from opentelemetry.context import attach, detach
+from opentelemetry.instrumentation.openai_agents._constants import (
+    GEN_AI_AGENT_DESCRIPTION,
+    GEN_AI_AGENT_ID,
+    GEN_AI_AGENT_NAME,
+    GEN_AI_CONVERSATION_ID,
+    GEN_AI_DATA_SOURCE_ID,
+    GEN_AI_EMBEDDINGS_DIMENSION_COUNT,
+    GEN_AI_GUARDRAIL_NAME,
+    GEN_AI_GUARDRAIL_TRIGGERED,
+    GEN_AI_HANDOFF_FROM_AGENT,
+    GEN_AI_HANDOFF_TO_AGENT,
+    GEN_AI_INPUT_MESSAGES,
+    GEN_AI_OPERATION_NAME,
+    GEN_AI_ORCHESTRATOR_AGENT_DEFINITIONS,
+    GEN_AI_OUTPUT_MESSAGES,
+    GEN_AI_OUTPUT_TYPE,
+    GEN_AI_PROVIDER_NAME,
+    GEN_AI_REQUEST_CHOICE_COUNT,
+    GEN_AI_REQUEST_ENCODING_FORMATS,
+    GEN_AI_REQUEST_FREQUENCY_PENALTY,
+    GEN_AI_REQUEST_MAX_TOKENS,
+    GEN_AI_REQUEST_MODEL,
+    GEN_AI_REQUEST_PRESENCE_PENALTY,
+    GEN_AI_REQUEST_SEED,
+    GEN_AI_REQUEST_STOP_SEQUENCES,
+    GEN_AI_REQUEST_TEMPERATURE,
+    GEN_AI_REQUEST_TOP_K,
+    GEN_AI_REQUEST_TOP_P,
+    GEN_AI_RESPONSE_FINISH_REASONS,
+    GEN_AI_RESPONSE_ID,
+    GEN_AI_RESPONSE_MODEL,
+    GEN_AI_SYSTEM_INSTRUCTIONS,
+    GEN_AI_SYSTEM_KEY,
+    GEN_AI_TOKEN_TYPE,
+    GEN_AI_TOOL_CALL_ARGUMENTS,
+    GEN_AI_TOOL_CALL_ID,
+    GEN_AI_TOOL_CALL_RESULT,
+    GEN_AI_TOOL_DEFINITIONS,
+    GEN_AI_TOOL_DESCRIPTION,
+    GEN_AI_TOOL_NAME,
+    GEN_AI_TOOL_TYPE,
+    GEN_AI_USAGE_INPUT_TOKENS,
+    GEN_AI_USAGE_OUTPUT_TOKENS,
+    METER_NAME,
+    METER_VERSION,
+)
 from opentelemetry.metrics import Histogram, get_meter
 from opentelemetry.semconv._incubating.attributes import (
     gen_ai_attributes as GenAIAttributes,
@@ -72,8 +118,7 @@
 )
 from opentelemetry.util.types import AttributeValue
 
-# Import all semantic convention constants
-# ---- GenAI semantic convention helpers (embedded from constants.py) ----
+# ---- GenAI semantic convention helpers ----
 
 
 def _enum_values(enum_cls) -> dict[str, str]:
@@ -155,93 +200,6 @@ class GenAIEvaluationAttributes:
     EXPLANATION = "gen_ai.evaluation.explanation"
 
 
-def _attr(name: str, fallback: str) -> str:
-    return getattr(GenAIAttributes, name, fallback)
-
-
-GEN_AI_PROVIDER_NAME = _attr("GEN_AI_PROVIDER_NAME", "gen_ai.provider.name")
-GEN_AI_OPERATION_NAME = _attr("GEN_AI_OPERATION_NAME", "gen_ai.operation.name")
-GEN_AI_REQUEST_MODEL = _attr("GEN_AI_REQUEST_MODEL", "gen_ai.request.model")
-GEN_AI_REQUEST_MAX_TOKENS = _attr(
-    "GEN_AI_REQUEST_MAX_TOKENS", "gen_ai.request.max_tokens"
-)
-GEN_AI_REQUEST_TEMPERATURE = _attr(
-    "GEN_AI_REQUEST_TEMPERATURE", "gen_ai.request.temperature"
-)
-GEN_AI_REQUEST_TOP_P = _attr("GEN_AI_REQUEST_TOP_P", "gen_ai.request.top_p")
-GEN_AI_REQUEST_TOP_K = _attr("GEN_AI_REQUEST_TOP_K", "gen_ai.request.top_k")
-GEN_AI_REQUEST_FREQUENCY_PENALTY = _attr(
-    "GEN_AI_REQUEST_FREQUENCY_PENALTY", "gen_ai.request.frequency_penalty"
-)
-GEN_AI_REQUEST_PRESENCE_PENALTY = _attr(
-    "GEN_AI_REQUEST_PRESENCE_PENALTY", "gen_ai.request.presence_penalty"
-)
-GEN_AI_REQUEST_CHOICE_COUNT = _attr(
-    "GEN_AI_REQUEST_CHOICE_COUNT", "gen_ai.request.choice.count"
-)
-GEN_AI_REQUEST_STOP_SEQUENCES = _attr(
-    "GEN_AI_REQUEST_STOP_SEQUENCES", "gen_ai.request.stop_sequences"
-)
-GEN_AI_REQUEST_ENCODING_FORMATS = _attr(
-    "GEN_AI_REQUEST_ENCODING_FORMATS", "gen_ai.request.encoding_formats"
-)
-GEN_AI_REQUEST_SEED = _attr("GEN_AI_REQUEST_SEED", "gen_ai.request.seed")
-GEN_AI_RESPONSE_ID = _attr("GEN_AI_RESPONSE_ID", "gen_ai.response.id")
-GEN_AI_RESPONSE_MODEL = _attr("GEN_AI_RESPONSE_MODEL", "gen_ai.response.model")
-GEN_AI_RESPONSE_FINISH_REASONS = _attr(
-    "GEN_AI_RESPONSE_FINISH_REASONS", "gen_ai.response.finish_reasons"
-)
-GEN_AI_USAGE_INPUT_TOKENS = _attr(
-    "GEN_AI_USAGE_INPUT_TOKENS", "gen_ai.usage.input_tokens"
-)
-GEN_AI_USAGE_OUTPUT_TOKENS = _attr(
-    "GEN_AI_USAGE_OUTPUT_TOKENS", "gen_ai.usage.output_tokens"
-)
-GEN_AI_CONVERSATION_ID = _attr(
-    "GEN_AI_CONVERSATION_ID", "gen_ai.conversation.id"
-)
-GEN_AI_AGENT_ID = _attr("GEN_AI_AGENT_ID", "gen_ai.agent.id")
-GEN_AI_AGENT_NAME = _attr("GEN_AI_AGENT_NAME", "gen_ai.agent.name")
-GEN_AI_AGENT_DESCRIPTION = _attr(
-    "GEN_AI_AGENT_DESCRIPTION", "gen_ai.agent.description"
-)
-GEN_AI_TOOL_NAME = _attr("GEN_AI_TOOL_NAME", "gen_ai.tool.name")
-GEN_AI_TOOL_TYPE = _attr("GEN_AI_TOOL_TYPE", "gen_ai.tool.type")
-GEN_AI_TOOL_CALL_ID = _attr("GEN_AI_TOOL_CALL_ID", "gen_ai.tool.call.id")
-GEN_AI_TOOL_DESCRIPTION = _attr(
-    "GEN_AI_TOOL_DESCRIPTION", "gen_ai.tool.description"
-)
-GEN_AI_OUTPUT_TYPE = _attr("GEN_AI_OUTPUT_TYPE", "gen_ai.output.type")
-GEN_AI_SYSTEM_INSTRUCTIONS = _attr(
-    "GEN_AI_SYSTEM_INSTRUCTIONS", "gen_ai.system_instructions"
-)
-GEN_AI_INPUT_MESSAGES = _attr("GEN_AI_INPUT_MESSAGES", "gen_ai.input.messages")
-GEN_AI_OUTPUT_MESSAGES = _attr(
-    "GEN_AI_OUTPUT_MESSAGES", "gen_ai.output.messages"
-)
-GEN_AI_DATA_SOURCE_ID = _attr("GEN_AI_DATA_SOURCE_ID", "gen_ai.data_source.id")
-
-# The semantic conventions currently expose multiple usage token attributes; we retain the
-# completion/prompt aliases for backwards compatibility where used.
-GEN_AI_USAGE_PROMPT_TOKENS = _attr(
-    "GEN_AI_USAGE_PROMPT_TOKENS", "gen_ai.usage.prompt_tokens"
-)
-GEN_AI_USAGE_COMPLETION_TOKENS = _attr(
-    "GEN_AI_USAGE_COMPLETION_TOKENS", "gen_ai.usage.completion_tokens"
-)
-
-# Attributes not (yet) defined in the spec retain their literal values.
-GEN_AI_TOOL_CALL_ARGUMENTS = "gen_ai.tool.call.arguments"
-GEN_AI_TOOL_CALL_RESULT = "gen_ai.tool.call.result"
-GEN_AI_TOOL_DEFINITIONS = "gen_ai.tool.definitions"
-GEN_AI_ORCHESTRATOR_AGENT_DEFINITIONS = "gen_ai.orchestrator.agent.definitions"
-GEN_AI_GUARDRAIL_NAME = "gen_ai.guardrail.name"
-GEN_AI_GUARDRAIL_TRIGGERED = "gen_ai.guardrail.triggered"
-GEN_AI_HANDOFF_FROM_AGENT = "gen_ai.handoff.from_agent"
-GEN_AI_HANDOFF_TO_AGENT = "gen_ai.handoff.to_agent"
-GEN_AI_EMBEDDINGS_DIMENSION_COUNT = "gen_ai.embeddings.dimension.count"
-GEN_AI_TOKEN_TYPE = _attr("GEN_AI_TOKEN_TYPE", "gen_ai.token.type")
-
 # ---- Normalization utilities (embedded from utils.py) ----
@@ -301,8 +259,6 @@ def normalize_output_type(output_type: Optional[str]) -> str:
 
 logger = logging.getLogger(__name__)
 
-GEN_AI_SYSTEM_KEY = getattr(GenAIAttributes, "GEN_AI_SYSTEM", "gen_ai.system")
-
 
 class ContentCaptureMode(Enum):
     """Controls whether sensitive content is recorded on spans, events, or both."""
@@ -531,9 +487,7 @@ def _get_server_attributes(self) -> dict[str, Any]:
 
     def _init_metrics(self):
         """Initialize metric instruments."""
-        self._meter = get_meter(
-            "opentelemetry.instrumentation.openai_agents", "0.1.0"
-        )
+        self._meter = get_meter(METER_NAME, METER_VERSION)
 
         # Operation duration histogram
        self._duration_histogram = self._meter.create_histogram(
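
For reference, end-to-end wiring as a caller would see it. This is a sketch: it assumes the package exposes an OpenAIAgentsInstrumentor entry point (the class name is not shown in this diff) and uses the standard SDK console exporter.

# Wiring sketch; OpenAIAgentsInstrumentor is an assumed name, not confirmed here.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

from opentelemetry.instrumentation.openai_agents import OpenAIAgentsInstrumentor

provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

# _instrument() patches RealtimeSession.__init__/close and
# OpenAIRealtimeWebSocketModel._emit_event when agents.realtime is importable;
# realtime spans and metrics then flow with no further application changes.
OpenAIAgentsInstrumentor().instrument()
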