diff --git a/py/plugins/firebase/src/genkit/plugins/firebase/__init__.py b/py/plugins/firebase/src/genkit/plugins/firebase/__init__.py index 2cd52a4ff6..d4f13ddeb6 100644 --- a/py/plugins/firebase/src/genkit/plugins/firebase/__init__.py +++ b/py/plugins/firebase/src/genkit/plugins/firebase/__init__.py @@ -17,6 +17,8 @@ """Firebase Plugin for Genkit.""" +from genkit.plugins.google_cloud.telemetry.tracing import add_gcp_telemetry + def package_name() -> str: """Get the package name for the Firebase plugin. @@ -27,4 +29,13 @@ def package_name() -> str: return 'genkit.plugins.firebase' -__all__ = ['package_name'] +def add_firebase_telemetry() -> None: + """Add Firebase telemetry export to Google Cloud Observability. + + Exports traces to Cloud Trace and metrics to Cloud Monitoring. + In development (GENKIT_ENV=dev), telemetry is disabled by default. + """ + add_gcp_telemetry(force_export=False) + + +__all__ = ['package_name', 'add_firebase_telemetry'] diff --git a/py/plugins/firebase/src/genkit/plugins/firebase/tests/test_telemetry.py b/py/plugins/firebase/src/genkit/plugins/firebase/tests/test_telemetry.py new file mode 100644 index 0000000000..d5e9955ca7 --- /dev/null +++ b/py/plugins/firebase/src/genkit/plugins/firebase/tests/test_telemetry.py @@ -0,0 +1,113 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for Firebase telemetry functionality.""" + +from unittest.mock import MagicMock, patch + +from opentelemetry.sdk.trace import ReadableSpan + +from genkit.plugins.firebase import add_firebase_telemetry +from genkit.plugins.google_cloud.telemetry.metrics import record_generate_metrics + + +def _create_model_span( + model_name: str = "gemini-pro", + path: str = "/{myflow,t:flow}", + output: str = '{"usage": {"inputTokens": 100, "outputTokens": 50}}', + is_ok: bool = True, + start_time: int = 1000000000, + end_time: int = 1500000000, +) -> MagicMock: + """Helper function to create a model action span for testing. + + Args: + model_name: The model name for genkit:name attribute + path: The genkit:path value + output: The genkit:output JSON string + is_ok: Whether the span status is ok + start_time: Span start time in nanoseconds + end_time: Span end time in nanoseconds + + Returns: + A mocked ReadableSpan with model action attributes + """ + mock_span = MagicMock(spec=ReadableSpan) + mock_span.attributes = { + "genkit:type": "action", + "genkit:metadata:subtype": "model", + "genkit:name": model_name, + "genkit:path": path, + "genkit:output": output, + } + mock_span.status.is_ok = is_ok + mock_span.start_time = start_time + mock_span.end_time = end_time + return mock_span + + +@patch("genkit.plugins.firebase.add_gcp_telemetry") +def test_firebase_telemetry_delegates_to_gcp(mock_add_gcp_telemetry): + """Test that Firebase telemetry delegates to GCP telemetry.""" + add_firebase_telemetry() + mock_add_gcp_telemetry.assert_called_once_with(force_export=False) + + +@patch("genkit.plugins.google_cloud.telemetry.metrics._output_tokens") +@patch("genkit.plugins.google_cloud.telemetry.metrics._input_tokens") +@patch("genkit.plugins.google_cloud.telemetry.metrics._latency") +@patch("genkit.plugins.google_cloud.telemetry.metrics._failures") +@patch("genkit.plugins.google_cloud.telemetry.metrics._requests") +def test_record_generate_metrics_with_model_action( + mock_requests, + mock_failures, + mock_latency, + mock_input_tokens, + mock_output_tokens, +): + """Test that metrics are recorded for model action spans with usage data.""" + # Setup mocks + mock_request_counter = MagicMock() + mock_latency_histogram = MagicMock() + mock_input_counter = MagicMock() + mock_output_counter = MagicMock() + + mock_requests.return_value = mock_request_counter + mock_failures.return_value = MagicMock() + mock_latency.return_value = mock_latency_histogram + mock_input_tokens.return_value = mock_input_counter + mock_output_tokens.return_value = mock_output_counter + + # Create test span using helper + mock_span = _create_model_span( + model_name="gemini-pro", + path="/{myflow,t:flow}", + output='{"usage": {"inputTokens": 100, "outputTokens": 50}}', + ) + + # Execute + record_generate_metrics(mock_span) + + # Verify dimensions + expected_dimensions = {"model": "gemini-pro", "source": "myflow", "error": "none"} + + # Verify requests counter + mock_request_counter.add.assert_called_once_with(1, expected_dimensions) + + # Verify latency (500ms = 1.5s - 1.0s) + mock_latency_histogram.record.assert_called_once_with(500.0, expected_dimensions) + + # Verify token counts + mock_input_counter.add.assert_called_once_with(100, expected_dimensions) + mock_output_counter.add.assert_called_once_with(50, expected_dimensions) diff --git a/py/plugins/google-cloud/pyproject.toml b/py/plugins/google-cloud/pyproject.toml index a4ce51d04e..55f6fd3ca1 100644 --- a/py/plugins/google-cloud/pyproject.toml +++ b/py/plugins/google-cloud/pyproject.toml @@ -19,6 +19,7 @@ classifiers = [ dependencies = [ "genkit", "opentelemetry-exporter-gcp-trace>=1.9.0", + "opentelemetry-exporter-gcp-monitoring>=1.9.0", "strenum>=0.4.15; python_version < '3.11'", ] description = "Genkit Google Cloud Plugin" diff --git a/py/plugins/google-cloud/src/genkit/plugins/google_cloud/__init__.py b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/__init__.py index 8db5b68e81..6cd822e5ae 100644 --- a/py/plugins/google-cloud/src/genkit/plugins/google_cloud/__init__.py +++ b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/__init__.py @@ -17,6 +17,8 @@ """Google Cloud Plugin for Genkit.""" +from .telemetry import add_gcp_telemetry + def package_name() -> str: """Get the package name for the Google Cloud plugin. @@ -27,4 +29,4 @@ def package_name() -> str: return 'genkit.plugins.google_cloud' -__all__ = ['package_name'] +__all__ = ['package_name', 'add_gcp_telemetry'] diff --git a/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/__init__.py b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/__init__.py new file mode 100644 index 0000000000..75034428d7 --- /dev/null +++ b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/__init__.py @@ -0,0 +1,21 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Telemetry exports for Google Cloud plugin.""" + +from .tracing import add_gcp_telemetry + +__all__ = ['add_gcp_telemetry'] diff --git a/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/metrics.py b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/metrics.py new file mode 100644 index 0000000000..16307097dc --- /dev/null +++ b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/metrics.py @@ -0,0 +1,220 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +"""AI monitoring metrics for Genkit.""" + +import re + +import structlog +from opentelemetry import metrics +from opentelemetry.sdk.trace import ReadableSpan + +logger = structlog.get_logger(__name__) + +meter = metrics.get_meter('genkit') + + +def _metric(name: str, desc: str, unit: str = '1') -> tuple[str, str, str]: + """Create metric name with genkit/ai/ prefix. + + Args: + name: Metric name + desc: Metric description + unit: Metric unit (default: '1') + + Returns: + Tuple of (prefixed_name, description, unit) + """ + return f'genkit/ai/{name}', desc, unit + + +# Metric cache for lazy initialization +_metrics_cache: dict[str, metrics.Counter | metrics.Histogram] = {} + + +def _get_counter(name: str, desc: str, unit: str = '1') -> metrics.Counter: + """Get or create counter metric with lazy initialization. + + Args: + name: Metric name + desc: Metric description + unit: Metric unit (default: '1') + + Returns: + OpenTelemetry Counter metric + """ + if name not in _metrics_cache: + _metrics_cache[name] = meter.create_counter(name, description=desc, unit=unit) + return _metrics_cache[name] + + +def _get_histogram(name: str, desc: str, unit: str = '1') -> metrics.Histogram: + """Get or create histogram metric with lazy initialization. + + Args: + name: Metric name + desc: Metric description + unit: Metric unit (default: '1') + + Returns: + OpenTelemetry Histogram metric + """ + if name not in _metrics_cache: + _metrics_cache[name] = meter.create_histogram(name, description=desc, unit=unit) + return _metrics_cache[name] + + +# Metric definitions +def _requests() -> metrics.Counter: + return _get_counter(*_metric('generate/requests', 'Generate requests')) + + +def _failures() -> metrics.Counter: + return _get_counter(*_metric('generate/failures', 'Generate failures')) + + +def _latency() -> metrics.Histogram: + return _get_histogram(*_metric('generate/latency', 'Generate latency', 'ms')) + + +def _input_tokens() -> metrics.Counter: + return _get_counter(*_metric('generate/input/tokens', 'Input tokens')) + + +def _output_tokens() -> metrics.Counter: + return _get_counter(*_metric('generate/output/tokens', 'Output tokens')) + + +def _input_characters() -> metrics.Counter: + return _get_counter(*_metric('generate/input/characters', 'Input characters')) + + +def _output_characters() -> metrics.Counter: + return _get_counter(*_metric('generate/output/characters', 'Output characters')) + + +def _input_images() -> metrics.Counter: + return _get_counter(*_metric('generate/input/images', 'Input images')) + + +def _output_images() -> metrics.Counter: + return _get_counter(*_metric('generate/output/images', 'Output images')) + + +def _input_videos() -> metrics.Counter: + return _get_counter(*_metric('generate/input/videos', 'Input videos')) + + +def _output_videos() -> metrics.Counter: + return _get_counter(*_metric('generate/output/videos', 'Output videos')) + + +def _input_audio() -> metrics.Counter: + return _get_counter(*_metric('generate/input/audio', 'Input audio')) + + +def _output_audio() -> metrics.Counter: + return _get_counter(*_metric('generate/output/audio', 'Output audio')) + + +def record_generate_metrics(span: ReadableSpan) -> None: + """Record AI monitoring metrics from a model action span. + + Args: + span: OpenTelemetry span containing model execution data + """ + import json + + attrs = span.attributes + if not attrs: + return + + # Check if this is a model action + if attrs.get('genkit:type') != 'action' or attrs.get('genkit:metadata:subtype') != 'model': + return + + # Extract dimensions + model = str(attrs.get('genkit:name', ''))[:1000] + path = str(attrs.get('genkit:path', ''))[:1000] + source = _extract_feature_name(path) + is_error = not span.status.is_ok + error = 'error' if is_error else 'none' + + dimensions = {'model': model, 'source': source, 'error': error} + + try: + _requests().add(1, dimensions) + if is_error: + _failures().add(1, dimensions) + + # Latency + latency_ms = None + if span.end_time and span.start_time: + latency_ms = (span.end_time - span.start_time) / 1_000_000 + _latency().record(latency_ms, dimensions) + + usage = {} + output_json = attrs.get('genkit:output') + if output_json and isinstance(output_json, str): + try: + output_data = json.loads(output_json) + usage = output_data.get('usage', {}) + except (json.JSONDecodeError, AttributeError): + pass + + usage_metrics = { + 'inputTokens': _input_tokens, + 'outputTokens': _output_tokens, + 'inputCharacters': _input_characters, + 'outputCharacters': _output_characters, + 'inputImages': _input_images, + 'outputImages': _output_images, + 'inputVideos': _input_videos, + 'outputVideos': _output_videos, + 'inputAudio': _input_audio, + 'outputAudio': _output_audio, + } + + for key, metric_fn in usage_metrics.items(): + value = usage.get(key) + if value is not None: + try: + metric_fn().add(int(value), dimensions) + except (ValueError, TypeError): + pass + + except Exception as e: + logger.warning('Error recording metrics', error=str(e)) + + +def _extract_feature_name(path: str) -> str: + """Extract feature name from Genkit action path. + + Args: + path: Genkit action path in format '/{name,t:type}' or '/{outer,t:flow}/{inner,t:flow}' + + Returns: + Extracted feature name or '' if path cannot be parsed + """ + if not path: + return '' + + parts = path.split('/') + if len(parts) < 2: + return '' + + match = re.match(r'\{([^,}]+)', parts[1]) + return match.group(1) if match else '' diff --git a/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/tracing.py b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/tracing.py index 3fdd91c717..75532be278 100644 --- a/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/tracing.py +++ b/py/plugins/google-cloud/src/genkit/plugins/google_cloud/telemetry/tracing.py @@ -25,19 +25,30 @@ - A custom span exporter for sending trace data to a telemetry GCP server """ +import logging +import uuid from collections.abc import Sequence import structlog from google.api_core import exceptions as core_exceptions, retry as retries from google.cloud.trace_v2 import BatchWriteSpansRequest +from opentelemetry import metrics +from opentelemetry.exporter.cloud_monitoring import CloudMonitoringMetricsExporter from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter -from opentelemetry.sdk.trace import ReadableSpan -from opentelemetry.sdk.trace.export import ( - SpanExportResult, +from opentelemetry.resourcedetector.gcp_resource_detector import ( + GoogleCloudResourceDetector, ) +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.resources import SERVICE_INSTANCE_ID, SERVICE_NAME, Resource +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExportResult +from genkit.core.environment import is_dev_environment from genkit.core.tracing import add_custom_exporter +from .metrics import record_generate_metrics + logger = structlog.get_logger(__name__) @@ -67,6 +78,9 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: server-side success). """ try: + for span in spans: + record_generate_metrics(span) + self.client.batch_write_spans( request=BatchWriteSpansRequest( name=f'projects/{self.project_id}', @@ -82,37 +96,45 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: deadline=120.0, ), ) - # pylint: disable=broad-except except Exception as ex: logger.error('Error while writing to Cloud Trace', exc_info=ex) return SpanExportResult.FAILURE return SpanExportResult.SUCCESS - def add_tracer_attributes(self, spans: Sequence[ReadableSpan]) -> Sequence[ReadableSpan]: - """Adds the instrumentation library attribute. - Args: - spans: Sequence of spans to modify. +def add_gcp_telemetry(force_export: bool = True) -> None: + """Configure GCP telemetry export for traces and metrics. - Returns: - Sequence of spans modified. - """ - modified_spans: list[ReadableSpan] = [] - - for span in spans: - modified_spans.append( - span.attributes.update({ - 'instrumentationLibrary': { - 'name': 'genkit-tracer', - 'version': 'v1', - }, - }) - ) + Args: + force_export: Export regardless of environment. Defaults to True. + """ + should_export = force_export or not is_dev_environment() + if not should_export: + return + + add_custom_exporter(GenkitGCPExporter(), 'gcp_telemetry_server') - return modified_spans + try: + resource = Resource.create({ + SERVICE_NAME: 'genkit', + SERVICE_INSTANCE_ID: str(uuid.uuid4()), + }) + # Suppress detector warnings during GCP resource detection + detector_logger = logging.getLogger('opentelemetry.resourcedetector.gcp_resource_detector') + original_level = detector_logger.level + detector_logger.setLevel(logging.ERROR) -def add_gcp_telemetry() -> None: - """Inits and adds GCP telemetry exporter.""" - add_custom_exporter(GenkitGCPExporter(), 'gcp_telemetry_server') + try: + resource = resource.merge(GoogleCloudResourceDetector().detect()) + finally: + detector_logger.setLevel(original_level) + + metric_reader = PeriodicExportingMetricReader( + exporter=CloudMonitoringMetricsExporter(), + export_interval_millis=60000, + ) + metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader], resource=resource)) + except Exception as e: + logger.error('Failed to configure metrics exporter', error=str(e)) diff --git a/py/samples/firestore-retreiver/src/main.py b/py/samples/firestore-retreiver/src/main.py index bce1f3e13f..4df39b3a21 100644 --- a/py/samples/firestore-retreiver/src/main.py +++ b/py/samples/firestore-retreiver/src/main.py @@ -19,6 +19,7 @@ from google.cloud.firestore_v1.base_vector_query import DistanceMeasure from genkit.ai import Genkit +from genkit.plugins.firebase import add_firebase_telemetry from genkit.plugins.firebase.firestore import ( FirestoreVectorStore, firestore_action_name, @@ -32,6 +33,9 @@ # Important: use the same embedding model for indexing and retrieval. EMBEDDING_MODEL = 'vertexai/text-embedding-004' +# Add Firebase telemetry (metrics, logs, traces) +add_firebase_telemetry() + firestore_client = firestore.Client() ai = Genkit( diff --git a/py/samples/google-genai-hello/src/google_genai_hello.py b/py/samples/google-genai-hello/src/google_genai_hello.py index 89cd25bdb4..2d1a05cfd7 100644 --- a/py/samples/google-genai-hello/src/google_genai_hello.py +++ b/py/samples/google-genai-hello/src/google_genai_hello.py @@ -38,10 +38,12 @@ | Pydantic for Structured Output Schema | `RpgCharacter` | | Unconstrained Structured Output | `generate_character_unconstrained` | | Multi-modal Output Configuration | `generate_images` | -| GCP otel tracing | `add_gcp_telemetry()` | +| GCP Telemetry (Traces and Metrics) | `add_gcp_telemetry()` | """ +import os + import structlog from pydantic import BaseModel, Field @@ -52,9 +54,7 @@ MetricConfig, PluginOptions, ) -from genkit.plugins.google_cloud.telemetry.tracing import ( - add_gcp_telemetry, -) +from genkit.plugins.google_cloud import add_gcp_telemetry from genkit.plugins.google_genai import ( EmbeddingTaskType, GeminiConfigSchema, @@ -185,6 +185,14 @@ async def say_hi(name: str): resp = await ai.generate( prompt=f'hi {name}', ) + + await logger.ainfo( + 'generation_response', + has_usage=hasattr(resp, 'usage'), + usage_dict=resp.usage.model_dump() if hasattr(resp, 'usage') and resp.usage else None, + text_length=len(resp.text), + ) + return resp.text