Pass down tracing contexts in SuperComponents #217

Closed
sjrl opened this issue Mar 4, 2025 · 6 comments
sjrl (Contributor) commented Mar 4, 2025

Right now, with Agents and SuperComponents, it is difficult to disambiguate where traces come from, since we are now using nested pipelines.

We would like to pass an existing tracing context down to a nested pipeline run, to make it easier to understand the origin of a trace and to know which nested pipeline it originated from.
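For illustration, a minimal sketch of the desired behaviour (the operation name and helper function are made up, and it assumes the Haystack tracer's trace() accepts an optional parent_span):

# Sketch only: illustrative names, not actual Haystack internals.
from typing import Any, Dict, Optional

from haystack import Pipeline, tracing
from haystack.tracing import Span


def run_nested(inner_pipeline: Pipeline, data: Dict[str, Any], parent_span: Optional[Span] = None):
    # Open a span for the nested run and hand it the outer span as parent,
    # so the trace shows which nested pipeline the spans originated from.
    with tracing.tracer.trace(
        "haystack.super_component.run",  # illustrative operation name
        tags={"haystack.pipeline.nested": True},
        parent_span=parent_span,
    ):
        return inner_pipeline.run(data=data)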

cc @mathislucka

@julian-risch julian-risch added the P2 label Mar 4, 2025
@mathislucka mathislucka self-assigned this Mar 7, 2025
mathislucka (Member) commented:

@sjrl

Investigated this quite a bit.

For tracers that use the opentelemetry-sdk this should "just work". I tried with SuperComponent and Agent and the spans are correctly set as child spans of the main pipeline span.
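For reference, a minimal standalone illustration (plain opentelemetry-sdk, no Haystack) of why the nesting comes for free: start_as_current_span puts the span into the active context, so any span started inside it automatically becomes a child.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("demo")

with tracer.start_as_current_span("outer_pipeline.run"):
    # e.g. the SuperComponent's inner pipeline starting its own span
    with tracer.start_as_current_span("inner_pipeline.run"):
        pass  # the console exporter shows inner_pipeline.run with the outer span as parent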

Our Langfuse integration does not support this out of the box, but if we move from the langfuse-sdk to the opentelemetry-sdk, it works without further changes. That move would probably also reduce the maintenance effort for the Langfuse integration.

One problem:
If we use AsyncPipeline instead, this does not work, and I haven't found a way to properly propagate the context yet. If you have any experience with tracing in async environments, help would be appreciated.
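Not a fix for AsyncPipeline itself, just a sketch of one common pattern in async OpenTelemetry code: snapshot the current context in the caller and pass it explicitly when starting child spans, in case contextvars are not propagated to where the span is created (e.g. tasks scheduled from a thread pool).

import asyncio

from opentelemetry import context, trace

tracer = trace.get_tracer("demo")


async def run_component(parent_ctx) -> None:
    # Passing the snapshot explicitly makes this span a child of the caller's
    # span even if contextvars were not propagated automatically.
    with tracer.start_as_current_span("component.run", context=parent_ctx):
        await asyncio.sleep(0)


async def run_pipeline() -> None:
    with tracer.start_as_current_span("async_pipeline.run"):
        parent_ctx = context.get_current()  # snapshot of the active context
        await asyncio.gather(run_component(parent_ctx), run_component(parent_ctx))


asyncio.run(run_pipeline())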

sjrl (Contributor, Author) commented Mar 7, 2025

Thanks for looking into this! Unfortunately, I don't have much experience with propagating tracing contexts, but it's possible that @wochinge might.

One question

Our langfuse integration does not support this out of the box but if we move from the langfuse-sdk to the opentelemetry-sdk, then this works out of the box.

On this point, are you saying that it's possible to use the opentelemetry-sdk in our Langfuse tracer? Or would it be better to figure out how to update our Langfuse tracer to pass the parent context properly using the langfuse-sdk?

mathislucka (Member) commented:

It's possible to use the opentelemetry-sdk with Langfuse.

This is how we could implement it. Any custom handling of attributes could still be done inside the LangfuseSpan.

# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

import contextlib
from typing import Any, Dict, Iterator, Optional

from haystack.tracing import Span
from haystack.tracing.opentelemetry import OpenTelemetrySpan, OpenTelemetryTracer

from haystack.tracing import tracer as proxy_tracer
from haystack.dataclasses import ChatMessage



class LangfuseSpan(OpenTelemetrySpan):
    """
    Internal class representing a bridge between the Haystack span tracing API and Langfuse.
    """
    def set_content_tag(self, key: str, value: Any) -> None:
        """
        Set a content-specific tag for this span.

        :param key: The content tag key.
        :param value: The content tag value.
        """
        if not proxy_tracer.is_content_tracing_enabled:
            return
        if key.endswith(".input"):
            if "messages" in value:
                messages = [m.to_openai_dict_format() for m in value["messages"]]
                self.set_tag("input.value", messages)
            else:
                self.set_tag("input.value", value)
        elif key.endswith(".output"):
            if "replies" in value:
                if all(isinstance(r, ChatMessage) for r in value["replies"]):
                    replies = [m.to_openai_dict_format() for m in value["replies"]]
                else:
                    replies = value["replies"]
                self.set_tag("output.value", replies)
            else:
                self.set_tag("output.value", value)



class LangfuseTracer(OpenTelemetryTracer):
    """
    Internal class representing a bridge between the Haystack tracer and Langfuse.
    """

    @contextlib.contextmanager
    def trace(
        self, operation_name: str, tags: Optional[Dict[str, Any]] = None, parent_span: Optional[Span] = None
    ) -> Iterator[Span]:
        """Activate and return a new span that inherits from the current active span."""
        # parent_span is accepted to match the Haystack Tracer interface; the
        # opentelemetry-sdk derives the parent from the currently active context.
        with self._tracer.start_as_current_span(operation_name) as raw_span:
            span = LangfuseSpan(raw_span)
            if tags:
                span.set_tags(tags)

            yield span

This is how you would configure the tracer:

import os
import base64


from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes

from haystack import tracing
from haystack.tracing.langfuse import LangfuseTracer



# Service name is required for most backends
resource = Resource(attributes={ResourceAttributes.SERVICE_NAME: "haystack"})

tracer_provider = TracerProvider(resource=resource)

LANGFUSE_PUBLIC_KEY = os.getenv("LANGFUSE_PUBLIC_KEY")
LANGFUSE_SECRET_KEY = os.getenv("LANGFUSE_SECRET_KEY")
LANGFUSE_AUTH = base64.b64encode(f"{LANGFUSE_PUBLIC_KEY}:{LANGFUSE_SECRET_KEY}".encode()).decode()
exporter = OTLPSpanExporter(
    endpoint=f"{os.getenv('LANGFUSE_OTEL_API')}/v1/traces",
    headers={"Authorization": f"Basic {LANGFUSE_AUTH}"},
)

processor = BatchSpanProcessor(exporter)
tracer_provider.add_span_processor(processor)
trace.set_tracer_provider(tracer_provider)

tracer = tracer_provider.get_tracer("my_application")
tracing.enable_tracing(LangfuseTracer(tracer))
tracing.tracer.is_content_tracing_enabled = True

The configuration code could go into the LangfuseConnector so that we hide the complexity from the user.
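As a rough, hypothetical sketch of that idea (the component and parameter names are made up, not the actual LangfuseConnector API; the LangfuseTracer import path is the one used in the snippet above), the setup could live in the connector's __init__:

import base64
import os

from haystack import component, tracing
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.semconv.resource import ResourceAttributes

from haystack.tracing.langfuse import LangfuseTracer  # hypothetical path, as in the snippet above


@component
class LangfuseConnector:
    """Hypothetical connector that hides the OTLP/Langfuse tracer setup from the user."""

    def __init__(self, name: str = "haystack"):
        auth = base64.b64encode(
            f"{os.getenv('LANGFUSE_PUBLIC_KEY')}:{os.getenv('LANGFUSE_SECRET_KEY')}".encode()
        ).decode()
        provider = TracerProvider(
            resource=Resource(attributes={ResourceAttributes.SERVICE_NAME: name})
        )
        provider.add_span_processor(
            BatchSpanProcessor(
                OTLPSpanExporter(
                    endpoint=f"{os.getenv('LANGFUSE_OTEL_API')}/v1/traces",
                    headers={"Authorization": f"Basic {auth}"},
                )
            )
        )
        trace.set_tracer_provider(provider)
        tracing.enable_tracing(LangfuseTracer(provider.get_tracer(name)))

    @component.output_types(name=str)
    def run(self):
        # The connector only needs to exist in the pipeline; tracing is configured in __init__.
        return {"name": "LangfuseConnector"}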

I tried the same approach with Pydantic's logfire, and that also works out of the box with the opentelemetry-sdk (the same probably holds for Arize).

LastRemote commented:

@mathislucka Do you know why the current Langfuse tracer doesn't work for SuperComponents? I'm still trying to understand this.

@sjrl sjrl changed the title Pass down tracing contexts in Agents and SuperComponents Pass down tracing contexts in SuperComponents Apr 4, 2025
sjrl (Contributor, Author) commented Apr 8, 2025

Hey, this issue (at least for the normal Pipeline + SuperComponents) is resolved by this PR.

I'll be investigating how to get AsyncPipeline + SuperComponents to work in a separate issue, so I'm closing this one to reduce duplicates.

@sjrl sjrl closed this as completed Apr 8, 2025
@sjrl sjrl self-assigned this Apr 8, 2025
sjrl (Contributor, Author) commented Apr 11, 2025

This now works in langfuse-haystack==0.10.1; see here.
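For anyone landing here later, a hedged usage sketch (not from this thread; it assumes langfuse-haystack>=0.10.1 is installed, the LANGFUSE_* environment variables are set, and that SuperComponent is importable from the haystack package in your version):

import os

# Must be set before the pipeline is built so content tracing is picked up.
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"

from haystack import Pipeline, SuperComponent
from haystack.components.builders import PromptBuilder
from haystack_integrations.components.connectors.langfuse import LangfuseConnector

# Inner pipeline that will run inside the SuperComponent.
inner = Pipeline()
inner.add_component("prompt", PromptBuilder(template="Say hello to {{name}}"))

# Outer pipeline: the LangfuseConnector starts the root trace, and the
# SuperComponent's nested run should show up as child spans under it.
outer = Pipeline()
outer.add_component("tracer", LangfuseConnector("Outer pipeline"))
outer.add_component("wrapped", SuperComponent(pipeline=inner))

result = outer.run(data={"wrapped": {"name": "world"}})
print(result)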
