Skip to content

Commit

Permalink
feat: auto-instrumentation
Browse files Browse the repository at this point in the history
initial commit for instrumentation using monkey patching and open telemetry

formatting and linting

Do not install dependencies for opentelemetry by default

version locking

make changes for async client

feat: instrumentation for litellm and openai

revert log changes

langgraph instrumentation

remove openai instrumentation

fix missing import

formatting

minor changes

minor fixes

fixes per comments
  • Loading branch information
narengogi committed Feb 13, 2025
1 parent 0cf1a3a commit 5c760ed
Show file tree
Hide file tree
Showing 24 changed files with 682 additions and 4 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,7 @@ langchain_callback:
pip install -e ".[langchain_callback]"

llama_index_callback:
pip install -e ".[llama_index_callback]"
pip install -e ".[llama_index_callback]"

instrumentation:
pip install -e ".[opentelemetry]"
5 changes: 2 additions & 3 deletions portkey_ai/api_resources/apis/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@

class Logger:
def __init__(
self,
api_key: Optional[str] = None,
self, api_key: Optional[str] = None, base_url: Optional[str] = None
) -> None:
api_key = api_key or os.getenv("PORTKEY_API_KEY")
if api_key is None:
Expand All @@ -20,7 +19,7 @@ def __init__(
"x-portkey-api-key": api_key,
}

self.url = PORTKEY_BASE_URL + "/logs"
self.url = (base_url or PORTKEY_BASE_URL) + "/logs"

