diff --git a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py
index 43964f514..7e9949273 100644
--- a/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py
+++ b/integrations/langfuse/src/haystack_integrations/components/connectors/langfuse/langfuse_connector.py
@@ -97,6 +97,7 @@ async def shutdown_event():
         )
         print(response["llm"]["replies"][0])
         print(response["tracer"]["trace_url"])
+        print(response["tracer"]["trace_id"])
     ```

     For advanced use cases, you can also customize how spans are created and processed by
@@ -175,6 +176,7 @@ def run(self, invocation_context: Optional[Dict[str, Any]] = None):
         :returns: A dictionary with the following keys:
             - `name`: The name of the tracing component.
            - `trace_url`: The URL to the tracing data.
+            - `trace_id`: The ID of the trace.
         """
         logger.debug(
             "Langfuse tracer invoked with the following context: '{invocation_context}'",
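To see what the new `trace_id` key gives callers, here is a condensed sketch of the docstring example above (component names follow that example; Langfuse and OpenAI credentials are assumed to be configured in the environment):

```python
from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage

from haystack_integrations.components.connectors.langfuse import LangfuseConnector

pipe = Pipeline()
pipe.add_component("tracer", LangfuseConnector(name="Chat example"))
pipe.add_component("prompt_builder", ChatPromptBuilder())
pipe.add_component("llm", OpenAIChatGenerator())
pipe.connect("prompt_builder.prompt", "llm.messages")

messages = [ChatMessage.from_user("Tell me about {{location}}")]
response = pipe.run(
    data={
        "prompt_builder": {"template": messages, "template_variables": {"location": "Berlin"}},
        "tracer": {"invocation_context": {"user_id": "user_42"}},
    }
)
print(response["tracer"]["trace_url"])  # link to the trace in the Langfuse UI
print(response["tracer"]["trace_id"])   # the new key: the raw trace ID
```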
diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py
index cf10ed6eb..4b44fefe6 100644
--- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py
+++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py
@@ -50,6 +50,7 @@
 # We keep them here to avoid making typos when using them.
 _PIPELINE_INPUT_KEY = "haystack.pipeline.input_data"
 _PIPELINE_OUTPUT_KEY = "haystack.pipeline.output_data"
+_ASYNC_PIPELINE_RUN_KEY = "haystack.async_pipeline.run"
 _PIPELINE_RUN_KEY = "haystack.pipeline.run"
 _COMPONENT_NAME_KEY = "haystack.component.name"
 _COMPONENT_TYPE_KEY = "haystack.component.type"
@@ -250,12 +251,16 @@ class DefaultSpanHandler(SpanHandler):
     def create_span(self, context: SpanContext) -> LangfuseSpan:
         if self.tracer is None:
-            message = "Tracer is not initialized"
+            message = (
+                "Tracer is not initialized. "
+                "Make sure the environment variable HAYSTACK_CONTENT_TRACING_ENABLED is set to true before "
+                "importing Haystack."
+            )
             raise RuntimeError(message)
         tracing_ctx = tracing_context_var.get({})
         if not context.parent_span:
-            if context.operation_name != _PIPELINE_RUN_KEY:
+            if context.operation_name not in [_PIPELINE_RUN_KEY, _ASYNC_PIPELINE_RUN_KEY]:
                 logger.warning(
                     "Creating a new trace without a parent span is not recommended for operation '{operation_name}'.",
                     operation_name=context.operation_name,
@@ -353,19 +358,22 @@ def trace(
         span_name = tags.get(_COMPONENT_NAME_KEY, operation_name)
         component_type = tags.get(_COMPONENT_TYPE_KEY)

-        # Create span using the handler
-        span = self._span_handler.create_span(
-            SpanContext(
-                name=span_name,
-                operation_name=operation_name,
-                component_type=component_type,
-                tags=tags,
-                parent_span=parent_span,
-                trace_name=self._name,
-                public=self._public,
-            )
+        # Create a new span context
+        span_context = SpanContext(
+            name=span_name,
+            operation_name=operation_name,
+            component_type=component_type,
+            tags=tags,
+            # We use the current active span as the parent span if not provided to handle nested pipelines
+            # The nested pipeline (or sub-pipeline) will be a child of the current active span
+            parent_span=parent_span or self.current_span(),
+            trace_name=self._name,
+            public=self._public,
         )
+        # Create span using the handler
+        span = self._span_handler.create_span(span_context)
+
         self._context.append(span)
         span.set_tags(tags)
@@ -374,11 +382,11 @@ def trace(
         # Let the span handler process the span
         self._span_handler.handle(span, component_type)

-        raw_span = span.raw_span()
-
         # In this section, we finalize both regular spans and generation spans created using the LangfuseSpan class.
         # It's important to end() these spans to ensure they are properly closed and all relevant data is recorded.
-        # Note that we do not call end() on the main trace span itself, as its lifecycle is managed differently.
+        # Note that we do not call end() on the main trace span itself (StatefulTraceClient), as its lifecycle is
+        # managed differently.
+        raw_span = span.raw_span()
         if isinstance(raw_span, (StatefulSpanClient, StatefulGenerationClient)):
             raw_span.end()
         self._context.pop()
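To make the parent-span fallback concrete, here is a self-contained toy that mimics only the stack discipline behind `parent_span=parent_span or self.current_span()` (this is not the real tracer API; names are illustrative):

```python
from contextlib import contextmanager
from typing import Iterator, List, Optional, Tuple


class ToyTracer:
    """Toy stand-in for LangfuseTracer's context stack, for illustration only."""

    def __init__(self) -> None:
        self._context: List[str] = []  # stack of active span names

    def current_span(self) -> Optional[str]:
        return self._context[-1] if self._context else None

    @contextmanager
    def trace(self, name: str, parent_span: Optional[str] = None) -> Iterator[Tuple[str, Optional[str]]]:
        # the change under review: fall back to the currently active span
        parent = parent_span or self.current_span()
        self._context.append(name)
        try:
            yield name, parent
        finally:
            self._context.pop()


tracer = ToyTracer()
with tracer.trace("haystack.pipeline.run") as (_, outer_parent):
    assert outer_parent is None  # top-level run starts a root trace
    # a component's run() kicks off a sub-pipeline while the outer span is still active
    with tracer.trace("haystack.pipeline.run") as (_, inner_parent):
        assert inner_parent == "haystack.pipeline.run"  # nested, not a detached trace
```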
diff --git a/integrations/langfuse/tests/test_langfuse_connector.py b/integrations/langfuse/tests/test_langfuse_connector.py
new file mode 100644
index 000000000..90cdbf90f
--- /dev/null
+++ b/integrations/langfuse/tests/test_langfuse_connector.py
@@ -0,0 +1,213 @@
+# SPDX-FileCopyrightText: 2023-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import os
+
+os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"
+
+from unittest.mock import Mock
+
+from haystack import Pipeline
+from haystack.components.builders import ChatPromptBuilder
+from haystack.components.generators.chat import OpenAIChatGenerator
+from haystack.utils import Secret
+
+from haystack_integrations.components.connectors.langfuse import LangfuseConnector
+from haystack_integrations.tracing.langfuse import DefaultSpanHandler
+
+
+class CustomSpanHandler(DefaultSpanHandler):
+    def handle(self, span, component_type=None):
+        pass
+
+
+class TestLangfuseConnector:
+    def test_run(self, monkeypatch):
+        monkeypatch.setenv("LANGFUSE_SECRET_KEY", "secret")
+        monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "public")
+
+        langfuse_connector = LangfuseConnector(
+            name="Chat example - OpenAI",
+            public=True,
+            secret_key=Secret.from_env_var("LANGFUSE_SECRET_KEY"),
+            public_key=Secret.from_env_var("LANGFUSE_PUBLIC_KEY"),
+        )
+
+        mock_tracer = Mock()
+        mock_tracer.get_trace_url.return_value = "https://example.com/trace"
+        mock_tracer.get_trace_id.return_value = "12345"
+        langfuse_connector.tracer = mock_tracer
+
+        response = langfuse_connector.run(invocation_context={"some_key": "some_value"})
+
+        assert response["name"] == "Chat example - OpenAI"
+        assert response["trace_url"] == "https://example.com/trace"
+        assert response["trace_id"] == "12345"
+
+    def test_to_dict(self, monkeypatch):
+        monkeypatch.setenv("LANGFUSE_SECRET_KEY", "secret")
+        monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "public")
+
+        langfuse_connector = LangfuseConnector(name="Chat example - OpenAI")
+        serialized = langfuse_connector.to_dict()
+
+        assert serialized == {
+            "type": "haystack_integrations.components.connectors.langfuse.langfuse_connector.LangfuseConnector",
+            "init_parameters": {
+                "name": "Chat example - OpenAI",
+                "public": False,
+                "secret_key": {
+                    "type": "env_var",
+                    "env_vars": ["LANGFUSE_SECRET_KEY"],
+                    "strict": True,
+                },
+                "public_key": {
+                    "type": "env_var",
+                    "env_vars": ["LANGFUSE_PUBLIC_KEY"],
+                    "strict": True,
+                },
+                "span_handler": None,
+            },
+        }
+
+    def test_to_dict_with_params(self, monkeypatch):
+        monkeypatch.setenv("LANGFUSE_SECRET_KEY", "secret")
+        monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "public")
+
+        langfuse_connector = LangfuseConnector(
+            name="Chat example - OpenAI",
+            public=True,
+            secret_key=Secret.from_env_var("LANGFUSE_SECRET_KEY"),
+            public_key=Secret.from_env_var("LANGFUSE_PUBLIC_KEY"),
+            span_handler=CustomSpanHandler(),
+        )
+
+        serialized = langfuse_connector.to_dict()
+
+        assert serialized == {
+            "type": "haystack_integrations.components.connectors.langfuse.langfuse_connector.LangfuseConnector",
+            "init_parameters": {
+                "name": "Chat example - OpenAI",
+                "public": True,
+                "secret_key": {
+                    "type": "env_var",
+                    "env_vars": ["LANGFUSE_SECRET_KEY"],
+                    "strict": True,
+                },
+                "public_key": {
+                    "type": "env_var",
+                    "env_vars": ["LANGFUSE_PUBLIC_KEY"],
+                    "strict": True,
+                },
+                "span_handler": {
+                    "type": "tests.test_langfuse_connector.CustomSpanHandler",
+                    "data": {
"tests.test_langfuse_connector.CustomSpanHandler", + "init_parameters": {}, + }, + }, + }, + } + + def test_from_dict(self, monkeypatch): + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "secret") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "public") + + data = { + "type": "haystack_integrations.components.connectors.langfuse.langfuse_connector.LangfuseConnector", + "init_parameters": { + "name": "Chat example - OpenAI", + "public": False, + "secret_key": { + "type": "env_var", + "env_vars": ["LANGFUSE_SECRET_KEY"], + "strict": True, + }, + "public_key": { + "type": "env_var", + "env_vars": ["LANGFUSE_PUBLIC_KEY"], + "strict": True, + }, + "span_handler": None, + }, + } + langfuse_connector = LangfuseConnector.from_dict(data) + assert langfuse_connector.name == "Chat example - OpenAI" + assert langfuse_connector.public is False + assert langfuse_connector.secret_key == Secret.from_env_var("LANGFUSE_SECRET_KEY") + assert langfuse_connector.public_key == Secret.from_env_var("LANGFUSE_PUBLIC_KEY") + assert langfuse_connector.span_handler is None + + def test_from_dict_with_params(self, monkeypatch): + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "secret") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "public") + + data = { + "type": "haystack_integrations.components.connectors.langfuse.langfuse_connector.LangfuseConnector", + "init_parameters": { + "name": "Chat example - OpenAI", + "public": True, + "secret_key": { + "type": "env_var", + "env_vars": ["LANGFUSE_SECRET_KEY"], + "strict": True, + }, + "public_key": { + "type": "env_var", + "env_vars": ["LANGFUSE_PUBLIC_KEY"], + "strict": True, + }, + "span_handler": { + "type": "tests.test_langfuse_connector.CustomSpanHandler", + "data": { + "type": "tests.test_langfuse_connector.CustomSpanHandler", + "init_parameters": {}, + }, + }, + }, + } + + langfuse_connector = LangfuseConnector.from_dict(data) + assert langfuse_connector.name == "Chat example - OpenAI" + assert langfuse_connector.public is True + assert langfuse_connector.secret_key == Secret.from_env_var("LANGFUSE_SECRET_KEY") + assert langfuse_connector.public_key == Secret.from_env_var("LANGFUSE_PUBLIC_KEY") + assert isinstance(langfuse_connector.span_handler, CustomSpanHandler) + + def test_pipeline_serialization(self, monkeypatch): + # Set test env vars + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "secret") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "public") + monkeypatch.setenv("OPENAI_API_KEY", "openai_api_key") + + # Create pipeline with OpenAI LLM + pipe = Pipeline() + pipe.add_component( + "tracer", + LangfuseConnector( + name="Chat example - OpenAI", + public=True, + secret_key=Secret.from_env_var("LANGFUSE_SECRET_KEY"), + public_key=Secret.from_env_var("LANGFUSE_PUBLIC_KEY"), + ), + ) + pipe.add_component("prompt_builder", ChatPromptBuilder()) + pipe.add_component("llm", OpenAIChatGenerator()) + pipe.connect("prompt_builder.prompt", "llm.messages") + + # Serialize + serialized = pipe.to_dict() + + # Check serialized secrets + tracer_params = serialized["components"]["tracer"]["init_parameters"] + assert isinstance(tracer_params["secret_key"], dict) + assert tracer_params["secret_key"]["type"] == "env_var" + assert tracer_params["secret_key"]["env_vars"] == ["LANGFUSE_SECRET_KEY"] + assert isinstance(tracer_params["public_key"], dict) + assert tracer_params["public_key"]["type"] == "env_var" + assert tracer_params["public_key"]["env_vars"] == ["LANGFUSE_PUBLIC_KEY"] + + # Deserialize + new_pipe = Pipeline.from_dict(serialized) + + # Verify pipeline is the same + assert new_pipe == pipe 
diff --git a/integrations/langfuse/tests/test_langfuse_span.py b/integrations/langfuse/tests/test_langfuse_span.py
deleted file mode 100644
index f701c82ef..000000000
--- a/integrations/langfuse/tests/test_langfuse_span.py
+++ /dev/null
@@ -1,71 +0,0 @@
-# SPDX-FileCopyrightText: 2023-present deepset GmbH
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import os
-
-os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"
-
-from unittest.mock import Mock
-
-from haystack.dataclasses import ChatMessage
-
-from haystack_integrations.tracing.langfuse.tracer import LangfuseSpan
-
-
-class TestLangfuseSpan:
-
-    # LangfuseSpan can be initialized with a span object
-    def test_initialized_with_span_object(self):
-        mock_span = Mock()
-        span = LangfuseSpan(mock_span)
-        assert span.raw_span() == mock_span
-
-    # set_tag method can update metadata of the span object
-    def test_set_tag_updates_metadata(self):
-        mock_span = Mock()
-        span = LangfuseSpan(mock_span)
-
-        span.set_tag("key", "value")
-        mock_span.update.assert_called_once_with(metadata={"key": "value"})
-        assert span._data["key"] == "value"
-
-    # set_content_tag method can update input and output of the span object
-    def test_set_content_tag_updates_input_and_output(self):
-        mock_span = Mock()
-
-        span = LangfuseSpan(mock_span)
-        span.set_content_tag("input_key", "input_value")
-        assert span._data["input_key"] == "input_value"
-
-        mock_span.reset_mock()
-        span.set_content_tag("output_key", "output_value")
-        assert span._data["output_key"] == "output_value"
-
-    # set_content_tag method can update input and output of the span object with messages/replies
-    def test_set_content_tag_updates_input_and_output_with_messages(self):
-        mock_span = Mock()
-
-        # test message input
-        span = LangfuseSpan(mock_span)
-        span.set_content_tag("key.input", {"messages": [ChatMessage.from_user("message")]})
-        assert mock_span.update.call_count == 1
-        # check we converted ChatMessage to OpenAI format
-        assert mock_span.update.call_args_list[0][1] == {"input": [{"role": "user", "content": "message"}]}
-        assert span._data["key.input"] == {"messages": [ChatMessage.from_user("message")]}
-
-        # test replies ChatMessage list
-        mock_span.reset_mock()
-        span.set_content_tag("key.output", {"replies": [ChatMessage.from_system("reply")]})
-        assert mock_span.update.call_count == 1
-        # check we converted ChatMessage to OpenAI format
-        assert mock_span.update.call_args_list[0][1] == {"output": [{"role": "system", "content": "reply"}]}
-        assert span._data["key.output"] == {"replies": [ChatMessage.from_system("reply")]}
-
-        # test replies string list
-        mock_span.reset_mock()
-        span.set_content_tag("key.output", {"replies": ["reply1", "reply2"]})
-        assert mock_span.update.call_count == 1
-        # check we handle properly string list replies
-        assert mock_span.update.call_args_list[0][1] == {"output": ["reply1", "reply2"]}
-        assert span._data["key.output"] == {"replies": ["reply1", "reply2"]}
diff --git a/integrations/langfuse/tests/test_tracer.py b/integrations/langfuse/tests/test_tracer.py
index b1c6f9fe5..b7320d74b 100644
--- a/integrations/langfuse/tests/test_tracer.py
+++ b/integrations/langfuse/tests/test_tracer.py
@@ -6,9 +6,13 @@
+import datetime
 import logging
 import sys
 from unittest.mock import MagicMock, Mock, patch
+from typing import Optional

+import pytest
 from haystack.dataclasses import ChatMessage

-from haystack_integrations.tracing.langfuse.tracer import LangfuseTracer
+from haystack_integrations.tracing.langfuse.tracer import LangfuseTracer, LangfuseSpan, SpanContext, DefaultSpanHandler
+from haystack_integrations.tracing.langfuse.tracer import _COMPONENT_OUTPUT_KEY


 class MockSpan:
@@ -44,9 +47,185 @@ def flush(self):
         pass


-class TestLangfuseTracer:
+class CustomSpanHandler(DefaultSpanHandler):
+    def handle(self, span: LangfuseSpan, component_type: Optional[str]) -> None:
+        if component_type == "OpenAIChatGenerator":
+            output = span.get_data().get(_COMPONENT_OUTPUT_KEY, {})
+            replies = output.get("replies", [])
+            if replies and len(replies[0].text) > 10:
+                span.raw_span().update(level="WARNING", status_message="Response too long (> 10 chars)")
+
+
+class TestLangfuseSpan:
+
+    # LangfuseSpan can be initialized with a span object
+    def test_initialized_with_span_object(self):
+        mock_span = Mock()
+        span = LangfuseSpan(mock_span)
+        assert span.raw_span() == mock_span
+
+    # set_tag method can update metadata of the span object
+    def test_set_tag_updates_metadata(self):
+        mock_span = Mock()
+        span = LangfuseSpan(mock_span)
+
+        span.set_tag("key", "value")
+        mock_span.update.assert_called_once_with(metadata={"key": "value"})
+        assert span._data["key"] == "value"
+
+    # set_content_tag method can update input and output of the span object
+    def test_set_content_tag_updates_input_and_output(self):
+        mock_span = Mock()
+
+        span = LangfuseSpan(mock_span)
+        span.set_content_tag("input_key", "input_value")
+        assert span._data["input_key"] == "input_value"
+
+        mock_span.reset_mock()
+        span.set_content_tag("output_key", "output_value")
+        assert span._data["output_key"] == "output_value"
+
+    # set_content_tag method can update input and output of the span object with messages/replies
+    def test_set_content_tag_updates_input_and_output_with_messages(self):
+        mock_span = Mock()
+
+        # test message input
+        span = LangfuseSpan(mock_span)
+        span.set_content_tag("key.input", {"messages": [ChatMessage.from_user("message")]})
+        assert mock_span.update.call_count == 1
+        # check we converted ChatMessage to OpenAI format
+        assert mock_span.update.call_args_list[0][1] == {"input": [{"role": "user", "content": "message"}]}
+        assert span._data["key.input"] == {"messages": [ChatMessage.from_user("message")]}
+
+        # test replies ChatMessage list
+        mock_span.reset_mock()
+        span.set_content_tag("key.output", {"replies": [ChatMessage.from_system("reply")]})
+        assert mock_span.update.call_count == 1
+        # check we converted ChatMessage to OpenAI format
+        assert mock_span.update.call_args_list[0][1] == {"output": [{"role": "system", "content": "reply"}]}
+        assert span._data["key.output"] == {"replies": [ChatMessage.from_system("reply")]}
+
+        # test replies string list
+        mock_span.reset_mock()
+        span.set_content_tag("key.output", {"replies": ["reply1", "reply2"]})
+        assert mock_span.update.call_count == 1
+        # check that string list replies are handled properly
+        assert mock_span.update.call_args_list[0][1] == {"output": ["reply1", "reply2"]}
+        assert span._data["key.output"] == {"replies": ["reply1", "reply2"]}
+
+
+class TestSpanContext:
+    def test_post_init(self):
+        with pytest.raises(ValueError):
+            SpanContext(name=None, operation_name="operation_name", component_type=None, tags={}, parent_span=None)
+        with pytest.raises(ValueError):
+            SpanContext(name="name", operation_name=None, component_type=None, tags={}, parent_span=None)
+        with pytest.raises(ValueError):
+            SpanContext(
+                name="name",
+                operation_name="operation_name",
+                component_type=None,
+                tags={},
+                parent_span=None,
+                trace_name=None,
+            )
+
+
+class TestDefaultSpanHandler:
+    def test_handle_generator(self):
+        mock_span = Mock()
+        mock_span.raw_span.return_value = mock_span
+        mock_span.get_data.return_value = {
+            "haystack.component.type": "OpenAIGenerator",
+            "haystack.component.output": {"replies": ["This is the LLM's response"], "meta": [{"model": "test_model"}]},
+        }
+
+        handler = DefaultSpanHandler()
+        handler.handle(mock_span, component_type="OpenAIGenerator")
+
+        assert mock_span.update.call_count == 1
+        assert mock_span.update.call_args_list[0][1] == {"usage": None, "model": "test_model"}
+
+    def test_handle_chat_generator(self):
+        mock_span = Mock()
+        mock_span.raw_span.return_value = mock_span
+        mock_span.get_data.return_value = {
+            "haystack.component.type": "OpenAIChatGenerator",
+            "haystack.component.output": {
+                "replies": [
+                    ChatMessage.from_assistant(
+                        "This is the LLM's response",
+                        meta={"model": "test_model", "completion_start_time": "2021-07-27T16:02:08.012345"},
+                    )
+                ]
+            },
+        }
+
+        handler = DefaultSpanHandler()
+        handler.handle(mock_span, component_type="OpenAIChatGenerator")

-    # LangfuseTracer can be initialized with a Langfuse instance, a name and a boolean value for public.
+        assert mock_span.update.call_count == 1
+        assert mock_span.update.call_args_list[0][1] == {
+            "usage": None,
+            "model": "test_model",
+            "completion_start_time": datetime.datetime(2021, 7, 27, 16, 2, 8, 12345),
+        }
+
+    def test_handle_bad_completion_start_time(self, caplog):
+        mock_span = Mock()
+        mock_span.raw_span.return_value = mock_span
+        mock_span.get_data.return_value = {
+            "haystack.component.type": "OpenAIChatGenerator",
+            "haystack.component.output": {
+                "replies": [
+                    ChatMessage.from_assistant(
+                        "This is the LLM's response",
+                        meta={"model": "test_model", "completion_start_time": "2021-07-32"},
+                    )
+                ]
+            },
+        }
+
+        handler = DefaultSpanHandler()
+        with caplog.at_level(logging.ERROR):
+            handler.handle(mock_span, component_type="OpenAIChatGenerator")
+            assert "Failed to parse completion_start_time" in caplog.text
+
+        assert mock_span.update.call_count == 1
+        assert mock_span.update.call_args_list[0][1] == {
+            "usage": None,
+            "model": "test_model",
+            "completion_start_time": None,
+        }
+
+
+class TestCustomSpanHandler:
+    def test_handle(self):
+        mock_span = Mock()
+        mock_span.raw_span.return_value = mock_span
+        mock_span.get_data.return_value = {
+            "haystack.component.type": "OpenAIChatGenerator",
+            "haystack.component.output": {
+                "replies": [
+                    ChatMessage.from_assistant(
+                        "This is the LLM's response",
+                        meta={"model": "test_model", "completion_start_time": "2021-07-32"},
+                    )
+                ]
+            },
+        }
+
+        handler = CustomSpanHandler()
+        handler.handle(mock_span, component_type="OpenAIChatGenerator")
+
+        assert mock_span.update.call_count == 1
+        assert mock_span.update.call_args_list[0][1] == {
+            "level": "WARNING",
+            "status_message": "Response too long (> 10 chars)",
+        }
+
+
+class TestLangfuseTracer:
     def test_initialization(self):
         langfuse_instance = Mock()
         tracer = LangfuseTracer(tracer=langfuse_instance, name="Haystack", public=True)
@@ -55,8 +234,6 @@ def test_initialization(self):
         assert tracer._name == "Haystack"
         assert tracer._public

-    # check that the trace method is called on the tracer instance with the provided operation name and tags
-    # check that the span is added to the context and removed after the context manager exits
     def test_create_new_span(self):
         mock_raw_span = MagicMock()
         mock_raw_span.operation_name = "operation_name"
@@ -74,6 +251,7 @@ def test_create_new_span(self):

         tracer = LangfuseTracer(tracer=mock_tracer, name="Haystack", public=False)

+        # check that the trace method is called on the tracer instance with the provided operation name and tags
         with tracer.trace("operation_name", tags={"tag1": "value1", "tag2": "value2"}) as span:
             assert len(tracer._context) == 1, "The trace span should have been added to the root context span"
             assert span.raw_span().operation_name == "operation_name"
diff --git a/integrations/langfuse/tests/test_tracing.py b/integrations/langfuse/tests/test_tracing.py
index 02796fec2..e94714f4b 100644
--- a/integrations/langfuse/tests/test_tracing.py
+++ b/integrations/langfuse/tests/test_tracing.py
@@ -4,23 +4,20 @@
 import os
 import time
+from typing import Any, Dict, List
 from urllib.parse import urlparse
-from typing import Optional

 import pytest
 import requests
-from haystack import Pipeline
+from haystack import Pipeline, component
 from haystack.components.builders import ChatPromptBuilder
 from haystack.components.generators.chat import OpenAIChatGenerator
 from haystack.dataclasses import ChatMessage
-from haystack.utils import Secret
 from requests.auth import HTTPBasicAuth

 from haystack_integrations.components.connectors.langfuse import LangfuseConnector
 from haystack_integrations.components.generators.anthropic import AnthropicChatGenerator
 from haystack_integrations.components.generators.cohere import CohereChatGenerator
-from haystack_integrations.tracing.langfuse import LangfuseSpan, DefaultSpanHandler
-from haystack_integrations.tracing.langfuse.tracer import _COMPONENT_OUTPUT_KEY

 # don't remove (or move) this env var setting from here, it's needed to turn tracing on
 os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"
@@ -51,7 +48,7 @@ def poll_langfuse(url: str):


 @pytest.fixture
-def pipeline_with_env_vars(llm_class, expected_trace):
+def basic_pipeline(llm_class, expected_trace):
     pipe = Pipeline()
     pipe.add_component("tracer", LangfuseConnector(name=f"Chat example - {expected_trace}", public=True))
     pipe.add_component("prompt_builder", ChatPromptBuilder())
@@ -69,18 +66,16 @@
         (CohereChatGenerator, "COHERE_API_KEY", "Cohere"),
     ],
 )
-@pytest.mark.parametrize("pipeline_fixture", ["pipeline_with_env_vars"])
-def test_tracing_integration(llm_class, env_var, expected_trace, pipeline_fixture, request):
+def test_tracing_integration(llm_class, env_var, expected_trace, basic_pipeline):
     if not all([os.environ.get("LANGFUSE_SECRET_KEY"), os.environ.get("LANGFUSE_PUBLIC_KEY"), os.environ.get(env_var)]):
         pytest.skip(f"Missing required environment variables: LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, or {env_var}")

-    pipe = request.getfixturevalue(pipeline_fixture)
     messages = [
         ChatMessage.from_system("Always respond in German even if some input data is in other languages."),
         ChatMessage.from_user("Tell me about {{location}}"),
     ]

-    response = pipe.run(
+    response = basic_pipeline.run(
         data={
             "prompt_builder": {"template_variables": {"location": "Berlin"}, "template": messages},
             "tracer": {"invocation_context": {"user_id": "user_42"}},
@@ -107,107 +102,58 @@ def test_tracing_integration(llm_class, env_var, expected_trace, pipeline_fixtur
     assert res_json["observations"][0]["type"] == "GENERATION"


-def test_pipeline_serialization(monkeypatch):
-    """Test that a pipeline with secrets can be properly serialized and deserialized"""
-
-    # Set test env vars
-    monkeypatch.setenv("LANGFUSE_SECRET_KEY", "secret")
-    monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "public")
-    monkeypatch.setenv("OPENAI_API_KEY", "openai_api_key")
-
-    # Create pipeline with OpenAI LLM
-    pipe = Pipeline()
-    pipe.add_component(
-        "tracer",
-        LangfuseConnector(
-            name="Chat example - OpenAI",
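A `DefaultSpanHandler` subclass like the ones above is attached through the connector's `span_handler` init parameter. A hedged sketch (the handler name and threshold are illustrative; the accessor calls mirror the tests above):

```python
from typing import Optional

from haystack_integrations.components.connectors.langfuse import LangfuseConnector
from haystack_integrations.tracing.langfuse import DefaultSpanHandler, LangfuseSpan


class LengthCheckHandler(DefaultSpanHandler):
    """Illustrative handler: flag suspiciously long chat responses."""

    def handle(self, span: LangfuseSpan, component_type: Optional[str]) -> None:
        super().handle(span, component_type)  # keep the default model/usage enrichment
        if component_type == "OpenAIChatGenerator":
            replies = span.get_data().get("haystack.component.output", {}).get("replies", [])
            if replies and len(replies[0].text) > 10:
                span.raw_span().update(level="WARNING", status_message="Response too long (> 10 chars)")


connector = LangfuseConnector(name="Quality check example", span_handler=LengthCheckHandler())
```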
OpenAI", - public=True, - secret_key=Secret.from_env_var("LANGFUSE_SECRET_KEY"), - public_key=Secret.from_env_var("LANGFUSE_PUBLIC_KEY"), - ), - ) - pipe.add_component("prompt_builder", ChatPromptBuilder()) - pipe.add_component("llm", OpenAIChatGenerator()) - pipe.connect("prompt_builder.prompt", "llm.messages") - - # Serialize - serialized = pipe.to_dict() - - # Check serialized secrets - tracer_params = serialized["components"]["tracer"]["init_parameters"] - assert isinstance(tracer_params["secret_key"], dict) - assert tracer_params["secret_key"]["type"] == "env_var" - assert tracer_params["secret_key"]["env_vars"] == ["LANGFUSE_SECRET_KEY"] - assert isinstance(tracer_params["public_key"], dict) - assert tracer_params["public_key"]["type"] == "env_var" - assert tracer_params["public_key"]["env_vars"] == ["LANGFUSE_PUBLIC_KEY"] - - # Deserialize - new_pipe = Pipeline.from_dict(serialized) - - # Verify pipeline is the same - assert new_pipe == pipe - - -class QualityCheckSpanHandler(DefaultSpanHandler): - """Extends default handler to add quality checks with warning levels.""" - - def handle(self, span: LangfuseSpan, component_type: Optional[str]) -> None: - # First do the default handling (model, usage, etc.) - super().handle(span, component_type) - - # Then add our custom quality checks - if component_type == "OpenAIChatGenerator": - output = span._data.get(_COMPONENT_OUTPUT_KEY, {}) - replies = output.get("replies", []) - - if not replies: - span._span.update(level="ERROR", status_message="No response received") - return - - reply = replies[0] - if "error" in reply.meta: - span._span.update(level="ERROR", status_message=f"OpenAI error: {reply.meta['error']}") - elif len(reply.text) > 10: - span._span.update(level="WARNING", status_message="Response too long (> 10 chars)") - else: - span._span.update(level="DEFAULT", status_message="Success") - - @pytest.mark.integration -def test_custom_span_handler(): - """Test that custom span handler properly sets Langfuse levels.""" +def test_tracing_with_sub_pipelines(): if not all( [os.environ.get("LANGFUSE_SECRET_KEY"), os.environ.get("LANGFUSE_PUBLIC_KEY"), os.environ.get("OPENAI_API_KEY")] ): - pytest.skip("Missing required environment variables") - + pytest.skip( + f"Missing required environment variables: LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, or OPENAI_API_KEY" + ) + + @component + class SubGenerator: + def __init__(self): + self.sub_pipeline = Pipeline() + self.sub_pipeline.add_component("llm", OpenAIChatGenerator()) + + @component.output_types(replies=List[ChatMessage]) + def run(self, messages: List[ChatMessage]) -> Dict[str, Any]: + return {"replies": self.sub_pipeline.run(data={"llm": {"messages": messages}})["llm"]["replies"]} + + @component + class SubPipeline: + def __init__(self): + self.sub_pipeline = Pipeline() + self.sub_pipeline.add_component("prompt_builder", ChatPromptBuilder()) + self.sub_pipeline.add_component("sub_llm", SubGenerator()) + self.sub_pipeline.connect("prompt_builder.prompt", "sub_llm.messages") + + @component.output_types(replies=List[ChatMessage]) + def run(self, messages: List[ChatMessage]) -> Dict[str, Any]: + return { + "replies": self.sub_pipeline.run( + data={"prompt_builder": {"template": messages, "template_variables": {"location": "Berlin"}}} + )["sub_llm"]["replies"] + } + + # Create the main pipeline pipe = Pipeline() - pipe.add_component( - "tracer", - LangfuseConnector( - name="Quality Check Example", - public=True, - span_handler=QualityCheckSpanHandler(), - ), - ) - 
pipe.add_component("prompt_builder", ChatPromptBuilder()) - pipe.add_component("llm", OpenAIChatGenerator()) - pipe.connect("prompt_builder.prompt", "llm.messages") + pipe.add_component("tracer", LangfuseConnector(name="Sub-pipeline example")) + pipe.add_component("sub_pipeline", SubPipeline()) - # Test short response - messages = [ - ChatMessage.from_system("Respond with exactly 3 words."), - ChatMessage.from_user("What is Berlin?"), + msgs = [ + ChatMessage.from_system("Always respond in German even if some input data is in other languages."), + ChatMessage.from_user("Tell me about {{location}}"), ] - response = pipe.run( - data={ - "prompt_builder": {"template_variables": {}, "template": messages}, - "tracer": {"invocation_context": {"user_id": "test_user"}}, - } + data={"sub_pipeline": {"messages": msgs}, "tracer": {"invocation_context": {"user_id": "user_42"}}} ) + assert "Berlin" in response["sub_pipeline"]["replies"][0].text + assert response["tracer"]["trace_url"] + assert response["tracer"]["trace_id"] + trace_url = response["tracer"]["trace_url"] uuid = os.path.basename(urlparse(trace_url).path) url = f"https://cloud.langfuse.com/api/public/traces/{uuid}" @@ -215,6 +161,21 @@ def test_custom_span_handler(): res = poll_langfuse(url) assert res.status_code == 200, f"Failed to retrieve data from Langfuse API: {res.status_code}" - content = str(res.content) - assert "WARNING" in content - assert "Response too long" in content + res_json = res.json() + assert res_json["name"] == "Sub-pipeline example" + assert isinstance(res_json["input"], dict) + assert "sub_pipeline" in res_json["input"] + assert "messages" in res_json["input"]["sub_pipeline"] + assert res_json["input"]["tracer"]["invocation_context"]["user_id"] == "user_42" + assert isinstance(res_json["output"], dict) + assert isinstance(res_json["metadata"], dict) + assert isinstance(res_json["observations"], list) + + observations = res_json["observations"] + + haystack_pipeline_run_observations = [obs for obs in observations if obs["name"] == "haystack.pipeline.run"] + # There should be two observations for the haystack.pipeline.run span: one for each sub pipeline + # Main pipeline is stored under the name "Sub-pipeline example" + assert len(haystack_pipeline_run_observations) == 2 + assert "prompt_builder" in haystack_pipeline_run_observations[0]["input"] + assert "llm" in haystack_pipeline_run_observations[1]["input"]