113 changes: 112 additions & 1 deletion backend/app/core/langfuse/langfuse.py
@@ -1,10 +1,12 @@
import uuid
import logging
from typing import Any, Dict, Optional
from typing import Any, Callable, Dict, Optional
from functools import wraps

from asgi_correlation_id import correlation_id
from langfuse import Langfuse
from langfuse.client import StatefulGenerationClient, StatefulTraceClient
from app.models.llm import CompletionConfig, QueryParams, LLMCallResponse

logger = logging.getLogger(__name__)

@@ -107,3 +109,112 @@ def log_error(self, error_message: str, response_id: Optional[str] = None):

def flush(self):
self.langfuse.flush()


def observe_llm_execution(
session_id: str | None = None,
credentials: dict | None = None,
):
"""Decorator to add Langfuse observability to LLM provider execute methods.

Args:
session_id: Session ID for grouping traces (conversation_id)
credentials: Langfuse credentials with public_key, secret_key, and host

Usage:
decorated_execute = observe_llm_execution(
credentials=langfuse_creds,
session_id=conversation_id
)(provider_instance.execute)
"""

def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(completion_config: CompletionConfig, query: QueryParams, **kwargs):
# Skip observability if no credentials provided
if not credentials or not all(
key in credentials for key in ["public_key", "secret_key", "host"]
):
logger.info("[Langfuse] No credentials - skipping observability")
return func(completion_config, query, **kwargs)

try:
langfuse = Langfuse(
public_key=credentials.get("public_key"),
secret_key=credentials.get("secret_key"),
host=credentials.get("host"),
)
except Exception as e:
logger.warning(f"[Langfuse] Failed to initialize client: {e}")
return func(completion_config, query, **kwargs)

trace_metadata = {
"provider": completion_config.provider,
}

if query.conversation and query.conversation.id:
trace_metadata["conversation_id"] = query.conversation.id

trace = langfuse.trace(
name="unified-llm-call",
input=query.input,
metadata=trace_metadata,
tags=[completion_config.provider],
Collaborator: why is provider detail being repeated both in metadata and tags

)
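
One way to address the comment above would be to record the provider only as a tag and drop it from the trace metadata; a sketch of that variant (not part of this PR):

```python
# Sketch: provider recorded once, as a filterable tag rather than in metadata.
trace_metadata = {}
if query.conversation and query.conversation.id:
    trace_metadata["conversation_id"] = query.conversation.id

trace = langfuse.trace(
    name="unified-llm-call",
    input=query.input,
    metadata=trace_metadata,
    tags=[completion_config.provider],
)
```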

generation = trace.generation(
name=f"{completion_config.provider}-completion",
input=query.input,
model=completion_config.params.get("model"),
)

try:
# Execute the actual LLM call
response: LLMCallResponse | None
error: str | None
response, error = func(completion_config, query, **kwargs)

if response:
generation.end(
output={
"status": "success",
"output": response.response.output.text,
},
usage_details={
"input": response.usage.input_tokens,
"output": response.usage.output_tokens,
},
model=response.response.model,
)

trace.update(
output={
"status": "success",
"output": response.response.output.text,
},
session_id=session_id or response.response.conversation_id,
)
else:
error_msg = error or "Unknown error"
generation.end(output={"error": error_msg})
trace.update(
output={"status": "failure", "error": error_msg},
session_id=session_id,
)

langfuse.flush()
Collaborator: maybe you can add a function for marking the status and error, and then use that function here

Collaborator (Author): right now, keeping it simple for extensibility
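
For reference, a minimal sketch of the helper the reviewer suggests (the name mark_result and its signature are hypothetical; it uses the StatefulGenerationClient and StatefulTraceClient types this module already imports):

```python
def mark_result(
    generation: StatefulGenerationClient,
    trace: StatefulTraceClient,
    session_id: str | None,
    output: str | None = None,
    error: str | None = None,
) -> None:
    """End a generation and update its trace with one shared status payload."""
    if error is None:
        payload = {"status": "success", "output": output}
    else:
        payload = {"status": "failure", "error": error}
    generation.end(output=payload)
    trace.update(output=payload, session_id=session_id)
```

The success path in this PR also passes usage_details and model to generation.end, so a real helper would need to accept those as optional parameters.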

return response, error

except Exception as e:
error_msg = str(e)
generation.end(output={"error": error_msg})
trace.update(
output={"status": "failure", "error": error_msg},
session_id=session_id,
)
langfuse.flush()
raise

return wrapper

return decorator
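
A quick illustration of the no-credentials passthrough (the stub callable is hypothetical; only the decorator's early-return branch is exercised):

```python
from app.core.langfuse.langfuse import observe_llm_execution

def stub_execute(completion_config, query, **kwargs):
    # Stands in for a provider's execute method.
    return None, "stub: no provider configured"

# With credentials missing, the wrapper logs and calls straight through,
# so the config and query arguments are never inspected.
wrapped = observe_llm_execution(credentials=None)(stub_execute)
response, error = wrapped(completion_config=None, query=None)
assert response is None and error == "stub: no provider configured"
```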
22 changes: 21 additions & 1 deletion backend/app/services/llm/jobs.py
@@ -7,11 +7,13 @@

from app.core.db import engine
from app.crud.config import ConfigVersionCrud
from app.crud.credentials import get_provider_credential
from app.crud.jobs import JobCrud
from app.models import JobStatus, JobType, JobUpdate, LLMCallRequest
from app.models.llm.request import ConfigBlob, LLMCallConfig
from app.utils import APIResponse, send_callback
from app.celery.utils import start_high_priority_job
from app.core.langfuse.langfuse import observe_llm_execution
from app.services.llm.providers.registry import get_llm_provider


@@ -182,7 +184,25 @@ def execute_job(
)
return handle_job_error(job_id, request.callback_url, callback_response)

response, error = provider_instance.execute(
langfuse_credentials = get_provider_credential(
session=session,
org_id=organization_id,
project_id=project_id,
provider="langfuse",
)

# Extract conversation_id for langfuse session grouping
conversation_id = None
if request.query.conversation and request.query.conversation.id:
conversation_id = request.query.conversation.id

# Apply Langfuse observability decorator to provider execute method
decorated_execute = observe_llm_execution(
credentials=langfuse_credentials,
session_id=conversation_id,
)(provider_instance.execute)

response, error = decorated_execute(
completion_config=config_blob.completion,
query=request.query,
include_provider_raw_response=request.include_provider_raw_response,