def log(
self,
Expand Down
4 changes: 4 additions & 0 deletions portkey_ai/api_resources/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -97,6 +98,7 @@ def __init__(
self.cache_force_refresh = cache_force_refresh
self.custom_host = custom_host
self.forward_headers = forward_headers
self.instrumentation = instrumentation or False
self.openai_project = openai_project
self.openai_organization = openai_organization
self.aws_secret_access_key = aws_secret_access_key
Expand Down Expand Up @@ -709,6 +711,7 @@ def __init__(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -743,6 +746,7 @@ def __init__(
self.cache_force_refresh = cache_force_refresh
self.custom_host = custom_host
self.forward_headers = forward_headers
self.instrumentation = instrumentation
self.openai_project = openai_project
self.openai_organization = openai_organization
self.aws_secret_access_key = aws_secret_access_key
Expand Down
32 changes: 32 additions & 0 deletions portkey_ai/api_resources/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -98,6 +99,7 @@ def __init__(
cache_force_refresh=cache_force_refresh,
custom_host=custom_host,
forward_headers=forward_headers,
instrumentation=instrumentation,
openai_project=openai_project,
openai_organization=openai_organization,
aws_secret_access_key=aws_secret_access_key,
Expand Down Expand Up @@ -152,6 +154,18 @@ def __init__(
self.logs = apis.Logs(self)
self.beta = self.beta(self) # type: ignore

if self.instrumentation:
try:
from portkey_ai.api_resources.instrumentation import (
initialize_instrumentation,
)
except ImportError:
raise ImportError(
"""Please install opentelemetry for instrumentation,
you can use `make instrumentation` to install the dependencies"""
)
initialize_instrumentation(api_key=self.api_key, base_url=self.base_url)

def copy(
self,
*,
Expand All @@ -168,6 +182,7 @@ def copy(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -203,6 +218,7 @@ def copy(
cache_force_refresh=cache_force_refresh or self.cache_force_refresh,
custom_host=custom_host or self.custom_host,
forward_headers=forward_headers or self.forward_headers,
instrumentation=instrumentation or self.instrumentation,
openai_project=openai_project or self.openai_project,
openai_organization=openai_organization or self.openai_organization,
aws_secret_access_key=aws_secret_access_key or self.aws_secret_access_key,
Expand Down Expand Up @@ -287,6 +303,7 @@ def __init__(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -322,6 +339,7 @@ def __init__(
cache_force_refresh=cache_force_refresh,
custom_host=custom_host,
forward_headers=forward_headers,
instrumentation=instrumentation,
openai_project=openai_project,
openai_organization=openai_organization,
aws_secret_access_key=aws_secret_access_key,
Expand Down Expand Up @@ -376,6 +394,18 @@ def __init__(
self.logs = apis.AsyncLogs(self)
self.beta = self.beta(self) # type: ignore

if self.instrumentation:
try:
from portkey_ai.api_resources.instrumentation import (
initialize_instrumentation,
)
except ImportError:
raise ImportError(
"""Please install opentelemetry for instrumentation,
you can use `make instrumentation` to install the dependencies"""
)
initialize_instrumentation(api_key=self.api_key, base_url=self.base_url)

def copy(
self,
*,
Expand All @@ -392,6 +422,7 @@ def copy(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -427,6 +458,7 @@ def copy(
cache_force_refresh=cache_force_refresh or self.cache_force_refresh,
custom_host=custom_host or self.custom_host,
forward_headers=forward_headers or self.forward_headers,
instrumentation=instrumentation or self.instrumentation,
openai_project=openai_project or self.openai_project,
openai_organization=openai_organization or self.openai_organization,
aws_secret_access_key=aws_secret_access_key or self.aws_secret_access_key,
Expand Down
29 changes: 29 additions & 0 deletions portkey_ai/api_resources/instrumentation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import importlib
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore [attr-defined]

from .crewai import CrewAIInstrumentor
from .litellm import LitellmInstrumentor
from .portkey_span_exporter import PortkeySpanExporter
from .langgraph import LanggraphInstrumentor

__all__ = ["initialize_instrumentation"]

package_instrumentor_map: dict[str, BaseInstrumentor] = {
"crewai": CrewAIInstrumentor,
"litellm": LitellmInstrumentor,
"langgraph": LanggraphInstrumentor,
}


def initialize_instrumentation(api_key: str, base_url: str):
tracer_provider = TracerProvider()
exporter = PortkeySpanExporter(api_key=api_key, base_url=base_url)
trace.set_tracer_provider(tracer_provider)
tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
for package, instrumentor in package_instrumentor_map.items():
if importlib.util.find_spec(package):
instrumentor().instrument()
print(f"Portkey: {package} Instrumentation initialized")
3 changes: 3 additions & 0 deletions portkey_ai/api_resources/instrumentation/crewai/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .instrumentation import CrewAIInstrumentor

__all__ = ["CrewAIInstrumentor"]
68 changes: 68 additions & 0 deletions portkey_ai/api_resources/instrumentation/crewai/instrumentation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import importlib.metadata
from typing import Any, Collection
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore [attr-defined]
from opentelemetry.trace import get_tracer
from wrapt import wrap_function_wrapper

from portkey_ai.api_resources.instrumentation.crewai.patch import patch_crew


class CrewAIInstrumentor(BaseInstrumentor):
methods_to_patch = [
{
"module": "crewai.crew",
"method": "Crew.kickoff",
},
{
"module": "crewai.crew",
"method": "Crew.kickoff_for_each",
},
{
"module": "crewai.crew",
"method": "Crew.kickoff_async",
},
{
"module": "crewai.crew",
"method": "Crew.kickoff_for_each_async",
},
{
"module": "crewai.agent",
"method": "Agent.execute_task",
},
{
"module": "crewai.task",
"method": "Task.execute_sync",
},
{
"module": "crewai.memory.storage.rag_storage",
"method": "RAGStorage.save",
},
{
"module": "crewai.memory.storage.rag_storage",
"method": "RAGStorage.search",
},
{
"module": "crewai.memory.storage.rag_storage",
"method": "RAGStorage.reset",
},
]

def instrumentation_dependencies(self) -> Collection[str]:
return ["crewai >= 0.32.0"]

def _instrument(self, **kwargs: Any) -> None:
version = importlib.metadata.version("crewai")
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, "", tracer_provider)
try:
for method in self.methods_to_patch:
wrap_function_wrapper(
module=method["module"],
name=method["method"],
wrapper=patch_crew(method["method"], version, tracer),
)
except Exception as e:
print(f"Failed to instrument CrewAI: {e}")

def _uninstrument(self, **kwargs: Any) -> None:
pass
50 changes: 50 additions & 0 deletions portkey_ai/api_resources/instrumentation/crewai/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from opentelemetry import trace
from opentelemetry.trace import SpanKind, Status, StatusCode

from portkey_ai.utils.json_utils import serialize_args, serialize_kwargs
from portkey_ai.api_resources.instrumentation.utils import (
set_members,
set_span_attribute,
)


def patch_crew(operation_name: str, version: str, tracer: trace.Tracer):
def traced_func(wrapped, instance, args, kwargs):
with tracer.start_as_current_span(
name=operation_name, kind=SpanKind.CLIENT
) as span:
try:
module_name = instance.__module__
class_name = instance.__class__.__name__

span.set_attribute("_source", "crewai")
span.set_attribute("_source_type", "agent framework")
span.set_attribute("framework.version", version)
span.set_attribute("module", module_name)
span.set_attribute("method", operation_name)
span.set_attribute("args", serialize_args(*args))
span.set_attribute("kwargs", serialize_kwargs(**kwargs))

result = wrapped(*args, **kwargs)
span.set_status(Status(StatusCode.OK))

try:
set_members(span, instance, module_name, class_name)
except Exception as e:
span.record_exception(e)

set_span_attribute(span, "result", result)

if class_name == "Crew":
for attr in ["tasks_output", "token_usage", "usage_metrics"]:
if hasattr(result, attr):
span.set_attribute(
f"crewai.crew.{attr}", str(getattr(result, attr))
)
except Exception as e:
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
raise
return result

return traced_func
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from portkey_ai.api_resources.instrumentation.langgraph.instrumentation import (
LanggraphInstrumentor,
)

__all__ = ["LanggraphInstrumentor"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import importlib.metadata
from typing import Any, Collection
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore [attr-defined]
from opentelemetry.trace import get_tracer
from wrapt import wrap_function_wrapper

from portkey_ai.api_resources.instrumentation.langgraph.patch import patch_langgraph


class LanggraphInstrumentor(BaseInstrumentor):
methods_to_patch = [
{
"module": "langgraph.graph.state",
"method": "StateGraph.add_node",
},
{
"module": "langgraph.graph.state",
"method": "StateGraph.add_edge",
},
{
"module": "langgraph.graph.state",
"method": "StateGraph.set_entry_point",
},
{
"module": "langgraph.graph.state",
"method": "StateGraph.set_finish_point",
},
{
"module": "langgraph.graph.state",
"method": "StateGraph.add_conditional_edges",
},
]

def instrumentation_dependencies(self) -> Collection[str]:
return ["langgraph >= 0.2.0"]

def _instrument(self, **kwargs: Any) -> None:
version = importlib.metadata.version("langgraph")
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, "", tracer_provider)
try:
for method in self.methods_to_patch:
wrap_function_wrapper(
module=method["module"],
name=method["method"],
wrapper=patch_langgraph(method["method"], version, tracer),
)
except Exception as e:
print(f"Failed to instrument Langgraph: {e}")

def _uninstrument(self, **kwargs: Any) -> None:
pass
Loading

0 comments on commit 5c760ed

Please sign in to comment.