Skip to content

feat: Unify traces of sub-pipelines within pipelines with Langfuse #1624

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 11, 2025

Conversation

sjrl
Copy link
Contributor

@sjrl sjrl commented Apr 8, 2025

Related Issues

Proposed Changes:

  • Correctly pass the parent_span in the situation we have a sub-pipeline within a pipeline

How did you test it?

  • Added integration test and tested locally
  • Also expanded unit tests and did some refactoring

Notes for the reviewer

Here is some example code

import os

os.environ["OPENAI_API_KEY"] = ""
os.environ["LANGFUSE_SECRET_KEY"] = ""
os.environ["LANGFUSE_PUBLIC_KEY"] = ""
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"

from haystack import Pipeline, SuperComponent
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.utils.auth import Secret

from haystack_integrations.components.connectors.langfuse import LangfuseConnector


sub_pipe = Pipeline()
sub_pipe.add_component("llm", OpenAIChatGenerator())
super_llm = SuperComponent(
    pipeline=sub_pipe,
)

top_sub_pipe = Pipeline()
top_sub_pipe.add_component("prompt_builder", ChatPromptBuilder())
top_sub_pipe.add_component("super_llm", super_llm)
top_sub_pipe.connect("prompt_builder.prompt", "super_llm.messages")
super_prompt_and_llm = SuperComponent(
    pipeline=top_sub_pipe,
)

# Main pipeline
pipe = Pipeline()
pipe.add_component("tracer", LangfuseConnector("Sub-Sub Pipeline Test"))
pipe.add_component("super_prompt_and_llm", super_prompt_and_llm)

messages = [
    ChatMessage.from_system("Always respond in German even if some input data is in other languages."),
    ChatMessage.from_user("Tell me about {{location}}"),
]

response = pipe.run(
    data={
        "super_prompt_and_llm": {
            "template_variables": {"location": "Berlin"},
            "template": messages,
        },
        "tracer": {
            "invocation_context": {"some_key": "some_value"},
        },
    }
)
print(response["super_prompt_and_llm"]["replies"][0])
print(response["tracer"]["trace_url"])

which produces this trace in Langfuse

Screenshot 2025-04-08 at 11 49 28

Checklist

@sjrl sjrl requested a review from a team as a code owner April 8, 2025 09:49
@sjrl sjrl requested review from Amnah199 and removed request for a team April 8, 2025 09:49
@github-actions github-actions bot added integration:langfuse type:documentation Improvements or additions to documentation labels Apr 8, 2025
assert langfuse_connector.public_key == Secret.from_env_var("LANGFUSE_PUBLIC_KEY")
assert isinstance(langfuse_connector.span_handler, CustomSpanHandler)

def test_pipeline_serialization(self, monkeypatch):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was moved from the test_tracing.py file

@@ -1,71 +0,0 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file got refactored into the test_tracer.py file, following our standard practice of having the test files follow the same name structure as our module files.

@pytest.mark.integration
def test_custom_span_handler():
"""Test that custom span handler properly sets Langfuse levels."""
def test_tracing_with_sub_pipelines():
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the new test, that checks that when using sub-pipelines within a pipeline we correctly collect the traces together.

tags=tags,
# We use the current active span as the parent span if not provided to handle nested pipelines
# The nested pipeline (or sub-pipeline) will be a child of the current active span
parent_span=parent_span or self.current_span(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is what fixed the issue

@sjrl
Copy link
Contributor Author

sjrl commented Apr 8, 2025

pinging @vblagoje in case you are interested

@vblagoje
Copy link
Member

vblagoje commented Apr 9, 2025

Very nice @sjrl

new_pipe = Pipeline.from_dict(serialized)

# Verify pipeline is the same
assert new_pipe == pipe
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great to see that you've introduced multiple test cases. Maybe in future we can cover some edge casestest_invalid_span_handler to verify how the system behaves. But its your call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! I think that is something we can do in the future.

Copy link
Contributor

@Amnah199 Amnah199 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work! Although I may not have full context on Langfuse, I reviewed the code based on my understanding. I am guessing @vblagoje has also had a chance to take a look.

@sjrl sjrl merged commit 900db7e into main Apr 11, 2025
10 checks passed
@sjrl sjrl deleted the langfuse-sub branch April 11, 2025 05:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
integration:langfuse type:documentation Improvements or additions to documentation
Projects
None yet
3 participants