46 changes: 46 additions & 0 deletions docs/my-website/docs/proxy/logging.md
@@ -2503,6 +2503,52 @@

## Helicone

We will use the `--config` to set

- `litellm.success_callback = ["helicone_v2"]`

**Step 1**: Set the Helicone API key

```shell
export HELICONE_API_KEY="your-helicone-api-key"
```

**Step 2**: Create a `config.yaml` file and set `litellm_settings`: `success_callback`

```yaml
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: gpt-3.5-turbo
litellm_settings:
success_callback: ["helicone_v2"]
```

**Step 3**: Start the proxy, make a test request

Start proxy

```shell
litellm --config config.yaml --debug
```

Test Request

```shell
curl --location 'http://0.0.0.0:4000/chat/completions' \
--header 'Content-Type: application/json' \
--data ' {
"model": "gpt-3.5-turbo",
"messages": [
{
"role": "user",
"content": "which llm are you"
}
]
}'
```
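
The same callback can also be enabled directly from the Python SDK rather than the proxy. A minimal sketch, assuming `HELICONE_API_KEY` and your provider key (e.g. `OPENAI_API_KEY`) are already exported:

```python
# Minimal sketch: enable the Helicone callback from the LiteLLM Python SDK.
# Assumes HELICONE_API_KEY and OPENAI_API_KEY are set in the environment.
import litellm

litellm.success_callback = ["helicone_v2"]

response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "which llm are you"}],
)
print(response.choices[0].message.content)
```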

<!-- ## (BETA) Moderation with Azure Content Safety

15 changes: 12 additions & 3 deletions litellm/__init__.py
@@ -155,6 +155,7 @@
"gitlab",
"cloudzero",
"posthog",
"helicone_v2",
]
configured_cold_storage_logger: Optional[
_custom_logger_compatible_callbacks_literal
@@ -263,7 +264,9 @@
ssl_verify: Union[str, bool] = True
ssl_security_level: Optional[str] = None
ssl_certificate: Optional[str] = None
ssl_ecdh_curve: Optional[str] = None # Set to 'X25519' to disable PQC and improve performance
ssl_ecdh_curve: Optional[
str
] = None # Set to 'X25519' to disable PQC and improve performance
disable_streaming_logging: bool = False
disable_token_counter: bool = False
disable_add_transform_inline_image_block: bool = False
@@ -412,7 +415,9 @@
from litellm.litellm_core_utils.get_model_cost_map import get_model_cost_map

model_cost = get_model_cost_map(url=model_cost_map_url)
cost_discount_config: Dict[str, float] = {} # Provider-specific cost discounts {"vertex_ai": 0.05} = 5% discount
cost_discount_config: Dict[
str, float
] = {} # Provider-specific cost discounts {"vertex_ai": 0.05} = 5% discount
custom_prompt_dict: Dict[str, dict] = {}
check_provider_endpoint = False

@@ -1172,7 +1177,9 @@ def add_known_models():
)
from .llms.cohere.chat.transformation import CohereChatConfig
from .llms.bedrock.embed.cohere_transformation import BedrockCohereEmbeddingConfig
from .llms.bedrock.embed.twelvelabs_marengo_transformation import TwelveLabsMarengoEmbeddingConfig
from .llms.bedrock.embed.twelvelabs_marengo_transformation import (
TwelveLabsMarengoEmbeddingConfig,
)
from .llms.openai.openai import OpenAIConfig, MistralEmbeddingConfig
from .llms.openai.image_variations.transformation import OpenAIImageVariationConfig
from .llms.deepinfra.chat.transformation import DeepInfraConfig
@@ -1377,9 +1384,11 @@ def set_global_bitbucket_config(config: Dict[str, Any]) -> None:
global global_bitbucket_config
global_bitbucket_config = config


### GLOBAL CONFIG ###
global_gitlab_config: Optional[Dict[str, Any]] = None


def set_global_gitlab_config(config: Dict[str, Any]) -> None:
"""Set global BitBucket configuration for prompt management."""
global global_gitlab_config
258 changes: 258 additions & 0 deletions litellm/integrations/helicone_v2.py
@@ -0,0 +1,258 @@
"""
Helicone integration that leverages StandardLoggingPayload and supports batching via CustomBatchLogger.
"""

import asyncio
import json
import os
from typing import Any, Dict, Optional

import litellm
from litellm._logging import verbose_logger
from litellm.integrations.custom_batch_logger import CustomBatchLogger
from litellm.litellm_core_utils.safe_json_dumps import safe_dumps
from litellm.llms.custom_httpx.http_handler import (
get_async_httpx_client,
httpxSpecialProvider,
)
from litellm.types.utils import StandardLoggingPayload

__all__ = ["HeliconeLogger"]


class HeliconeLogger(CustomBatchLogger):
"""Batching Helicone logger that consumes the StandardLoggingPayload."""

def __init__(
self,
api_key: Optional[str] = None,
api_base: Optional[str] = None,
**kwargs: Any,
) -> None:
base = api_base or os.getenv("HELICONE_API_BASE") or "https://api.hconeai.com"
self.api_base = base[:-1] if base.endswith("/") else base
self.api_key = api_key or os.getenv("HELICONE_API_KEY")

self.async_httpx_client = get_async_httpx_client(
llm_provider=httpxSpecialProvider.LoggingCallback
)
self.flush_lock: Optional[asyncio.Lock] = None
try:
asyncio.create_task(self.periodic_flush())
self.flush_lock = asyncio.Lock()
except (
Exception
) as exc: # pragma: no cover - dependent on runtime loop availability
verbose_logger.debug(
"HeliconeLogger async batching disabled; running synchronously. %s",
exc,
)
self.flush_lock = None

super().__init__(flush_lock=self.flush_lock, **kwargs)

batch_size_override = os.getenv("HELICONE_BATCH_SIZE")
if batch_size_override:
try:
self.batch_size = int(batch_size_override)
except ValueError:
verbose_logger.debug(
"HeliconeLogger: ignoring invalid HELICONE_BATCH_SIZE=%s",
batch_size_override,
)

def log_success_event(
self,
kwargs: Dict[str, Any],
response_obj: Any,
start_time: Any,
end_time: Any,
) -> None:
try:
data = self._build_data(kwargs, response_obj, start_time, end_time)
if data is None:
return
self._send_sync(data)
except Exception:
verbose_logger.exception("HeliconeLogger: sync logging failed")

async def async_log_success_event(
self,
kwargs: Dict[str, Any],
response_obj: Any,
start_time: Any,
end_time: Any,
) -> None:
try:
data = self._build_data(kwargs, response_obj, start_time, end_time)
if data is None:
return

if self.flush_lock is None:
await self._send_async(data)
return

self.log_queue.append(data)
if len(self.log_queue) >= self.batch_size:
await self.flush_queue()
except Exception:
verbose_logger.exception("HeliconeLogger: async logging failed")

async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
try:
verbose_logger.debug(
"HeliconeLogger: Async logging - Enters logging function for model %s",
kwargs,
)
data = self._build_data(kwargs, response_obj, start_time, end_time)

if data is None:
return

if self.flush_lock is None:
await self._send_async(data)
return

self.log_queue.append(data)
if len(self.log_queue) >= self.batch_size:
await self.flush_queue()
except Exception as e:
verbose_logger.exception(f"HeliconeLogger Layer Error - {str(e)}")
pass

async def async_send_batch(self, *args: Any, **kwargs: Any) -> None:
if not self.log_queue:
return

events = list(self.log_queue)
for event in events:
try:
await self._send_async(event)
except Exception:
verbose_logger.exception(
"HeliconeLogger: failed to send batched Helicone event"
)

def _build_data(
self, kwargs: Dict[str, Any], response_obj: Any, start_time: Any, end_time: Any
) -> dict:
logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
"standard_logging_object", None
)
if logging_payload is None:
raise ValueError("standard_logging_object not found in kwargs")

provider_url = logging_payload.get("api_base", "")
provider_request = self._pick_request_json(kwargs)
meta: dict = {}
providerRequest = {
"url": provider_url,
"json": provider_request,
"meta": meta,
}

# provider_response = logging_payload.get("response", {})
provider_response = self._pick_response(logging_payload)
# provider_response_header = self._pick_response_headers(logging_payload)
provider_response_status = self._pick_status_code(logging_payload)
provider_response = {
"json": provider_response,
"headers": {},
"status": provider_response_status,
}

start_time_seconds = int(start_time.timestamp())
start_time_milliseconds = int(
(start_time.timestamp() - start_time_seconds) * 1000
)
end_time_seconds = int(end_time.timestamp())
end_time_milliseconds = int((end_time.timestamp() - end_time_seconds) * 1000)
timing = {
"startTime": {
"seconds": start_time_seconds,
"milliseconds": start_time_milliseconds,
},
"endTime": {
"seconds": end_time_seconds,
"milliseconds": end_time_milliseconds,
},
}

payload_json = {
"providerRequest": providerRequest,
"providerResponse": provider_response,
"timing": timing,
}
return self._sanitize(payload_json)

def _pick_request_json(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
if kwargs:
additional_args = kwargs.get("additional_args") or {}
if isinstance(additional_args, dict):
complete_input_dict = additional_args.get("complete_input_dict")
if isinstance(complete_input_dict, dict):
return complete_input_dict
return {}

def _pick_response(self, logging_payload: StandardLoggingPayload) -> Any:
if logging_payload.get("status") == "success":
return logging_payload.get("response", {})
return logging_payload.get("error_str", {})

def _pick_response_headers(
self, logging_payload: StandardLoggingPayload
) -> Dict[str, Any]:
headers: Dict[str, Any] = {}
hidden_params = logging_payload.get("hidden_params")
if isinstance(hidden_params, dict):
provider_headers = hidden_params.get("response_headers")
if isinstance(provider_headers, dict):
headers.update(provider_headers)
return headers

def _pick_status_code(self, logging_payload: StandardLoggingPayload) -> int:
error_information = logging_payload.get("error_information") or {}
if isinstance(error_information, dict):
error_code = error_information.get("error_code")
if isinstance(error_code, str) and error_code:
return int(error_code)
return 200

@staticmethod
def _sanitize(payload: Dict[str, Any]) -> Dict[str, Any]:
"""Return a JSON-serializable representation of the payload."""
return json.loads(safe_dumps(payload))

def _send_sync(self, data: Dict[str, Any]) -> None:
url = f"{self.api_base}/custom/v1/log"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}

response = litellm.module_level_client.post(
url=url,
headers=headers,
json=data,
)
verbose_logger.debug(
"HeliconeLogger: logged Helicone event (status %s)",
getattr(response, "status_code", "unknown"),
)

async def _send_async(self, data: Dict[str, Any]) -> None:
url = f"{self.api_base}/custom/v1/log"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
response = await self.async_httpx_client.post(
url=url,
headers=headers,
json=data,
)
response.raise_for_status()
verbose_logger.debug(
"HeliconeLogger: logged Helicone event (status %s)",
response.status_code,
)
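
For reference, `_build_data` above assembles the record that `_send_sync` / `_send_async` POST to `{api_base}/custom/v1/log`. A rough sketch of its shape, with placeholder values:

```python
# Illustrative shape of the payload produced by _build_data; every value below
# is a placeholder, not real output.
example_payload = {
    "providerRequest": {
        "url": "https://api.openai.com/v1",  # standard_logging_object["api_base"]
        "json": {"model": "gpt-3.5-turbo", "messages": []},  # complete_input_dict
        "meta": {},
    },
    "providerResponse": {
        "json": {"id": "chatcmpl-123"},  # response on success, error_str on failure
        "headers": {},
        "status": 200,  # error_information["error_code"] on failure, else 200
    },
    "timing": {
        "startTime": {"seconds": 1700000000, "milliseconds": 123},
        "endTime": {"seconds": 1700000001, "milliseconds": 456},
    },
}
```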
2 changes: 2 additions & 0 deletions litellm/litellm_core_utils/custom_logger_registry.py
@@ -24,6 +24,7 @@
from litellm.integrations.galileo import GalileoObserve
from litellm.integrations.gcs_bucket.gcs_bucket import GCSBucketLogger
from litellm.integrations.gcs_pubsub.pub_sub import GcsPubSubLogger
from litellm.integrations.helicone_v2 import HeliconeLogger
from litellm.integrations.humanloop import HumanloopLogger
from litellm.integrations.lago import LagoLogger
from litellm.integrations.langfuse.langfuse_prompt_management import (
@@ -96,6 +97,7 @@ class CustomLoggerRegistry:
"gitlab": GitLabPromptManager,
"cloudzero": CloudZeroLogger,
"posthog": PostHogLogger,
"helicone_v2": HeliconeLogger
}

try:
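
The registry entry above is what lets the plain string `"helicone_v2"` in `success_callback` resolve to the `HeliconeLogger` class. A simplified, illustrative view of that mapping (the dict name here is made up; the real one lives in `CustomLoggerRegistry`):

```python
# Simplified sketch of the string -> class mapping added in this file.
from litellm.integrations.helicone_v2 import HeliconeLogger

callback_str_to_class = {  # illustrative name, not the real attribute
    "helicone_v2": HeliconeLogger,
}

print(callback_str_to_class["helicone_v2"].__name__)  # HeliconeLogger
```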