• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 15731663775

18 Jun 2025 11:31AM UTC coverage: 90.167% (-0.01%) from 90.178%
15731663775

Pull #9525

github

web-flow
Merge 9945253d9 into f91145964
Pull Request #9525: refactor: Update to `StreamingChunk`, better `index` setting and change `tool_call` to `tool_calls`

11572 of 12834 relevant lines covered (90.17%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.22
haystack/components/generators/utils.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import json
1✔
6
from typing import Dict, List
1✔
7

8
from haystack import logging
1✔
9
from haystack.dataclasses import ChatMessage, StreamingChunk, ToolCall
1✔
10

11
logger = logging.getLogger(__name__)
1✔
12

13

14
def print_streaming_chunk(chunk: StreamingChunk) -> None:
1✔
15
    """
16
    Callback function to handle and display streaming output chunks.
17

18
    This function processes a `StreamingChunk` object by:
19
    - Printing tool call metadata (if any), including function names and arguments, as they arrive.
20
    - Printing tool call results when available.
21
    - Printing the main content (e.g., text tokens) of the chunk as it is received.
22

23
    The function outputs data directly to stdout and flushes output buffers to ensure immediate display during
24
    streaming.
25

26
    :param chunk: A chunk of streaming data containing content and optional metadata, such as tool calls and
27
        tool results.
28
    """
29
    if chunk.start and chunk.index and chunk.index > 0:
1✔
30
        # If this is the start of a new content block but not the first content block, print two new lines
31
        print("\n\n", flush=True, end="")
×
32

33
    ## Tool Call streaming
34
    if chunk.tool_calls:
1✔
35
        # Typically, if there are multiple tool calls in the chunk, typically this means that the tool calls are fully
36
        # formed and not just a delta.
37
        for tool_call in chunk.tool_calls:
×
38
            # If chunk.start is True indicates beginning of a tool call
39
            # Also presence of tool_call.tool_name indicates the start of a tool call too
40
            if chunk.start:
×
41
                # If there is more than one tool call in the chunk, we print two new lines to separate them
42
                # We know there is more than one tool call if the index of the tool call is greater than the index of
43
                # the chunk.
44
                if tool_call.index > chunk.index:
×
45
                    print("\n\n", flush=True, end="")
×
46

47
                print("[TOOL CALL]\n", flush=True, end="")
×
48
                print(f"Tool: {tool_call.tool_name} ", flush=True, end="")
×
49
                print("\nArguments: ", flush=True, end="")
×
50

51
            # print the tool arguments
52
            if tool_call.arguments:
×
53
                print(tool_call.arguments, flush=True, end="")
×
54

55
    ## Tool Call Result streaming
56
    # Print tool call results if available (from ToolInvoker)
57
    if chunk.tool_call_result:
1✔
58
        # Tool Call Result is fully formed so delta accumulation is not needed
59
        print(f"[TOOL RESULT]\n{chunk.tool_call_result.result}", flush=True, end="")
1✔
60

61
    ## Normal content streaming
62
    # Print the main content of the chunk (from ChatGenerator)
63
    if chunk.content:
1✔
64
        if chunk.start:
×
65
            print("[ASSISTANT]\n", flush=True, end="")
×
66
        print(chunk.content, flush=True, end="")
×
67

68
    # End of LLM assistant message so we add two new lines
69
    # This ensures spacing between multiple LLM messages (e.g. Agent) or multiple Tool Call Results
70
    if chunk.meta.get("finish_reason") is not None:
1✔
71
        print("\n\n", flush=True, end="")
1✔
72

73

74
def _convert_streaming_chunks_to_chat_message(chunks: List[StreamingChunk]) -> ChatMessage:
1✔
75
    """
76
    Connects the streaming chunks into a single ChatMessage.
77

78
    :param chunks: The list of all `StreamingChunk` objects.
79

80
    :returns: The ChatMessage.
81
    """
82
    text = "".join([chunk.content for chunk in chunks])
1✔
83
    tool_calls = []
1✔
84

85
    # Process tool calls if present in any chunk
86
    tool_call_data: Dict[int, Dict[str, str]] = {}  # Track tool calls by index
1✔
87
    for chunk in chunks:
1✔
88
        if chunk.tool_calls:
1✔
89
            # We do this to make sure mypy is happy, but we enforce index is not None in the StreamingChunk dataclass if
90
            # tool_call is present
91
            assert chunk.index is not None
1✔
92

93
            for tool_call in chunk.tool_calls:
1✔
94
                # We use the index of the tool_call to track the tool call across chunks since the ID is not always
95
                # provided
96
                if tool_call.index not in tool_call_data:
1✔
97
                    tool_call_data[chunk.index] = {"id": "", "name": "", "arguments": ""}
1✔
98

99
                # Save the ID if present
100
                if tool_call.id is not None:
1✔
101
                    tool_call_data[chunk.index]["id"] = tool_call.id
1✔
102

103
                if tool_call.tool_name is not None:
1✔
104
                    tool_call_data[chunk.index]["name"] += tool_call.tool_name
1✔
105
                if tool_call.arguments is not None:
1✔
106
                    tool_call_data[chunk.index]["arguments"] += tool_call.arguments
1✔
107

108
    # Convert accumulated tool call data into ToolCall objects
109
    sorted_keys = sorted(tool_call_data.keys())
1✔
110
    for key in sorted_keys:
1✔
111
        tool_call_dict = tool_call_data[key]
1✔
112
        try:
1✔
113
            arguments = json.loads(tool_call_dict["arguments"])
1✔
114
            tool_calls.append(ToolCall(id=tool_call_dict["id"], tool_name=tool_call_dict["name"], arguments=arguments))
1✔
115
        except json.JSONDecodeError:
×
116
            logger.warning(
×
117
                "OpenAI returned a malformed JSON string for tool call arguments. This tool call "
118
                "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. "
119
                "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
120
                _id=tool_call_dict["id"],
121
                _name=tool_call_dict["name"],
122
                _arguments=tool_call_dict["arguments"],
123
            )
124

125
    # finish_reason can appear in different places so we look for the last one
126
    finish_reasons = [
1✔
127
        chunk.meta.get("finish_reason") for chunk in chunks if chunk.meta.get("finish_reason") is not None
128
    ]
129
    finish_reason = finish_reasons[-1] if finish_reasons else None
1✔
130

131
    meta = {
1✔
132
        "model": chunks[-1].meta.get("model"),
133
        "index": 0,
134
        "finish_reason": finish_reason,
135
        "completion_start_time": chunks[0].meta.get("received_at"),  # first chunk received
136
        "usage": chunks[-1].meta.get("usage"),  # last chunk has the final usage data if available
137
    }
138

139
    return ChatMessage.from_assistant(text=text or None, tool_calls=tool_calls, meta=meta)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc