Traces Not Appearing In Single Trace Tree When Using AsyncPipeline Of Haystack


===========================================================

Introduction


Haystack is a popular open-source framework for building conversational AI applications. It provides a flexible, modular architecture for integrating components such as language models, data storage, and user interfaces. However, when the AsyncPipeline class is used together with LangSmith's @traceable decorator, the trace tree is incomplete: components inside the AsyncPipeline do not appear as child traces under the parent trace. The issue does not occur with the synchronous Pipeline class.

Expected Behavior


Traces from all components in an AsyncPipeline should appear as children of the parent trace, creating a complete trace tree - identical to how the synchronous Pipeline works.

Actual Behavior


When using AsyncPipeline, the parent trace appears in LangSmith, but its trace tree is empty: individual component traces appear as separate, disconnected traces, and the hierarchy between them is lost.

To Reproduce


The issue can be reproduced by running the following code:

import logging
import os
from abc import abstractmethod
from typing import Any, Callable, List

from dotenv import load_dotenv
from haystack import AsyncPipeline, Pipeline, component
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from langsmith import get_current_run_tree, traceable

logging.basicConfig(format="%(levelname)s - %(name)s -  %(message)s", level=logging.WARNING)
logging.getLogger("haystack").setLevel(logging.INFO)

# The .env file should contain the following variables:
# os.environ["LANGCHAIN_TRACING_V2"] = "true"
# os.environ["LANGCHAIN_PROJECT"] = "local"
# os.environ["LANGCHAIN_API_KEY"] = "API_KEY"
load_dotenv(".env")


@component
class BaseComponent:
    def __init__(self):
        pass

    @abstractmethod
    def run(self):
        pass


def trace_inputs(keys: list[str]) -> Callable[[dict[str, Any]], dict[str, Any]]:
    return lambda inputs: {key: inputs.get(key, None) for key in keys}


@component
class CustomPromptBuilder(BaseComponent):
    def __init__(self):
        BaseComponent.__init__(self)

    @traceable(name="custom_prompt_builder", run_type="prompt", process_inputs=trace_inputs(["location"]))
    @component.output_types(messages=list[ChatMessage])
    def run(self, location: str):
        return {
            "messages": [
                ChatMessage.from_system("Always respond in German even if some input data is in other languages."),
                ChatMessage.from_user(f"Tell me about {location}"),
            ]
        }


@component
class ResponseProcessor(BaseComponent):
    def __init__(self):
        BaseComponent.__init__(self)

    @traceable(name="response_processor", run_type="llm", process_inputs=trace_inputs(["replies"]))
    @component.output_types(processed_response=str)
    def run(self, replies: List[ChatMessage]):
        # Extract content from the reply and process it
        response_text = replies[0].text if replies else ""
        processed = f"{response_text}\n\n--- Processed by ResponseProcessor ---"
        return {"processed_response": processed}


class TranslationService:
    def __init__(self, use_async=True):
        self.use_async = use_async
        # Toggle between AsyncPipeline and Pipeline to demonstrate the issue
        self.pipe = AsyncPipeline() if use_async else Pipeline()

        # Add components
        self.pipe.add_component("prompt_builder", CustomPromptBuilder())
        self.pipe.add_component("llm", OpenAIChatGenerator())
        self.pipe.add_component("processor", ResponseProcessor())

        # Connect components
        self.pipe.connect("prompt_builder.messages", "llm.messages")
        self.pipe.connect("llm.replies", "processor.replies")  # Fixed: using replies instead of responses

    @traceable(name="translation_service", run_type="chain", process_inputs=trace_inputs(["location_name"]))
    def translate(self, location_name: str):
        res = self.pipe.run(data={"location": location_name})
        trace_obj = get_current_run_tree()
        print(f"Trace URL ({'AsyncPipeline' if self.use_async else 'Pipeline'}):", trace_obj.get_url())
        return res


# Demonstrate both cases
def run_comparison():
    # 1. Using Pipeline (works correctly with tracing)
    sync_service = TranslationService(use_async=False)
    sync_res = sync_service.translate(location_name="Berlin")
    print("Sync Pipeline Result:", sync_res)
    print("\n" + "-" * 50 + "\n")

    # 2. Using AsyncPipeline (tracing issue)
    async_service = TranslationService(use_async=True)
    async_res = async_service.translate(location_name="Munich")
    print("Async Pipeline Result:", async_res)


if __name__ == "__main__":
    run_comparison()
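Independent of Haystack, the suspected mechanism behind the symptom can be illustrated with plain contextvars: context set in the caller's thread is not visible when work runs on a new event loop inside a separate thread, which is a common way synchronous wrappers drive async pipelines. A minimal sketch (all names are illustrative, not Haystack or LangSmith internals):

```python
# Sketch of the suspected root cause: a ContextVar set in the main thread
# is not inherited by a new thread, so a parent run stored in context is
# lost when the async work runs on an event loop in that thread.
import asyncio
import contextvars
import threading

current_parent = contextvars.ContextVar("current_parent", default=None)


async def child_work():
    # Runs with the worker thread's (empty) context, not the caller's.
    return current_parent.get()


def run_in_new_loop(coro):
    result = {}

    def _target():
        result["value"] = asyncio.run(coro)

    t = threading.Thread(target=_target)
    t.start()
    t.join()
    return result["value"]


current_parent.set("parent-run-id")
print(run_in_new_loop(child_work()))  # prints None: the parent context was lost
```

If AsyncPipeline hands component execution off to a separate thread or event loop in a similar way, LangSmith's parent run (which travels via contextvars) would be lost at exactly that boundary.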

Traces Result


The tracing results are as follows:

  • Pipeline: complete trace tree (screenshot)
  • AsyncPipeline: parent trace with no children (screenshot)
  • All component traces at root level (screenshot)

Additional Information


LangSmith has a set of articles related to tracing in asyncio and threading contexts; I am attaching them. We do use langsmith_extra={"parent": rt} in a few places where we spawn threads. I believe that if the parent is passed through correctly, this issue should be resolved.

FAQ Check


  • [x] I have had a look at the FAQ page.

System


  • OS: Linux/wsl
  • haystack-ai==2.12.0
  • langsmith==0.3.27

Conclusion


In conclusion, traces not appearing in a single trace tree when using Haystack's AsyncPipeline looks like a context-propagation problem between LangSmith's tracing and the asynchronous pipeline execution path. The code and trace output above narrow down the symptom, but further investigation and testing are needed to confirm the root cause and a fix.

Potential Solutions


Based on the analysis, the following potential solutions can be proposed:

  1. Passing parent to langsmith_extra: As noted in the Additional Information section, explicitly passing the parent run via langsmith_extra at the points where threads are spawned might resolve the issue.
  2. Using a different tracing mechanism: If the issue persists, an alternative tracing integration (for example, Haystack's built-in tracing support) could be explored to see whether it preserves the trace hierarchy.
  3. Modifying the AsyncPipeline class: The AsyncPipeline execution path could be changed so that the caller's tracing context (contextvars) is propagated into the event loop or threads that run the components.

Future Work


To further investigate this issue, we can:

  1. Conduct further testing: We can conduct more thorough testing to see if the issue persists across different scenarios and configurations.
  2. Analyze the Langsmith documentation: We can dive deeper into the Langsmith documentation to see if there are any additional features or configurations that can help resolve the issue.
  3. Reach out to the Haystack community: We can reach out to the Haystack community to see if anyone has encountered similar issues or has any suggestions for resolving the problem.

By following these steps, we can gain a better understanding of the issue and potentially find a solution that works for our use case.

====================================================================

Q: What is the issue with the AsyncPipeline in Haystack?


A: The issue is that when using the AsyncPipeline class with LangSmith's @traceable decorator, the trace tree is incomplete. Components within the AsyncPipeline don't appear as child traces under the parent trace.

Q: What is the expected behavior of the AsyncPipeline?


A: The expected behavior is that traces from all components in an AsyncPipeline should appear as children of the parent trace, creating a complete trace tree - identical to how the synchronous Pipeline works.

Q: What is the actual behavior of the AsyncPipeline?


A: The actual behavior is that the parent trace appears in LangSmith, but its trace tree is empty: individual component traces appear as separate, disconnected traces, and the hierarchy between them is lost.

Q: How can I reproduce the issue?


A: You can reproduce the issue by running the provided code, which demonstrates the use of AsyncPipeline with LangSmith's @traceable decorator.

Q: What are the potential solutions to the issue?


A: The potential solutions include:

  1. Passing parent to langsmith_extra: Explicitly passing the parent run via langsmith_extra at the points where threads are spawned might resolve the issue.
  2. Using a different tracing mechanism: If the issue persists, an alternative tracing integration (for example, Haystack's built-in tracing support) could be explored to see whether it preserves the trace hierarchy.
  3. Modifying the AsyncPipeline class: The AsyncPipeline execution path could be changed so that the caller's tracing context is propagated into the event loop or threads that run the components.

Q: What are the next steps to resolve the issue?


A: The next steps include:

  1. Conducting further testing: We can conduct more thorough testing to see if the issue persists across different scenarios and configurations.
  2. Analyzing the Langsmith documentation: We can dive deeper into the Langsmith documentation to see if there are any additional features or configurations that can help resolve the issue.
  3. Reaching out to the Haystack community: We can reach out to the Haystack community to see if anyone has encountered similar issues or has any suggestions for resolving the problem.

Q: What is the system configuration for the issue?


A: The system configuration includes:

  • OS: Linux/wsl
  • haystack-ai==2.12.0
  • langsmith==0.3.27

Q: What are the additional resources related to the issue?


A: The additional resources include LangSmith's set of articles on tracing in asyncio and threading contexts, referenced in the Additional Information section.

Q: Have you had a look at our new FAQ page?


A: [x] Yes, we have had a look at the FAQ page.

By following these Q&A, we can gain a better understanding of the issue and potentially find a solution that works for our use case.