deepset-ai / haystack / 14831376297

05 May 2025 07:33AM UTC coverage: 90.418% (-0.1%) from 90.513%

Merge b4f49382d into 7db719981
Pull Request #9290: feat: enable streaming ToolCall/Result from Agent

10908 of 12064 relevant lines covered (90.42%)

0.9 hits per line
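
The headline percentage follows directly from the two line counts above. A minimal sketch of that arithmetic, assuming coverage is simply covered lines divided by relevant lines (the helper name `coverage_percent` is hypothetical, not part of Coveralls):

```python
def coverage_percent(covered: int, relevant: int) -> float:
    """Return line coverage as a percentage of relevant lines."""
    if relevant <= 0:
        raise ValueError("relevant must be a positive line count")
    return 100.0 * covered / relevant


# Figures taken from the build summary above.
build_coverage = coverage_percent(10908, 12064)
previous_coverage = 90.513

print(f"build coverage: {build_coverage:.3f}%")
print(f"change vs previous build: {build_coverage - previous_coverage:+.3f} points")
```

Rounded to one decimal place, the change matches the (-0.1%) shown in the build summary.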

Source File

haystack/components/generators/utils.py (16.67% covered)
Line hits: the two import statements and the def line were each hit once. None of the 15 executable lines in the function body were hit.

# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

from openai.types.chat.chat_completion_chunk import ChoiceDeltaToolCall

from haystack.dataclasses import StreamingChunk


def print_streaming_chunk(chunk: StreamingChunk) -> None:
    """
    Callback function to handle and display streaming output chunks.

    This function processes a `StreamingChunk` object by:
    - Printing tool call metadata (if any), including function names and arguments, as they arrive.
    - Printing tool call results when available.
    - Printing the main content (e.g., text tokens) of the chunk as it is received.

    The function outputs data directly to stdout and flushes output buffers to ensure immediate display during
    streaming.

    :param chunk: A chunk of streaming data containing content and optional metadata, such as tool calls and
        tool results.
    """
    if chunk.meta.get("tool_calls"):
        for tool_call in chunk.meta["tool_calls"]:
            if isinstance(tool_call, ChoiceDeltaToolCall) and tool_call.function:
                if tool_call.function.name and not tool_call.function.arguments:
                    print(f"[TOOL CALL - {tool_call.function.name}] ", flush=True, end="")

                if tool_call.function.arguments:
                    if tool_call.function.arguments.startswith("{"):
                        print("\nArguments: ", flush=True, end="")
                    print(tool_call.function.arguments, flush=True, end="")
                    if tool_call.function.arguments.endswith("}"):
                        print("\n\n", flush=True, end="")

    if chunk.meta.get("tool_result"):
        print(f"[TOOL RESULT]\n{chunk.meta['tool_result']}\n\n", flush=True, end="")

    if chunk.content:
        print(chunk.content, flush=True, end="")
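
Since the tool-call branch above is the part the report marks as uncovered, it may help to see what it would emit for a typical sequence of streamed deltas. The following is a minimal sketch, not the library's code: `FunctionDelta` and `render_tool_call_delta` are hypothetical stand-ins that mirror the same conditions but return text instead of printing, so the result can be inspected without installing haystack or openai. The tool name and argument fragments are invented for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FunctionDelta:
    """Hypothetical stand-in for the `function` field of a streamed tool-call delta."""

    name: Optional[str] = None
    arguments: Optional[str] = None


def render_tool_call_delta(function: FunctionDelta) -> str:
    """Return the text the tool-call branch would print for a single delta."""
    out = ""
    # A delta carrying only the tool name opens the tool-call header.
    if function.name and not function.arguments:
        out += f"[TOOL CALL - {function.name}] "
    if function.arguments:
        # The first JSON fragment starts with "{" and gets an "Arguments:" label.
        if function.arguments.startswith("{"):
            out += "\nArguments: "
        out += function.arguments
        # A fragment ending with "}" is treated as the end of the arguments.
        if function.arguments.endswith("}"):
            out += "\n\n"
    return out


# Illustrative delta sequence: the name arrives first, then argument fragments.
deltas = [
    FunctionDelta(name="get_weather", arguments=""),
    FunctionDelta(arguments='{"city"'),
    FunctionDelta(arguments=': "Berlin"'),
    FunctionDelta(arguments="}"),
]
rendered = "".join(render_tool_call_delta(delta) for delta in deltas)
print(rendered)
```

One consequence of these conditions is that any fragment ending in "}" adds the blank-line separator, so nested JSON arguments could produce a separator before the outer object is complete. A test that feeds such fragments through the real function and captures stdout would exercise every line of this branch.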

© 2026 Coveralls, Inc