Skip to content

feat: Unify traces of sub-pipelines within pipelines with Langfuse #1624

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ async def shutdown_event():
)
print(response["llm"]["replies"][0])
print(response["tracer"]["trace_url"])
print(response["tracer"]["trace_id"])
```

For advanced use cases, you can also customize how spans are created and processed by
Expand Down Expand Up @@ -175,6 +176,7 @@ def run(self, invocation_context: Optional[Dict[str, Any]] = None):
:returns: A dictionary with the following keys:
- `name`: The name of the tracing component.
- `trace_url`: The URL to the tracing data.
- `trace_id`: The ID of the trace.
"""
logger.debug(
"Langfuse tracer invoked with the following context: '{invocation_context}'",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
# We keep them here to avoid making typos when using them.
_PIPELINE_INPUT_KEY = "haystack.pipeline.input_data"
_PIPELINE_OUTPUT_KEY = "haystack.pipeline.output_data"
_ASYNC_PIPELINE_RUN_KEY = "haystack.async_pipeline.run"
_PIPELINE_RUN_KEY = "haystack.pipeline.run"
_COMPONENT_NAME_KEY = "haystack.component.name"
_COMPONENT_TYPE_KEY = "haystack.component.type"
Expand Down Expand Up @@ -250,12 +251,16 @@ class DefaultSpanHandler(SpanHandler):

def create_span(self, context: SpanContext) -> LangfuseSpan:
if self.tracer is None:
message = "Tracer is not initialized"
message = (
"Tracer is not initialized. "
"Make sure the environment variable HAYSTACK_CONTENT_TRACING_ENABLED is set to true before "
"importing Haystack."
)
raise RuntimeError(message)

tracing_ctx = tracing_context_var.get({})
if not context.parent_span:
if context.operation_name != _PIPELINE_RUN_KEY:
if context.operation_name not in [_PIPELINE_RUN_KEY, _ASYNC_PIPELINE_RUN_KEY]:
logger.warning(
"Creating a new trace without a parent span is not recommended for operation '{operation_name}'.",
operation_name=context.operation_name,
Expand Down Expand Up @@ -353,19 +358,22 @@ def trace(
span_name = tags.get(_COMPONENT_NAME_KEY, operation_name)
component_type = tags.get(_COMPONENT_TYPE_KEY)

# Create span using the handler
span = self._span_handler.create_span(
SpanContext(
name=span_name,
operation_name=operation_name,
component_type=component_type,
tags=tags,
parent_span=parent_span,
trace_name=self._name,
public=self._public,
)
# Create a new span context
span_context = SpanContext(
name=span_name,
operation_name=operation_name,
component_type=component_type,
tags=tags,
# We use the current active span as the parent span if not provided to handle nested pipelines
# The nested pipeline (or sub-pipeline) will be a child of the current active span
parent_span=parent_span or self.current_span(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is what fixed the issue

trace_name=self._name,
public=self._public,
)

# Create span using the handler
span = self._span_handler.create_span(span_context)

self._context.append(span)
span.set_tags(tags)

Expand All @@ -374,11 +382,11 @@ def trace(
# Let the span handler process the span
self._span_handler.handle(span, component_type)

raw_span = span.raw_span()

# In this section, we finalize both regular spans and generation spans created using the LangfuseSpan class.
# It's important to end() these spans to ensure they are properly closed and all relevant data is recorded.
# Note that we do not call end() on the main trace span itself, as its lifecycle is managed differently.
# Note that we do not call end() on the main trace span itself (StatefulTraceClient), as its lifecycle is
# managed differently.
raw_span = span.raw_span()
if isinstance(raw_span, (StatefulSpanClient, StatefulGenerationClient)):
raw_span.end()
self._context.pop()
Expand Down
213 changes: 213 additions & 0 deletions integrations/langfuse/tests/test_langfuse_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

import os

os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"

from unittest.mock import Mock

from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.utils import Secret

from haystack_integrations.components.connectors.langfuse import LangfuseConnector
from haystack_integrations.tracing.langfuse import DefaultSpanHandler


class CustomSpanHandler(DefaultSpanHandler):
    """A minimal custom span handler used to exercise (de)serialization of the
    ``span_handler`` init parameter of ``LangfuseConnector``.

    The tests only need a handler whose *type* differs from the default; its
    behavior is irrelevant, so ``handle`` is a deliberate no-op.
    """

    def handle(self, span, component_type=None):
        # Intentionally do nothing: only the class identity matters in tests.
        pass


class TestLangfuseConnector:
    """Unit tests for ``LangfuseConnector``.

    Covers: the ``run`` output contract (name / trace_url / trace_id),
    round-trip serialization via ``to_dict`` / ``from_dict`` (with and without
    a custom span handler), and serialization of a full pipeline containing
    the connector.
    """

    def test_run(self, monkeypatch):
        """run() must expose the tracer's name, trace URL, and trace ID."""
        monkeypatch.setenv("LANGFUSE_SECRET_KEY", "secret")
        monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "public")

        langfuse_connector = LangfuseConnector(
            name="Chat example - OpenAI",
            public=True,
            secret_key=Secret.from_env_var("LANGFUSE_SECRET_KEY"),
            public_key=Secret.from_env_var("LANGFUSE_PUBLIC_KEY"),
        )

        # Replace the real tracer with a mock so no network calls are made.
        mock_tracer = Mock()
        mock_tracer.get_trace_url.return_value = "https://example.com/trace"
        mock_tracer.get_trace_id.return_value = "12345"
        langfuse_connector.tracer = mock_tracer

        response = langfuse_connector.run(invocation_context={"some_key": "some_value"})
        assert response["name"] == "Chat example - OpenAI"
        assert response["trace_url"] == "https://example.com/trace"
        assert response["trace_id"] == "12345"

    def test_to_dict(self, monkeypatch):
        """Default-constructed connector serializes with expected defaults."""
        monkeypatch.setenv("LANGFUSE_SECRET_KEY", "secret")
        monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "public")

        langfuse_connector = LangfuseConnector(name="Chat example - OpenAI")
        serialized = langfuse_connector.to_dict()

        assert serialized == {
            "type": "haystack_integrations.components.connectors.langfuse.langfuse_connector.LangfuseConnector",
            "init_parameters": {
                "name": "Chat example - OpenAI",
                "public": False,
                "secret_key": {
                    "type": "env_var",
                    "env_vars": ["LANGFUSE_SECRET_KEY"],
                    "strict": True,
                },
                "public_key": {
                    "type": "env_var",
                    "env_vars": ["LANGFUSE_PUBLIC_KEY"],
                    "strict": True,
                },
                "span_handler": None,
            },
        }

    def test_to_dict_with_params(self, monkeypatch):
        """Explicit params (including a custom span handler) serialize fully."""
        monkeypatch.setenv("LANGFUSE_SECRET_KEY", "secret")
        monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "public")

        langfuse_connector = LangfuseConnector(
            name="Chat example - OpenAI",
            public=True,
            secret_key=Secret.from_env_var("LANGFUSE_SECRET_KEY"),
            public_key=Secret.from_env_var("LANGFUSE_PUBLIC_KEY"),
            span_handler=CustomSpanHandler(),
        )

        serialized = langfuse_connector.to_dict()
        assert serialized == {
            "type": "haystack_integrations.components.connectors.langfuse.langfuse_connector.LangfuseConnector",
            "init_parameters": {
                "name": "Chat example - OpenAI",
                "public": True,
                "secret_key": {
                    "type": "env_var",
                    "env_vars": ["LANGFUSE_SECRET_KEY"],
                    "strict": True,
                },
                "public_key": {
                    "type": "env_var",
                    "env_vars": ["LANGFUSE_PUBLIC_KEY"],
                    "strict": True,
                },
                "span_handler": {
                    "type": "tests.test_langfuse_connector.CustomSpanHandler",
                    "data": {
                        "type": "tests.test_langfuse_connector.CustomSpanHandler",
                        "init_parameters": {},
                    },
                },
            },
        }

    def test_from_dict(self, monkeypatch):
        """Deserializing a default payload restores all attributes."""
        monkeypatch.setenv("LANGFUSE_SECRET_KEY", "secret")
        monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "public")

        data = {
            "type": "haystack_integrations.components.connectors.langfuse.langfuse_connector.LangfuseConnector",
            "init_parameters": {
                "name": "Chat example - OpenAI",
                "public": False,
                "secret_key": {
                    "type": "env_var",
                    "env_vars": ["LANGFUSE_SECRET_KEY"],
                    "strict": True,
                },
                "public_key": {
                    "type": "env_var",
                    "env_vars": ["LANGFUSE_PUBLIC_KEY"],
                    "strict": True,
                },
                "span_handler": None,
            },
        }
        langfuse_connector = LangfuseConnector.from_dict(data)
        assert langfuse_connector.name == "Chat example - OpenAI"
        assert langfuse_connector.public is False
        assert langfuse_connector.secret_key == Secret.from_env_var("LANGFUSE_SECRET_KEY")
        assert langfuse_connector.public_key == Secret.from_env_var("LANGFUSE_PUBLIC_KEY")
        assert langfuse_connector.span_handler is None

    def test_from_dict_with_params(self, monkeypatch):
        """Deserializing restores a custom span handler instance by type."""
        monkeypatch.setenv("LANGFUSE_SECRET_KEY", "secret")
        monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "public")

        data = {
            "type": "haystack_integrations.components.connectors.langfuse.langfuse_connector.LangfuseConnector",
            "init_parameters": {
                "name": "Chat example - OpenAI",
                "public": True,
                "secret_key": {
                    "type": "env_var",
                    "env_vars": ["LANGFUSE_SECRET_KEY"],
                    "strict": True,
                },
                "public_key": {
                    "type": "env_var",
                    "env_vars": ["LANGFUSE_PUBLIC_KEY"],
                    "strict": True,
                },
                "span_handler": {
                    "type": "tests.test_langfuse_connector.CustomSpanHandler",
                    "data": {
                        "type": "tests.test_langfuse_connector.CustomSpanHandler",
                        "init_parameters": {},
                    },
                },
            },
        }

        langfuse_connector = LangfuseConnector.from_dict(data)
        assert langfuse_connector.name == "Chat example - OpenAI"
        assert langfuse_connector.public is True
        assert langfuse_connector.secret_key == Secret.from_env_var("LANGFUSE_SECRET_KEY")
        assert langfuse_connector.public_key == Secret.from_env_var("LANGFUSE_PUBLIC_KEY")
        assert isinstance(langfuse_connector.span_handler, CustomSpanHandler)

    def test_pipeline_serialization(self, monkeypatch):
        """A pipeline containing the connector survives a to_dict/from_dict round trip."""
        # Set test env vars
        monkeypatch.setenv("LANGFUSE_SECRET_KEY", "secret")
        monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "public")
        monkeypatch.setenv("OPENAI_API_KEY", "openai_api_key")

        # Create pipeline with OpenAI LLM
        pipe = Pipeline()
        pipe.add_component(
            "tracer",
            LangfuseConnector(
                name="Chat example - OpenAI",
                public=True,
                secret_key=Secret.from_env_var("LANGFUSE_SECRET_KEY"),
                public_key=Secret.from_env_var("LANGFUSE_PUBLIC_KEY"),
            ),
        )
        pipe.add_component("prompt_builder", ChatPromptBuilder())
        pipe.add_component("llm", OpenAIChatGenerator())
        pipe.connect("prompt_builder.prompt", "llm.messages")

        # Serialize
        serialized = pipe.to_dict()

        # Check serialized secrets: env-var references must be kept, not resolved values
        tracer_params = serialized["components"]["tracer"]["init_parameters"]
        assert isinstance(tracer_params["secret_key"], dict)
        assert tracer_params["secret_key"]["type"] == "env_var"
        assert tracer_params["secret_key"]["env_vars"] == ["LANGFUSE_SECRET_KEY"]
        assert isinstance(tracer_params["public_key"], dict)
        assert tracer_params["public_key"]["type"] == "env_var"
        assert tracer_params["public_key"]["env_vars"] == ["LANGFUSE_PUBLIC_KEY"]

        # Deserialize
        new_pipe = Pipeline.from_dict(serialized)

        # Verify pipeline is the same
        assert new_pipe == pipe
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great to see that you've introduced multiple test cases. Maybe in the future we can also cover some edge cases, e.g. a test_invalid_span_handler to verify how the system behaves. But it's your call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! I think that is something we can do in the future.

71 changes: 0 additions & 71 deletions integrations/langfuse/tests/test_langfuse_span.py

This file was deleted.

Loading