From 5c760ed6e684dc4048ac5d5943683fdac18cfba0 Mon Sep 17 00:00:00 2001 From: Narendranath Gogineni Date: Tue, 21 Jan 2025 14:14:22 +0530 Subject: [PATCH] feat: auto-instrumentation initial commit for instrumentation using monkey patching and open telemetry formatting and linting Do not install dependencies for opentelemetry by default version locking make changes for async client feat: instrumentation for litellm and openai revert log changes langgraph instrumentation remove openai instrumentation fix missing import formatting minor changes minor fixes fixes per comments --- Makefile | 5 +- portkey_ai/api_resources/apis/logger.py | 5 +- portkey_ai/api_resources/base_client.py | 4 ++ portkey_ai/api_resources/client.py | 32 +++++++++ .../api_resources/instrumentation/__init__.py | 29 ++++++++ .../instrumentation/crewai/__init__.py | 3 + .../instrumentation/crewai/instrumentation.py | 68 ++++++++++++++++++ .../instrumentation/crewai/patch.py | 50 +++++++++++++ .../instrumentation/langgraph/__init__.py | 5 ++ .../langgraph/instrumentation.py | 52 ++++++++++++++ .../instrumentation/langgraph/patch.py | 46 ++++++++++++ .../instrumentation/litellm/__init__.py | 5 ++ .../litellm/instrumentation.py | 59 ++++++++++++++++ .../instrumentation/litellm/patch.py | 57 +++++++++++++++ .../instrumentation/openai/__init__.py | 5 ++ .../instrumentation/openai/instrumentation.py | 59 ++++++++++++++++ .../instrumentation/openai/patch.py | 44 ++++++++++++ .../instrumentation/portkey_span_exporter.py | 70 +++++++++++++++++++ .../api_resources/instrumentation/utils.py | 38 ++++++++++ portkey_ai/api_resources/utils.py | 1 + portkey_ai/utils/__init__.py | 4 ++ portkey_ai/utils/hashing_utils.py | 8 +++ portkey_ai/utils/json_utils.py | 33 +++++++++ setup.cfg | 4 ++ 24 files changed, 682 insertions(+), 4 deletions(-) create mode 100644 portkey_ai/api_resources/instrumentation/__init__.py create mode 100644 portkey_ai/api_resources/instrumentation/crewai/__init__.py create mode 100644 portkey_ai/api_resources/instrumentation/crewai/instrumentation.py create mode 100644 portkey_ai/api_resources/instrumentation/crewai/patch.py create mode 100644 portkey_ai/api_resources/instrumentation/langgraph/__init__.py create mode 100644 portkey_ai/api_resources/instrumentation/langgraph/instrumentation.py create mode 100644 portkey_ai/api_resources/instrumentation/langgraph/patch.py create mode 100644 portkey_ai/api_resources/instrumentation/litellm/__init__.py create mode 100644 portkey_ai/api_resources/instrumentation/litellm/instrumentation.py create mode 100644 portkey_ai/api_resources/instrumentation/litellm/patch.py create mode 100644 portkey_ai/api_resources/instrumentation/openai/__init__.py create mode 100644 portkey_ai/api_resources/instrumentation/openai/instrumentation.py create mode 100644 portkey_ai/api_resources/instrumentation/openai/patch.py create mode 100644 portkey_ai/api_resources/instrumentation/portkey_span_exporter.py create mode 100644 portkey_ai/api_resources/instrumentation/utils.py create mode 100644 portkey_ai/utils/__init__.py create mode 100644 portkey_ai/utils/hashing_utils.py create mode 100644 portkey_ai/utils/json_utils.py diff --git a/Makefile b/Makefile index 9a19528d..64c5ffad 100644 --- a/Makefile +++ b/Makefile @@ -35,4 +35,7 @@ langchain_callback: pip install -e ".[langchain_callback]" llama_index_callback: - pip install -e ".[llama_index_callback]" \ No newline at end of file + pip install -e ".[llama_index_callback]" + +instrumentation: + pip install -e ".[opentelemetry]" \ No newline at end of file diff --git a/portkey_ai/api_resources/apis/logger.py b/portkey_ai/api_resources/apis/logger.py index 76c2ff62..859c1740 100644 --- a/portkey_ai/api_resources/apis/logger.py +++ b/portkey_ai/api_resources/apis/logger.py @@ -8,8 +8,7 @@ class Logger: def __init__( - self, - api_key: Optional[str] = None, + self, api_key: Optional[str] = None, base_url: Optional[str] = None ) -> None: api_key = api_key or os.getenv("PORTKEY_API_KEY") if api_key is None: @@ -20,7 +19,7 @@ def __init__( "x-portkey-api-key": api_key, } - self.url = PORTKEY_BASE_URL + "/logs" + self.url = (base_url or PORTKEY_BASE_URL) + "/logs" def log( self, diff --git a/portkey_ai/api_resources/base_client.py b/portkey_ai/api_resources/base_client.py index dd70adc9..8910c68a 100644 --- a/portkey_ai/api_resources/base_client.py +++ b/portkey_ai/api_resources/base_client.py @@ -63,6 +63,7 @@ def __init__( cache_force_refresh: Optional[bool] = None, custom_host: Optional[str] = None, forward_headers: Optional[List[str]] = None, + instrumentation: Optional[bool] = None, openai_project: Optional[str] = None, openai_organization: Optional[str] = None, aws_secret_access_key: Optional[str] = None, @@ -97,6 +98,7 @@ def __init__( self.cache_force_refresh = cache_force_refresh self.custom_host = custom_host self.forward_headers = forward_headers + self.instrumentation = instrumentation or False self.openai_project = openai_project self.openai_organization = openai_organization self.aws_secret_access_key = aws_secret_access_key @@ -709,6 +711,7 @@ def __init__( cache_force_refresh: Optional[bool] = None, custom_host: Optional[str] = None, forward_headers: Optional[List[str]] = None, + instrumentation: Optional[bool] = None, openai_project: Optional[str] = None, openai_organization: Optional[str] = None, aws_secret_access_key: Optional[str] = None, @@ -743,6 +746,7 @@ def __init__( self.cache_force_refresh = cache_force_refresh self.custom_host = custom_host self.forward_headers = forward_headers + self.instrumentation = instrumentation self.openai_project = openai_project self.openai_organization = openai_organization self.aws_secret_access_key = aws_secret_access_key diff --git a/portkey_ai/api_resources/client.py b/portkey_ai/api_resources/client.py index 01c6bc89..69bc0374 100644 --- a/portkey_ai/api_resources/client.py +++ b/portkey_ai/api_resources/client.py @@ -63,6 +63,7 @@ def __init__( cache_force_refresh: Optional[bool] = None, custom_host: Optional[str] = None, forward_headers: Optional[List[str]] = None, + instrumentation: Optional[bool] = None, openai_project: Optional[str] = None, openai_organization: Optional[str] = None, aws_secret_access_key: Optional[str] = None, @@ -98,6 +99,7 @@ def __init__( cache_force_refresh=cache_force_refresh, custom_host=custom_host, forward_headers=forward_headers, + instrumentation=instrumentation, openai_project=openai_project, openai_organization=openai_organization, aws_secret_access_key=aws_secret_access_key, @@ -152,6 +154,18 @@ def __init__( self.logs = apis.Logs(self) self.beta = self.beta(self) # type: ignore + if self.instrumentation: + try: + from portkey_ai.api_resources.instrumentation import ( + initialize_instrumentation, + ) + except ImportError: + raise ImportError( + """Please install opentelemetry for instrumentation, + you can use `make instrumentation` to install the dependencies""" + ) + initialize_instrumentation(api_key=self.api_key, base_url=self.base_url) + def copy( self, *, @@ -168,6 +182,7 @@ def copy( cache_force_refresh: Optional[bool] = None, custom_host: Optional[str] = None, forward_headers: Optional[List[str]] = None, + instrumentation: Optional[bool] = None, openai_project: Optional[str] = None, openai_organization: Optional[str] = None, aws_secret_access_key: Optional[str] = None, @@ -203,6 +218,7 @@ def copy( cache_force_refresh=cache_force_refresh or self.cache_force_refresh, custom_host=custom_host or self.custom_host, forward_headers=forward_headers or self.forward_headers, + instrumentation=instrumentation or self.instrumentation, openai_project=openai_project or self.openai_project, openai_organization=openai_organization or self.openai_organization, aws_secret_access_key=aws_secret_access_key or self.aws_secret_access_key, @@ -287,6 +303,7 @@ def __init__( cache_force_refresh: Optional[bool] = None, custom_host: Optional[str] = None, forward_headers: Optional[List[str]] = None, + instrumentation: Optional[bool] = None, openai_project: Optional[str] = None, openai_organization: Optional[str] = None, aws_secret_access_key: Optional[str] = None, @@ -322,6 +339,7 @@ def __init__( cache_force_refresh=cache_force_refresh, custom_host=custom_host, forward_headers=forward_headers, + instrumentation=instrumentation, openai_project=openai_project, openai_organization=openai_organization, aws_secret_access_key=aws_secret_access_key, @@ -376,6 +394,18 @@ def __init__( self.logs = apis.AsyncLogs(self) self.beta = self.beta(self) # type: ignore + if self.instrumentation: + try: + from portkey_ai.api_resources.instrumentation import ( + initialize_instrumentation, + ) + except ImportError: + raise ImportError( + """Please install opentelemetry for instrumentation, + you can use `make instrumentation` to install the dependencies""" + ) + initialize_instrumentation(api_key=self.api_key, base_url=self.base_url) + def copy( self, *, @@ -392,6 +422,7 @@ def copy( cache_force_refresh: Optional[bool] = None, custom_host: Optional[str] = None, forward_headers: Optional[List[str]] = None, + instrumentation: Optional[bool] = None, openai_project: Optional[str] = None, openai_organization: Optional[str] = None, aws_secret_access_key: Optional[str] = None, @@ -427,6 +458,7 @@ def copy( cache_force_refresh=cache_force_refresh or self.cache_force_refresh, custom_host=custom_host or self.custom_host, forward_headers=forward_headers or self.forward_headers, + instrumentation=instrumentation or self.instrumentation, openai_project=openai_project or self.openai_project, openai_organization=openai_organization or self.openai_organization, aws_secret_access_key=aws_secret_access_key or self.aws_secret_access_key, diff --git a/portkey_ai/api_resources/instrumentation/__init__.py b/portkey_ai/api_resources/instrumentation/__init__.py new file mode 100644 index 00000000..fa88ab94 --- /dev/null +++ b/portkey_ai/api_resources/instrumentation/__init__.py @@ -0,0 +1,29 @@ +import importlib +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore [attr-defined] + +from .crewai import CrewAIInstrumentor +from .litellm import LitellmInstrumentor +from .portkey_span_exporter import PortkeySpanExporter +from .langgraph import LanggraphInstrumentor + +__all__ = ["initialize_instrumentation"] + +package_instrumentor_map: dict[str, BaseInstrumentor] = { + "crewai": CrewAIInstrumentor, + "litellm": LitellmInstrumentor, + "langgraph": LanggraphInstrumentor, +} + + +def initialize_instrumentation(api_key: str, base_url: str): + tracer_provider = TracerProvider() + exporter = PortkeySpanExporter(api_key=api_key, base_url=base_url) + trace.set_tracer_provider(tracer_provider) + tracer_provider.add_span_processor(BatchSpanProcessor(exporter)) + for package, instrumentor in package_instrumentor_map.items(): + if importlib.util.find_spec(package): + instrumentor().instrument() + print(f"Portkey: {package} Instrumentation initialized") diff --git a/portkey_ai/api_resources/instrumentation/crewai/__init__.py b/portkey_ai/api_resources/instrumentation/crewai/__init__.py new file mode 100644 index 00000000..e116f65e --- /dev/null +++ b/portkey_ai/api_resources/instrumentation/crewai/__init__.py @@ -0,0 +1,3 @@ +from .instrumentation import CrewAIInstrumentor + +__all__ = ["CrewAIInstrumentor"] diff --git a/portkey_ai/api_resources/instrumentation/crewai/instrumentation.py b/portkey_ai/api_resources/instrumentation/crewai/instrumentation.py new file mode 100644 index 00000000..fc71e215 --- /dev/null +++ b/portkey_ai/api_resources/instrumentation/crewai/instrumentation.py @@ -0,0 +1,68 @@ +import importlib.metadata +from typing import Any, Collection +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore [attr-defined] +from opentelemetry.trace import get_tracer +from wrapt import wrap_function_wrapper + +from portkey_ai.api_resources.instrumentation.crewai.patch import patch_crew + + +class CrewAIInstrumentor(BaseInstrumentor): + methods_to_patch = [ + { + "module": "crewai.crew", + "method": "Crew.kickoff", + }, + { + "module": "crewai.crew", + "method": "Crew.kickoff_for_each", + }, + { + "module": "crewai.crew", + "method": "Crew.kickoff_async", + }, + { + "module": "crewai.crew", + "method": "Crew.kickoff_for_each_async", + }, + { + "module": "crewai.agent", + "method": "Agent.execute_task", + }, + { + "module": "crewai.task", + "method": "Task.execute_sync", + }, + { + "module": "crewai.memory.storage.rag_storage", + "method": "RAGStorage.save", + }, + { + "module": "crewai.memory.storage.rag_storage", + "method": "RAGStorage.search", + }, + { + "module": "crewai.memory.storage.rag_storage", + "method": "RAGStorage.reset", + }, + ] + + def instrumentation_dependencies(self) -> Collection[str]: + return ["crewai >= 0.32.0"] + + def _instrument(self, **kwargs: Any) -> None: + version = importlib.metadata.version("crewai") + tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer(__name__, "", tracer_provider) + try: + for method in self.methods_to_patch: + wrap_function_wrapper( + module=method["module"], + name=method["method"], + wrapper=patch_crew(method["method"], version, tracer), + ) + except Exception as e: + print(f"Failed to instrument CrewAI: {e}") + + def _uninstrument(self, **kwargs: Any) -> None: + pass diff --git a/portkey_ai/api_resources/instrumentation/crewai/patch.py b/portkey_ai/api_resources/instrumentation/crewai/patch.py new file mode 100644 index 00000000..172f7573 --- /dev/null +++ b/portkey_ai/api_resources/instrumentation/crewai/patch.py @@ -0,0 +1,50 @@ +from opentelemetry import trace +from opentelemetry.trace import SpanKind, Status, StatusCode + +from portkey_ai.utils.json_utils import serialize_args, serialize_kwargs +from portkey_ai.api_resources.instrumentation.utils import ( + set_members, + set_span_attribute, +) + + +def patch_crew(operation_name: str, version: str, tracer: trace.Tracer): + def traced_func(wrapped, instance, args, kwargs): + with tracer.start_as_current_span( + name=operation_name, kind=SpanKind.CLIENT + ) as span: + try: + module_name = instance.__module__ + class_name = instance.__class__.__name__ + + span.set_attribute("_source", "crewai") + span.set_attribute("_source_type", "agent framework") + span.set_attribute("framework.version", version) + span.set_attribute("module", module_name) + span.set_attribute("method", operation_name) + span.set_attribute("args", serialize_args(*args)) + span.set_attribute("kwargs", serialize_kwargs(**kwargs)) + + result = wrapped(*args, **kwargs) + span.set_status(Status(StatusCode.OK)) + + try: + set_members(span, instance, module_name, class_name) + except Exception as e: + span.record_exception(e) + + set_span_attribute(span, "result", result) + + if class_name == "Crew": + for attr in ["tasks_output", "token_usage", "usage_metrics"]: + if hasattr(result, attr): + span.set_attribute( + f"crewai.crew.{attr}", str(getattr(result, attr)) + ) + except Exception as e: + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + raise + return result + + return traced_func diff --git a/portkey_ai/api_resources/instrumentation/langgraph/__init__.py b/portkey_ai/api_resources/instrumentation/langgraph/__init__.py new file mode 100644 index 00000000..c2a63bff --- /dev/null +++ b/portkey_ai/api_resources/instrumentation/langgraph/__init__.py @@ -0,0 +1,5 @@ +from portkey_ai.api_resources.instrumentation.langgraph.instrumentation import ( + LanggraphInstrumentor, +) + +__all__ = ["LanggraphInstrumentor"] diff --git a/portkey_ai/api_resources/instrumentation/langgraph/instrumentation.py b/portkey_ai/api_resources/instrumentation/langgraph/instrumentation.py new file mode 100644 index 00000000..7a2cb811 --- /dev/null +++ b/portkey_ai/api_resources/instrumentation/langgraph/instrumentation.py @@ -0,0 +1,52 @@ +import importlib.metadata +from typing import Any, Collection +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore [attr-defined] +from opentelemetry.trace import get_tracer +from wrapt import wrap_function_wrapper + +from portkey_ai.api_resources.instrumentation.langgraph.patch import patch_langgraph + + +class LanggraphInstrumentor(BaseInstrumentor): + methods_to_patch = [ + { + "module": "langgraph.graph.state", + "method": "StateGraph.add_node", + }, + { + "module": "langgraph.graph.state", + "method": "StateGraph.add_edge", + }, + { + "module": "langgraph.graph.state", + "method": "StateGraph.set_entry_point", + }, + { + "module": "langgraph.graph.state", + "method": "StateGraph.set_finish_point", + }, + { + "module": "langgraph.graph.state", + "method": "StateGraph.add_conditional_edges", + }, + ] + + def instrumentation_dependencies(self) -> Collection[str]: + return ["langgraph >= 0.2.0"] + + def _instrument(self, **kwargs: Any) -> None: + version = importlib.metadata.version("langgraph") + tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer(__name__, "", tracer_provider) + try: + for method in self.methods_to_patch: + wrap_function_wrapper( + module=method["module"], + name=method["method"], + wrapper=patch_langgraph(method["method"], version, tracer), + ) + except Exception as e: + print(f"Failed to instrument Langgraph: {e}") + + def _uninstrument(self, **kwargs: Any) -> None: + pass diff --git a/portkey_ai/api_resources/instrumentation/langgraph/patch.py b/portkey_ai/api_resources/instrumentation/langgraph/patch.py new file mode 100644 index 00000000..b4e2b10b --- /dev/null +++ b/portkey_ai/api_resources/instrumentation/langgraph/patch.py @@ -0,0 +1,46 @@ +from opentelemetry import trace +from opentelemetry.trace import SpanKind, Status, StatusCode + +from portkey_ai.utils.json_utils import serialize_args, serialize_kwargs +from portkey_ai.api_resources.instrumentation.utils import ( + set_members, + set_span_attribute, +) + + +def patch_langgraph(operation_name: str, version: str, tracer: trace.Tracer): + def traced_func(wrapped, instance, args, kwargs): + with tracer.start_as_current_span( + name=operation_name, kind=SpanKind.CLIENT + ) as span: + try: + module_name = instance.__module__ + class_name = instance.__class__.__name__ + + span.set_attribute("_source", "langgraph") + span.set_attribute("framework.version", version) + span.set_attribute("module", module_name) + span.set_attribute("method", operation_name) + span.set_attribute("args", serialize_args(*args)) + span.set_attribute("kwargs", serialize_kwargs(**kwargs)) + + result = wrapped(*args, **kwargs) + if isinstance(result, instance.__class__): + pass + else: + set_span_attribute(span, "result", result) + + span.set_status(Status(StatusCode.OK)) + + try: + set_members(span, instance, module_name, class_name) + except Exception as e: + span.record_exception(e) + + except Exception as e: + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + raise + return result + + return traced_func diff --git a/portkey_ai/api_resources/instrumentation/litellm/__init__.py b/portkey_ai/api_resources/instrumentation/litellm/__init__.py new file mode 100644 index 00000000..8db5a3be --- /dev/null +++ b/portkey_ai/api_resources/instrumentation/litellm/__init__.py @@ -0,0 +1,5 @@ +from portkey_ai.api_resources.instrumentation.litellm.instrumentation import ( + LitellmInstrumentor, +) + +__all__ = ["LitellmInstrumentor"] diff --git a/portkey_ai/api_resources/instrumentation/litellm/instrumentation.py b/portkey_ai/api_resources/instrumentation/litellm/instrumentation.py new file mode 100644 index 00000000..f04f1b77 --- /dev/null +++ b/portkey_ai/api_resources/instrumentation/litellm/instrumentation.py @@ -0,0 +1,59 @@ +import importlib.metadata +from typing import Any, Collection +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore [attr-defined] +from opentelemetry.trace import get_tracer +from wrapt import wrap_function_wrapper + +from portkey_ai.api_resources.instrumentation.litellm.patch import patch_litellm + + +class LitellmInstrumentor(BaseInstrumentor): + methods_to_patch = [ + { + "module": "litellm", + "method": "completion", + }, + { + "module": "litellm", + "method": "text_completion", + }, + { + "module": "litellm.main", + "method": "acompletion", + }, + { + "module": "litellm.main", + "method": "image_generation", + }, + { + "module": "litellm.main", + "method": "aimage_generation", + }, + { + "module": "litellm.main", + "method": "embedding", + }, + { + "module": "litellm.main", + "method": "aembedding", + }, + ] + + def instrumentation_dependencies(self) -> Collection[str]: + return ["litellm >= 1.48.0"] + + def _instrument(self, **kwargs: Any) -> None: + version = importlib.metadata.version("litellm") + tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer(__name__, "", tracer_provider) + for method in self.methods_to_patch: + wrap_function_wrapper( + module=method["module"], + name=method["method"], + wrapper=patch_litellm( + method["module"], method["method"], version, tracer + ), + ) + + def _uninstrument(self, **kwargs: Any) -> None: + pass diff --git a/portkey_ai/api_resources/instrumentation/litellm/patch.py b/portkey_ai/api_resources/instrumentation/litellm/patch.py new file mode 100644 index 00000000..fad4539d --- /dev/null +++ b/portkey_ai/api_resources/instrumentation/litellm/patch.py @@ -0,0 +1,57 @@ +from opentelemetry import trace +from opentelemetry.trace import SpanKind, Status, StatusCode + +from portkey_ai.utils.json_utils import serialize_args, serialize_kwargs +from portkey_ai.api_resources.instrumentation.utils import ( + set_span_attribute, + set_members, +) +from portkey_ai.utils import string_to_uuid + + +def patch_litellm(module: str, operation_name: str, version: str, tracer: trace.Tracer): + def traced_func(wrapped, instance, args, kwargs): + with tracer.start_as_current_span( + name=operation_name, kind=SpanKind.CLIENT + ) as span: + try: + module_name = module + class_name = operation_name + + if operation_name == "completion": + headers = kwargs.get("headers", {}) + headers["x-portkey-trace-id"] = str( + string_to_uuid(span.get_span_context().trace_id) + ) + headers["x-portkey-span-id"] = str(span.get_span_context().span_id) + headers["x-portkey-parent-span-id"] = ( + str(span.parent.span_id) if span.parent else None + ) + headers["x-portkey-span-name"] = span.name + kwargs["headers"] = headers + + span.set_attribute("framework.version", version) + span.set_attribute("_source", "litellm") + span.set_attribute("_source_type", "routing library") + span.set_attribute("module", module_name) + span.set_attribute("method", operation_name) + span.set_attribute("args", serialize_args(*args)) + span.set_attribute("kwargs", serialize_kwargs(**kwargs)) + + result = wrapped(*args, **kwargs) + span.set_status(Status(StatusCode.OK)) + + try: + set_members(span, instance, module_name, class_name) + except Exception as e: + span.record_exception(e) + + set_span_attribute(span, "result", result) + + except Exception as e: + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + raise + return result + + return traced_func diff --git a/portkey_ai/api_resources/instrumentation/openai/__init__.py b/portkey_ai/api_resources/instrumentation/openai/__init__.py new file mode 100644 index 00000000..d953e0aa --- /dev/null +++ b/portkey_ai/api_resources/instrumentation/openai/__init__.py @@ -0,0 +1,5 @@ +from portkey_ai.api_resources.instrumentation.openai.instrumentation import ( + OpenaiInstrumentor, +) + +__all__ = ["OpenaiInstrumentor"] diff --git a/portkey_ai/api_resources/instrumentation/openai/instrumentation.py b/portkey_ai/api_resources/instrumentation/openai/instrumentation.py new file mode 100644 index 00000000..b8b77d64 --- /dev/null +++ b/portkey_ai/api_resources/instrumentation/openai/instrumentation.py @@ -0,0 +1,59 @@ +import importlib.metadata +from typing import Any, Collection +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore [attr-defined] +from opentelemetry.trace import get_tracer +from wrapt import wrap_function_wrapper + +from portkey_ai.api_resources.instrumentation.openai.patch import patch_openai + + +class OpenaiInstrumentor(BaseInstrumentor): + methods_to_patch = [ + { + "module": "openai.resources.chat.completions", + "method": "Completions.create", + }, + { + "module": "openai.resources.chat.completions", + "method": "AsyncCompletions.create", + }, + { + "module": "openai.resources.images", + "method": "Images.generate", + }, + { + "module": "openai.resources.images", + "method": "AsyncImages.generate", + }, + { + "module": "openai.resources.images", + "method": "Images.edit", + }, + { + "module": "openai.resources.embeddings", + "method": "Embeddings.create", + }, + { + "module": "openai.resources.embeddings", + "method": "AsyncEmbeddings.create", + }, + ] + + def instrumentation_dependencies(self) -> Collection[str]: + return ["openai >= 0.27.0", "trace-attributes >= 4.0.5"] + + def _instrument(self, **kwargs: Any) -> None: + version = importlib.metadata.version("openai") + tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer(__name__, "", tracer_provider) + for method in self.methods_to_patch: + wrap_function_wrapper( + module=method["module"], + name=method["method"], + wrapper=patch_openai( + method["module"], method["method"], version, tracer + ), + ) + + def _uninstrument(self, **kwargs: Any) -> None: + pass diff --git a/portkey_ai/api_resources/instrumentation/openai/patch.py b/portkey_ai/api_resources/instrumentation/openai/patch.py new file mode 100644 index 00000000..675a3cc2 --- /dev/null +++ b/portkey_ai/api_resources/instrumentation/openai/patch.py @@ -0,0 +1,44 @@ +from opentelemetry import trace +from opentelemetry.trace import SpanKind, Status, StatusCode + +from portkey_ai.utils.json_utils import serialize_args, serialize_kwargs +from portkey_ai.api_resources.instrumentation.utils import ( + set_span_attribute, + set_members, +) + + +def patch_openai(module: str, operation_name: str, version: str, tracer: trace.Tracer): + def traced_func(wrapped, instance, args, kwargs): + with tracer.start_as_current_span( + name=operation_name, kind=SpanKind.CLIENT + ) as span: + try: + module_name = module + class_name = operation_name + + span.set_attribute("_source", "openai") + span.set_attribute("_source_type", "LLM provider") + span.set_attribute("framework.version", version) + span.set_attribute("module", module_name) + span.set_attribute("method", operation_name) + span.set_attribute("args", serialize_args(*args)) + span.set_attribute("kwargs", serialize_kwargs(**kwargs)) + + result = wrapped(*args, **kwargs) + span.set_status(Status(StatusCode.OK)) + + try: + set_members(span, instance, module_name, class_name) + except Exception: + span.set_attribute(f"{module_name}.{class_name}.error", instance) + + set_span_attribute(span, "result", result) + + except Exception as e: + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + raise + return result + + return traced_func diff --git a/portkey_ai/api_resources/instrumentation/portkey_span_exporter.py b/portkey_ai/api_resources/instrumentation/portkey_span_exporter.py new file mode 100644 index 00000000..97934166 --- /dev/null +++ b/portkey_ai/api_resources/instrumentation/portkey_span_exporter.py @@ -0,0 +1,70 @@ +import json +from typing import Any, Sequence +from opentelemetry.sdk.trace.export import ( + SpanExporter, + SpanExportResult, + ReadableSpan, +) + +from portkey_ai.api_resources.apis.logger import Logger +from portkey_ai.utils import string_to_uuid + + +class PortkeySpanExporter(SpanExporter): + def __init__(self, api_key: str, base_url: str): + self.api_key = api_key + self.base_url = base_url + + def transform_span_to_log(self, span: ReadableSpan) -> dict: + start_time = span.start_time + end_time = span.end_time + response_time: float + if start_time and end_time: + response_time = (end_time - start_time) * 10**-6 + else: + response_time = 0 + log = { + "metadata": { + "traceId": string_to_uuid(span.context.trace_id), + "spanId": span.context.span_id, + "spanName": span.name, + "parentSpanId": (span.parent.span_id if span.parent else None), + "startTime": start_time, + "endTime": end_time, + "_logType": "opentelemetry", + "_source": ( + span.attributes.get("_source", "unknown") + if span.attributes + else "unknown" + ), + "framework.version": ( + span.attributes.get("framework.version", "unknown") + if span.attributes + else "unknown" + ), + }, + "request": { + "method": "POST", + "headers": {"Content-Type": "application/json"}, + "body": json.loads(span.to_json()), + }, + "response": { + "status": 200, + "headers": {"Content-Type": "application/json"}, + "body": {}, + "response_time": response_time, + }, + } + return log + + def export(self, spans: Sequence[ReadableSpan], **kwargs: Any) -> SpanExportResult: + logger = Logger(api_key=self.api_key, base_url=self.base_url) + logs = [] + for span in spans: + logs.append(self.transform_span_to_log(span)) + try: + logger.log(logs) + except Exception: + return SpanExportResult.FAILURE + + return SpanExportResult.SUCCESS diff --git a/portkey_ai/api_resources/instrumentation/utils.py b/portkey_ai/api_resources/instrumentation/utils.py new file mode 100644 index 00000000..a62ee8ab --- /dev/null +++ b/portkey_ai/api_resources/instrumentation/utils.py @@ -0,0 +1,38 @@ +import json +from typing import Any +from opentelemetry.trace import Span + + +def set_span_attribute(span: Span, key: str, value: Any, _processed=None, depth=0): + if value is None or depth > 2: + return + + # Initialize processed set on first call + if _processed is None: + _processed = set() + + # Get object id to track circular references + obj_id = id(value) + if obj_id in _processed: + return + + try: + if isinstance(value, (dict, list, tuple, set, str, int, float, bool)): + span.set_attribute(key, json.dumps(value)) + else: + _processed.add(obj_id) + for child_key, child_value in value.__dict__.items(): + if child_key.startswith("_") or child_value is None: + continue + set_span_attribute( + span, f"{key}.{child_key}", child_value, _processed, depth + 1 + ) + except Exception: + span.set_attribute(key, str(value)) + + +def set_members(span: Span, instance: Any, module_name: str, class_name: str): + if instance is None: + return + for key, value in instance.__dict__.items(): + set_span_attribute(span, f"{module_name}.{class_name}.{key}", value) diff --git a/portkey_ai/api_resources/utils.py b/portkey_ai/api_resources/utils.py index 864dd082..faa14050 100644 --- a/portkey_ai/api_resources/utils.py +++ b/portkey_ai/api_resources/utils.py @@ -260,6 +260,7 @@ class Constructs(BaseModel): debug: Optional[bool] = None custom_host: Optional[str] = None forward_headers: Optional[str] = None + instrumentation: Optional[bool] = None weight: Optional[float] = None retry: Optional[RetrySettings] = None deployment_id: Optional[str] = None diff --git a/portkey_ai/utils/__init__.py b/portkey_ai/utils/__init__.py new file mode 100644 index 00000000..91eba614 --- /dev/null +++ b/portkey_ai/utils/__init__.py @@ -0,0 +1,4 @@ +from .json_utils import serialize_kwargs, serialize_args +from .hashing_utils import string_to_uuid + +__all__ = ["serialize_kwargs", "serialize_args", "string_to_uuid"] diff --git a/portkey_ai/utils/hashing_utils.py b/portkey_ai/utils/hashing_utils.py new file mode 100644 index 00000000..1fe6e3dc --- /dev/null +++ b/portkey_ai/utils/hashing_utils.py @@ -0,0 +1,8 @@ +import uuid + + +def string_to_uuid(input_string: str) -> str: + if input_string is None: + return None + # Using UUID v5 (SHA-1-based) - more secure but slower + return str(uuid.uuid5(uuid.NAMESPACE_DNS, str(input_string))) diff --git a/portkey_ai/utils/json_utils.py b/portkey_ai/utils/json_utils.py new file mode 100644 index 00000000..6b016caf --- /dev/null +++ b/portkey_ai/utils/json_utils.py @@ -0,0 +1,33 @@ +import json + + +def serialize_kwargs(**kwargs): + # Function to check if a value is serializable + def is_serializable(value): + try: + json.dumps(value) + return True + except (TypeError, ValueError): + return False + + # Filter out non-serializable items + serializable_kwargs = {k: v for k, v in kwargs.items() if is_serializable(v)} + + # Convert to string representation + return json.dumps(serializable_kwargs) + + +def serialize_args(*args): + # Function to check if a value is serializable + def is_serializable(value): + try: + json.dumps(value) + return True + except (TypeError, ValueError): + return False + + # Filter out non-serializable items + serializable_args = [arg for arg in args if is_serializable(arg)] + + # Convert to string representation + return json.dumps(serializable_args) diff --git a/setup.cfg b/setup.cfg index ad7c6ae6..518a5dda 100644 --- a/setup.cfg +++ b/setup.cfg @@ -52,6 +52,10 @@ langchain_callback = langchain-core llama_index_callback = llama-index +opentelemetry = + opentelemetry-sdk>=1.29.0,<2.0 + opentelemetry-instrumentation>=0.50b0,<1.0 + wrapt>=1.17.0,<2.0 [mypy] ignore_missing_imports = true