AsyncPipeline Creates Multiple Traces Instead of a Single Unified Trace in Langfuse #1604

Closed
immortal3 opened this issue Apr 4, 2025 · 9 comments · Fixed by #1624
Labels: bug (Something isn't working), P1

Comments

immortal3 commented Apr 4, 2025

Describe the Bug:
When using the AsyncPipeline instead of the synchronous Pipeline, multiple trace entries are created in Langfuse for what should be a single execution of the pipeline. This behavior makes it difficult to get a unified view of the request flow.

Additionally, the following warning is logged during execution:

WARNING: 2025-04-03,17:49:55: tracer.py:243: Creating a new trace without a parent span is not recommended for operation 'haystack.async_pipeline.run'.

This suggests that the trace context is not being properly propagated across async components or pipeline runs.


Code to Reproduce:

from haystack import AsyncPipeline
from haystack_integrations.components.connectors.langfuse import LangfuseConnector

pipeline = AsyncPipeline()

# Add components (simplified example; SomeRetrieverComponent and
# SomeGeneratorComponent stand in for our custom components)
pipeline.add_component("tracer", LangfuseConnector(name="Async Pipeline"))
pipeline.add_component("retriever", SomeRetrieverComponent())
pipeline.add_component("generator", SomeGeneratorComponent())

# Define connections
pipeline.connect("retriever", "generator")

# Run the async pipeline (from within a running event loop)
await pipeline.run_async({"retriever": {"query": "What is Haystack?"}})
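Since top-level `await` only works in notebooks or async REPLs, running this from a plain script needs an event loop. A minimal self-contained sketch (the pipeline call is stubbed out here, since the real components are omitted in the simplified example above):

```python
import asyncio

async def run_async_pipeline(data):
    # Stand-in for `await pipeline.run_async(data)`; the real pipeline
    # and its custom components are omitted in this sketch.
    await asyncio.sleep(0)
    return {"generator": {"replies": ["..."]}}

async def main():
    return await run_async_pipeline({"retriever": {"query": "What is Haystack?"}})

result = asyncio.run(main())
print(result["generator"]["replies"])
```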

Observed Behavior:

  • Multiple traces appear in Langfuse for a single pipeline run.
  • The LangfuseConnector logs a warning about missing parent span context.
  • Lack of unified tracing breaks end-to-end visibility for async workflows.

Expected Behavior:
A single trace should be created and propagated throughout the entire async pipeline execution, just like in the synchronous Pipeline. This would ensure consistent observability and debugging in Langfuse.

[Screenshot: Langfuse showing multiple traces for a single pipeline run]

Describe your environment (please complete the following information):

  • OS: Linux, WSL
  • haystack-ai==2.12.0
  • langfuse-haystack==0.8.0
@immortal3 immortal3 added the bug Something isn't working label Apr 4, 2025
@immortal3 immortal3 changed the title Multiple Traces Creation with AsyncPipeline (instead of Pipeline) Multiple Traces while using AsyncPipeline (compared to Pipeline) Apr 4, 2025
@immortal3 immortal3 changed the title Multiple Traces while using AsyncPipeline (compared to Pipeline) AsyncPipeline Creates Multiple Traces Instead of a Single Unified Trace in Langfuse Apr 4, 2025

sjrl commented Apr 4, 2025

hey @immortal3 thanks for raising this! To help with the debugging could you tell us specifically which Retriever and Generator you were using?

It sounds like this could be related to this issue, where the parent context is also not passed down correctly when using the OpenTelemetry tracer with AsyncPipeline.

@sjrl sjrl added the P1 label Apr 4, 2025
immortal3 (Author) commented

@sjrl We don't use any native Haystack components. As I mentioned in #1605, we have a lot of custom components and sub-pipelines. Our custom components look roughly like the following:

@component
class CustomComp:
    @component.output_types()
    def run(self, a):
        ...


sjrl commented Apr 4, 2025

@immortal3 ahh okay, good to know. In that case I'd like to confirm: your custom components don't have a run_async method implemented, right?

So nothing like

@component
class CustomComp:
    @component.output_types()
    def run(self, a):
        ...
    @component.output_types()
    async def run_async(self, a):
        ...

immortal3 (Author) commented

Yes, no run_async as of now.


sjrl commented Apr 8, 2025

Hey @immortal3 I'm struggling to reproduce your issue. Here is the example script I've tried:

import asyncio
import logging

logging.basicConfig(format="%(levelname)s - %(name)s -  %(message)s", level=logging.WARNING)
logging.getLogger("haystack").setLevel(logging.INFO)

import os

os.environ["OPENAI_API_KEY"] = ""
os.environ["LANGFUSE_SECRET_KEY"] = ""
os.environ["LANGFUSE_PUBLIC_KEY"] = ""
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"

from haystack import AsyncPipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage

from haystack_integrations.components.connectors.langfuse import LangfuseConnector


pipe = AsyncPipeline()
pipe.add_component("tracer", LangfuseConnector("Async Pipeline Test"))
pipe.add_component("prompt_builder", ChatPromptBuilder())
pipe.add_component("llm", OpenAIChatGenerator())

pipe.connect("prompt_builder.prompt", "llm.messages")

messages = [
    ChatMessage.from_system("Always respond in German even if some input data is in other languages."),
    ChatMessage.from_user("Tell me about {{location}}"),
]

# use sync run
# response = pipe.run(
#     data={
#         "prompt_builder": {
#             "template_variables": {"location": "Berlin"},
#             "template": messages,
#         },
#         "tracer": {
#             "invocation_context": {"some_key": "some_value"},
#         },
#     }
# )
# print(response["llm"]["replies"][0])
# print(response["tracer"]["trace_url"])


# use async run
async def run_async():
    resp = await pipe.run_async(
        data={
            "prompt_builder": {
                "template_variables": {"location": "Berlin"},
                "template": messages,
            },
            "tracer": {
                "invocation_context": {"some_key": "some_value"},
            },
        }
    )
    print(resp["llm"]["replies"][0])
    print(resp["tracer"]["trace_url"])

asyncio.run(run_async())

and when running this I end up with the following trace

[Screenshot: a single unified trace in Langfuse]

so everything looks to be as expected.

Could you provide a minimal working example that caused your issue? Is it possible your problem resulted from using components that were using sub-pipelines like you mentioned in your other issue?


sjrl commented Apr 8, 2025

And to clarify this

The LangfuseConnector logs a warning about missing parent span context.

was a false positive. The warning message shouldn't have been shown. This if statement

if context.operation_name != _PIPELINE_RUN_KEY:

needs to be updated to

if context.operation_name not in [_PIPELINE_RUN_KEY, _ASYNC_PIPELINE_RUN_KEY]:
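A minimal, self-contained sketch of that check (the operation-name constants match the warning logged above and Haystack's pipeline tracing; the helper function itself is hypothetical, not the actual tracer.py code):

```python
# Operation names used by Haystack's tracing; the async one is the
# operation named in the warning from the original report.
_PIPELINE_RUN_KEY = "haystack.pipeline.run"
_ASYNC_PIPELINE_RUN_KEY = "haystack.async_pipeline.run"

def creates_orphan_trace(operation_name, parent_span):
    """Hypothetical sketch: return True if this operation would start a
    trace without a parent even though it is not a top-level pipeline run."""
    if parent_span is not None:
        return False
    # The fix: treat both sync and async pipeline runs as valid roots.
    return operation_name not in (_PIPELINE_RUN_KEY, _ASYNC_PIPELINE_RUN_KEY)

print(creates_orphan_trace("haystack.async_pipeline.run", None))  # False: valid root
print(creates_orphan_trace("haystack.component.run", None))       # True: missing parent
```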

immortal3 (Author) commented

@sjrl I also can't reproduce this with a single pipeline. Checking the traces, the new traces are mainly created for sub-pipelines. This is helpful.
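For readers hitting the same thing, here is a pure-Python toy model (no Haystack code involved) of why a sub-pipeline run can open its own trace when the parent context isn't propagated:

```python
# Toy model: a tracer that opens a new root trace whenever a span starts
# without a parent. If each nested (sub-)pipeline run gets a fresh
# tracer/context instead of inheriting the parent's, every sub-pipeline
# shows up in the backend as a separate trace.
traces = []

class ToyTracer:
    def __init__(self, parent_span=None):
        self.current_span = parent_span

    def start_span(self, name):
        if self.current_span is None:
            traces.append(name)  # no parent -> a brand-new root trace
        self.current_span = name

def run_pipeline(name, tracer, sub_pipelines=()):
    tracer.start_span(name)
    for sub_name in sub_pipelines:
        # Bug pattern: the nested run does not inherit the parent span.
        run_pipeline(sub_name, ToyTracer())

run_pipeline("main", ToyTracer(), sub_pipelines=["sub_a", "sub_b"])
print(traces)  # ['main', 'sub_a', 'sub_b'] -> three separate traces
```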


sjrl commented Apr 8, 2025

@immortal3 thanks for the info! I'll focus on sub-pipelines then to see if the fix here is enough or if more needs to be done.


sjrl commented Apr 11, 2025

Hey @immortal3 this has been merged and is now available in langfuse-haystack==0.10.1.
