• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 20404913201

21 Dec 2025 04:54AM UTC coverage: 92.263% (+0.003%) from 92.26%
20404913201

Pull #10265

github

web-flow
Merge d0532cd59 into 8091034fb
Pull Request #10265: fix: Extract usage info from any chunk in streaming response

14191 of 15381 relevant lines covered (92.26%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.46
haystack/components/generators/utils.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import json
1✔
6

7
from haystack import logging
1✔
8
from haystack.dataclasses import ChatMessage, ReasoningContent, StreamingChunk, ToolCall
1✔
9

10
# Module-level logger named after this module, obtained through the `logging` module imported from `haystack`.
logger = logging.getLogger(__name__)
1✔
11

12

13
def print_streaming_chunk(chunk: StreamingChunk) -> None:
    """
    Callback function to handle and display streaming output chunks.

    This function processes a `StreamingChunk` object by:
    - Printing tool call metadata (if any), including function names and arguments, as they arrive.
    - Printing tool call results when available.
    - Printing the main content (e.g., text tokens) of the chunk as it is received.
    - Printing the reasoning text of the chunk when present.

    The `[TOOL CALL]`, `[ASSISTANT]` and `[REASONING]` headers are only printed when `chunk.start` is truthy, so
    follow-up deltas of the same block are appended without repeating the header.

    The function outputs data directly to stdout and flushes output buffers to ensure immediate display during
    streaming.

    :param chunk: A chunk of streaming data containing content and optional metadata, such as tool calls and
        tool results.
    """
    if chunk.start and chunk.index and chunk.index > 0:
        # If this is the start of a new content block but not the first content block, print two new lines
        print("\n\n", flush=True, end="")

    ## Tool Call streaming
    if chunk.tool_calls:
        # Typically, if there are multiple tool calls in the chunk this means that the tool calls are fully formed and
        # not just a delta.
        for tool_call in chunk.tool_calls:
            # If chunk.start is True indicates beginning of a tool call
            # Also presence of tool_call.tool_name indicates the start of a tool call too
            if chunk.start:
                # If there is more than one tool call in the chunk, we print two new lines to separate them
                # We know there is more than one tool call if the index of the tool call is greater than the index of
                # the chunk.
                # The index is compared against None explicitly: a chunk index of 0 is falsy, and a truthiness check
                # would skip the separator for every tool call that follows the first one in a chunk at index 0.
                if chunk.index is not None and tool_call.index > chunk.index:
                    print("\n\n", flush=True, end="")

                print(f"[TOOL CALL]\nTool: {tool_call.tool_name} \nArguments: ", flush=True, end="")

            # print the tool arguments
            if tool_call.arguments:
                print(tool_call.arguments, flush=True, end="")

    ## Tool Call Result streaming
    # Print tool call results if available (from ToolInvoker)
    if chunk.tool_call_result:
        # Tool Call Result is fully formed so delta accumulation is not needed
        print(f"[TOOL RESULT]\n{chunk.tool_call_result.result}", flush=True, end="")

    ## Normal content streaming
    # Print the main content of the chunk (from ChatGenerator)
    if chunk.content:
        if chunk.start:
            print("[ASSISTANT]\n", flush=True, end="")
        print(chunk.content, flush=True, end="")

    ## Reasoning content streaming
    # Print the reasoning content of the chunk (from ChatGenerator)
    if chunk.reasoning:
        if chunk.start:
            print("[REASONING]\n", flush=True, end="")
        print(chunk.reasoning.reasoning_text, flush=True, end="")

    # End of LLM assistant message so we add two new lines
    # This ensures spacing between multiple LLM messages (e.g. Agent) or multiple Tool Call Results
    if chunk.finish_reason is not None:
        print("\n\n", flush=True, end="")
1✔
76

77

78
def _convert_streaming_chunks_to_chat_message(chunks: list[StreamingChunk]) -> ChatMessage:
    """
    Assembles a single assistant ChatMessage from a sequence of streaming chunks.

    Text content, reasoning text and tool call fragments are concatenated in chunk order. Tool call fragments are
    grouped by their `index`; a tool call whose accumulated arguments are not valid JSON is logged and left out.
    The resulting meta holds the model of the last chunk, the `received_at` value of the first chunk, the last
    truthy finish reason, the last non-None usage value and, if any were present, the collected logprobs.

    The first and last chunk are accessed by index, so `chunks` must not be empty.

    :param chunks: The list of all `StreamingChunk` objects.

    :returns: The ChatMessage.
    """
    full_text = "".join(piece.content for piece in chunks)

    # Keep only truthy logprobs entries, in chunk order
    collected_logprobs = [entry for piece in chunks if (entry := piece.meta.get("logprobs"))]

    # Merge the reasoning text of every chunk that carries reasoning content
    reasoning_texts = [piece.reasoning.reasoning_text for piece in chunks if piece.reasoning]
    reasoning = None
    if reasoning_texts:
        reasoning = ReasoningContent(reasoning_text="".join(reasoning_texts))

    # Group tool call fragments by their index, because the ID is not guaranteed to be present on every fragment
    partial_calls: dict[int, dict[str, str]] = {}
    for piece in chunks:
        for delta in piece.tool_calls or []:
            slot = partial_calls.setdefault(delta.index, {"id": "", "name": "", "arguments": ""})
            if delta.id is not None:
                # The ID is overwritten, whereas name and arguments are appended
                slot["id"] = delta.id
            if delta.tool_name is not None:
                slot["name"] += delta.tool_name
            if delta.arguments is not None:
                slot["arguments"] += delta.arguments

    # Build ToolCall objects in ascending index order
    tool_calls = []
    for call_index in sorted(partial_calls):
        slot = partial_calls[call_index]
        raw_arguments = slot["arguments"]
        try:
            # An empty arguments string stands for a call without arguments
            parsed_arguments = json.loads(raw_arguments) if raw_arguments else {}
            tool_calls.append(ToolCall(id=slot["id"], tool_name=slot["name"], arguments=parsed_arguments))
        except json.JSONDecodeError:
            logger.warning(
                "The LLM provider returned a malformed JSON string for tool call arguments. This tool call "
                "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. "
                "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
                _id=slot["id"],
                _name=slot["name"],
                _arguments=slot["arguments"],
            )

    # The finish reason may show up in any chunk; the last truthy one wins
    finish_reason = next((piece.finish_reason for piece in reversed(chunks) if piece.finish_reason), None)

    # Usage info may be attached to different chunks depending on the API provider, so walk backwards and take the
    # first value that is not None
    usage = next(
        (value for value in (piece.meta.get("usage") for piece in reversed(chunks)) if value is not None),
        None,
    )

    last_chunk_meta = chunks[-1].meta
    first_chunk_meta = chunks[0].meta
    meta = {
        "model": last_chunk_meta.get("model"),
        "index": 0,
        "finish_reason": finish_reason,
        "completion_start_time": first_chunk_meta.get("received_at"),  # first chunk received
        "usage": usage,
    }
    if collected_logprobs:
        meta["logprobs"] = collected_logprobs

    return ChatMessage.from_assistant(text=full_text or None, tool_calls=tool_calls, reasoning=reasoning, meta=meta)
1✔
159

160

161
def _serialize_object(obj):
1✔
162
    """Convert an object to a serializable dict recursively"""
163
    if hasattr(obj, "model_dump"):
1✔
164
        return obj.model_dump()
1✔
165
    elif hasattr(obj, "__dict__"):
1✔
166
        return {k: _serialize_object(v) for k, v in obj.__dict__.items() if not k.startswith("_")}
×
167
    elif isinstance(obj, dict):
1✔
168
        return {k: _serialize_object(v) for k, v in obj.items()}
×
169
    elif isinstance(obj, list):
1✔
170
        return [_serialize_object(item) for item in obj]
×
171
    else:
172
        return obj
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc