Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

instrumentation using monkey patching and open telemetry #282

Merged
merged 6 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,7 @@ langchain_callback:
pip install -e ".[langchain_callback]"

llama_index_callback:
pip install -e ".[llama_index_callback]"
pip install -e ".[llama_index_callback]"

instrumentation:
pip install -e ".[instrumentation]"
5 changes: 2 additions & 3 deletions portkey_ai/api_resources/apis/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@

class Logger:
def __init__(
self,
api_key: Optional[str] = None,
self, api_key: Optional[str] = None, base_url: Optional[str] = None
) -> None:
api_key = api_key or os.getenv("PORTKEY_API_KEY")
if api_key is None:
Expand All @@ -20,7 +19,7 @@ def __init__(
"x-portkey-api-key": api_key,
}

self.url = PORTKEY_BASE_URL + "/logs"
self.url = (base_url or PORTKEY_BASE_URL) + "/logs"

def log(
self,
Expand Down
4 changes: 4 additions & 0 deletions portkey_ai/api_resources/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -97,6 +98,7 @@ def __init__(
self.cache_force_refresh = cache_force_refresh
self.custom_host = custom_host
self.forward_headers = forward_headers
self.instrumentation = instrumentation or False
self.openai_project = openai_project
self.openai_organization = openai_organization
self.aws_secret_access_key = aws_secret_access_key
Expand Down Expand Up @@ -709,6 +711,7 @@ def __init__(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -743,6 +746,7 @@ def __init__(
self.cache_force_refresh = cache_force_refresh
self.custom_host = custom_host
self.forward_headers = forward_headers
self.instrumentation = instrumentation
self.openai_project = openai_project
self.openai_organization = openai_organization
self.aws_secret_access_key = aws_secret_access_key
Expand Down
32 changes: 32 additions & 0 deletions portkey_ai/api_resources/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -98,6 +99,7 @@ def __init__(
cache_force_refresh=cache_force_refresh,
custom_host=custom_host,
forward_headers=forward_headers,
instrumentation=instrumentation,
openai_project=openai_project,
openai_organization=openai_organization,
aws_secret_access_key=aws_secret_access_key,
Expand Down Expand Up @@ -152,6 +154,18 @@ def __init__(
self.logs = apis.Logs(self)
self.beta = self.beta(self) # type: ignore

if self.instrumentation:
try:
from portkey_ai.api_resources.instrumentation import (
initialize_instrumentation,
)
except ImportError:
raise ImportError(
"""Please install opentelemetry for instrumentation,
you can use `portkey-ai[opentelemetry]` to install"""
)
initialize_instrumentation(api_key=self.api_key, base_url=self.base_url)

def copy(
self,
*,
Expand All @@ -168,6 +182,7 @@ def copy(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -203,6 +218,7 @@ def copy(
cache_force_refresh=cache_force_refresh or self.cache_force_refresh,
custom_host=custom_host or self.custom_host,
forward_headers=forward_headers or self.forward_headers,
instrumentation=instrumentation or self.instrumentation,
openai_project=openai_project or self.openai_project,
openai_organization=openai_organization or self.openai_organization,
aws_secret_access_key=aws_secret_access_key or self.aws_secret_access_key,
Expand Down Expand Up @@ -287,6 +303,7 @@ def __init__(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -322,6 +339,7 @@ def __init__(
cache_force_refresh=cache_force_refresh,
custom_host=custom_host,
forward_headers=forward_headers,
instrumentation=instrumentation,
openai_project=openai_project,
openai_organization=openai_organization,
aws_secret_access_key=aws_secret_access_key,
Expand Down Expand Up @@ -376,6 +394,18 @@ def __init__(
self.logs = apis.AsyncLogs(self)
self.beta = self.beta(self) # type: ignore

if self.instrumentation:
try:
from portkey_ai.api_resources.instrumentation import (
initialize_instrumentation,
)
except ImportError:
raise ImportError(
"""Please install opentelemetry for instrumentation,
you can use `portkey-ai[opentelemetry]` to install"""
)
initialize_instrumentation(api_key=self.api_key, base_url=self.base_url)

def copy(
self,
*,
Expand All @@ -392,6 +422,7 @@ def copy(
cache_force_refresh: Optional[bool] = None,
custom_host: Optional[str] = None,
forward_headers: Optional[List[str]] = None,
instrumentation: Optional[bool] = None,
openai_project: Optional[str] = None,
openai_organization: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
Expand Down Expand Up @@ -427,6 +458,7 @@ def copy(
cache_force_refresh=cache_force_refresh or self.cache_force_refresh,
custom_host=custom_host or self.custom_host,
forward_headers=forward_headers or self.forward_headers,
instrumentation=instrumentation or self.instrumentation,
openai_project=openai_project or self.openai_project,
openai_organization=openai_organization or self.openai_organization,
aws_secret_access_key=aws_secret_access_key or self.aws_secret_access_key,
Expand Down
38 changes: 38 additions & 0 deletions portkey_ai/api_resources/instrumentation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from importlib.metadata import version, PackageNotFoundError
from typing import Dict
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore [attr-defined]

from .crewai import CrewAIInstrumentor
from .litellm import LitellmInstrumentor
from .portkey_span_exporter import PortkeySpanExporter
from .langgraph import LanggraphInstrumentor

__all__ = ["initialize_instrumentation"]

package_instrumentor_map: Dict[str, BaseInstrumentor] = {
"crewai": CrewAIInstrumentor,
"litellm": LitellmInstrumentor,
"langgraph": LanggraphInstrumentor,
}


def is_package_installed(pkg_name):
try:
version(pkg_name)
return True
except PackageNotFoundError:
return False


def initialize_instrumentation(api_key: str, base_url: str):
tracer_provider = TracerProvider()
exporter = PortkeySpanExporter(api_key=api_key, base_url=base_url)
trace.set_tracer_provider(tracer_provider)
tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
for package, instrumentor in package_instrumentor_map.items():
if is_package_installed(package):
instrumentor().instrument()
print(f"Portkey: {package} Instrumentation initialized")
3 changes: 3 additions & 0 deletions portkey_ai/api_resources/instrumentation/crewai/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .instrumentation import CrewAIInstrumentor

__all__ = ["CrewAIInstrumentor"]
68 changes: 68 additions & 0 deletions portkey_ai/api_resources/instrumentation/crewai/instrumentation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import importlib.metadata
from typing import Any, Collection
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore [attr-defined]
from opentelemetry.trace import get_tracer
from wrapt import wrap_function_wrapper

from portkey_ai.api_resources.instrumentation.crewai.patch import patch_crew


class CrewAIInstrumentor(BaseInstrumentor):
methods_to_patch = [
{
"module": "crewai.crew",
"method": "Crew.kickoff",
},
{
"module": "crewai.crew",
"method": "Crew.kickoff_for_each",
},
{
"module": "crewai.crew",
"method": "Crew.kickoff_async",
},
{
"module": "crewai.crew",
"method": "Crew.kickoff_for_each_async",
},
{
"module": "crewai.agent",
"method": "Agent.execute_task",
},
{
"module": "crewai.task",
"method": "Task.execute_sync",
},
{
"module": "crewai.memory.storage.rag_storage",
"method": "RAGStorage.save",
},
{
"module": "crewai.memory.storage.rag_storage",
"method": "RAGStorage.search",
},
{
"module": "crewai.memory.storage.rag_storage",
"method": "RAGStorage.reset",
},
]

def instrumentation_dependencies(self) -> Collection[str]:
return ["crewai >= 0.32.0"]

def _instrument(self, **kwargs: Any) -> None:
version = importlib.metadata.version("crewai")
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, "", tracer_provider)
try:
for method in self.methods_to_patch:
wrap_function_wrapper(
module=method["module"],
name=method["method"],
wrapper=patch_crew(method["method"], version, tracer),
)
except Exception as e:
print(f"Failed to instrument CrewAI: {e}")

def _uninstrument(self, **kwargs: Any) -> None:
pass
50 changes: 50 additions & 0 deletions portkey_ai/api_resources/instrumentation/crewai/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from opentelemetry import trace
from opentelemetry.trace import SpanKind, Status, StatusCode

from portkey_ai.utils.json_utils import serialize_args, serialize_kwargs
from portkey_ai.api_resources.instrumentation.utils import (
set_members,
set_span_attribute,
)


def patch_crew(operation_name: str, version: str, tracer: trace.Tracer):
def traced_func(wrapped, instance, args, kwargs):
with tracer.start_as_current_span(
name=operation_name, kind=SpanKind.CLIENT
) as span:
try:
module_name = instance.__module__
class_name = instance.__class__.__name__

span.set_attribute("_source", "crewai")
span.set_attribute("_source_type", "agent framework")
span.set_attribute("framework.version", version)
span.set_attribute("module", module_name)
span.set_attribute("method", operation_name)
span.set_attribute("args", serialize_args(*args))
span.set_attribute("kwargs", serialize_kwargs(**kwargs))

result = wrapped(*args, **kwargs)
span.set_status(Status(StatusCode.OK))

try:
set_members(span, instance, module_name, class_name)
except Exception as e:
span.record_exception(e)

set_span_attribute(span, "result", result)

if class_name == "Crew":
for attr in ["tasks_output", "token_usage", "usage_metrics"]:
if hasattr(result, attr):
span.set_attribute(
f"crewai.crew.{attr}", str(getattr(result, attr))
)
except Exception as e:
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
raise
return result

return traced_func
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from portkey_ai.api_resources.instrumentation.langgraph.instrumentation import (
LanggraphInstrumentor,
)

__all__ = ["LanggraphInstrumentor"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import importlib.metadata
from typing import Any, Collection
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore [attr-defined]
from opentelemetry.trace import get_tracer
from wrapt import wrap_function_wrapper

from portkey_ai.api_resources.instrumentation.langgraph.patch import patch_langgraph


class LanggraphInstrumentor(BaseInstrumentor):
methods_to_patch = [
{
"module": "langgraph.graph.state",
"method": "StateGraph.add_node",
},
{
"module": "langgraph.graph.state",
"method": "StateGraph.add_edge",
},
{
"module": "langgraph.graph.state",
"method": "StateGraph.set_entry_point",
},
{
"module": "langgraph.graph.state",
"method": "StateGraph.set_finish_point",
},
{
"module": "langgraph.graph.state",
"method": "StateGraph.add_conditional_edges",
},
]

def instrumentation_dependencies(self) -> Collection[str]:
return ["langgraph >= 0.2.0"]

def _instrument(self, **kwargs: Any) -> None:
version = importlib.metadata.version("langgraph")
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, "", tracer_provider)
try:
for method in self.methods_to_patch:
wrap_function_wrapper(
module=method["module"],
name=method["method"],
wrapper=patch_langgraph(method["method"], version, tracer),
)
except Exception as e:
print(f"Failed to instrument Langgraph: {e}")

def _uninstrument(self, **kwargs: Any) -> None:
pass
Loading
Loading