• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 15486046280

06 Jun 2025 08:17AM UTC coverage: 90.418% (-0.001%) from 90.419%
15486046280

push

github

web-flow
feat: Update streaming chunk (#9424)

* Start expanding StreamingChunk

* First pass at expanding Streaming Chunk

* Working version!

* Some tweaks and also make ToolInvoker stream a chunk with a finish reason

* Properly update test

* Change to tool_name, remove kw_only since it's Python 3.10 only, and update HuggingFaceAPIChatGenerator to start following new StreamingChunk

* Add reno

* Some cleanup

* Fix unit tests

* Fix mypy and integration test

* Fix pylint

* Start refactoring huggingface local api

* Refactor openai generator and chat generator to reuse util methods

* Did some reorg

* Reuse utility method in HuggingFaceAPI

* Get rid of unneeded default values in tests

* Update conversion of streaming chunks to chat message to not rely on openai dataclass anymore

* Fix tests and loosen check in StreamingChunk post_init

* Fixes

* Fix license header

* Add start and index to HFAPIGenerator

* Fix mypy

* Clean up

* Update haystack/components/generators/utils.py

Co-authored-by: Julian Risch <julian.risch@deepset.ai>

* Update haystack/components/generators/utils.py

Co-authored-by: Julian Risch <julian.risch@deepset.ai>

* Change StreamingChunk.start to only a bool

* PR comments

* Fix unit test

* PR comment

* Fix test

---------

Co-authored-by: Julian Risch <julian.risch@deepset.ai>

11512 of 12732 relevant lines covered (90.42%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.0
haystack/components/generators/utils.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import json
1✔
6
from typing import Dict, List
1✔
7

8
from haystack import logging
1✔
9
from haystack.dataclasses import ChatMessage, StreamingChunk, ToolCall
1✔
10

11
logger = logging.getLogger(__name__)
1✔
12

13

14
def print_streaming_chunk(chunk: StreamingChunk) -> None:
    """
    Streaming callback that writes a single `StreamingChunk` to stdout as it arrives.

    Depending on what the chunk carries, the following is printed (in this order):
    - a blank-line separator when the chunk starts a content block whose index is greater than 0;
    - a `[TOOL CALL]` header with the tool name when a tool call starts, followed by any argument text;
    - a `[TOOL RESULT]` header followed by the tool call result;
    - an `[ASSISTANT]` header when regular content starts, followed by the content itself;
    - a blank-line separator when the chunk's meta contains a non-None `finish_reason`.

    Every write is flushed immediately and no implicit newline is appended, so partial tokens show up as they stream
    in.

    :param chunk: The streaming chunk to display. Its content, tool call, tool call result and meta are inspected.
    """

    def _emit(text) -> None:
        # Single place that fixes the print settings: no trailing newline, flush right away
        print(text, flush=True, end="")

    is_start = chunk.start
    tool_call = chunk.tool_call
    tool_result = chunk.tool_call_result
    content = chunk.content

    # A new content block that is not the first one gets separated from the previous block
    if is_start and chunk.index and chunk.index > 0:
        _emit("\n\n")

    ## Tool Call streaming
    if tool_call:
        # The header is only printed once, on the chunk that opens the tool call
        if is_start:
            _emit("[TOOL CALL]\n")
            _emit(f"Tool: {tool_call.tool_name} ")
            _emit("\nArguments: ")

        # Argument text is printed exactly as it comes in
        if tool_call.arguments:
            _emit(tool_call.arguments)

    ## Tool Call Result streaming
    # The result is printed in one go, header included
    if tool_result:
        _emit(f"[TOOL RESULT]\n{tool_result.result}")

    ## Normal content streaming
    if content:
        if is_start:
            _emit("[ASSISTANT]\n")
        _emit(content)

    # A finish reason marks the end of a message; add spacing so that consecutive messages or tool results
    # do not run into each other
    if chunk.meta.get("finish_reason") is not None:
        _emit("\n\n")
×
63

64

65
def _convert_streaming_chunks_to_chat_message(chunks: List[StreamingChunk]) -> ChatMessage:
    """
    Assembles a list of streaming chunks into one assistant ChatMessage.

    The text of the message is the concatenation of all chunk contents. Tool call fragments are grouped by chunk
    index, their names and arguments are concatenated, and each group whose arguments parse as JSON becomes a
    `ToolCall`. Groups with malformed JSON arguments are logged and skipped.

    :param chunks: All `StreamingChunk` objects of the stream, in the order they were received.

    :returns: The ChatMessage built from the chunks, with `model`, `index`, `finish_reason`,
        `completion_start_time` and `usage` in its meta.
    """
    text = "".join(chunk.content for chunk in chunks)

    # Collect the tool call fragments, keyed by the index of the chunk they arrived in
    fragments_by_index: Dict[int, Dict[str, str]] = {}
    for chunk in chunks:
        delta = chunk.tool_call
        if not delta:
            continue

        # Only here to satisfy mypy; StreamingChunk itself enforces that index is set when tool_call is present
        assert chunk.index is not None

        # The index is the grouping key because the ID is not provided on every chunk
        entry = fragments_by_index.setdefault(chunk.index, {"id": "", "name": "", "arguments": ""})

        # The ID is overwritten when present, while name and arguments are accumulated piece by piece
        if delta.id is not None:
            entry["id"] = delta.id
        if delta.tool_name is not None:
            entry["name"] += delta.tool_name
        if delta.arguments is not None:
            entry["arguments"] += delta.arguments

    # Turn every accumulated entry into a ToolCall, in ascending index order
    tool_calls = []
    for index in sorted(fragments_by_index):
        entry = fragments_by_index[index]
        try:
            parsed_arguments = json.loads(entry["arguments"])
            tool_calls.append(ToolCall(id=entry["id"], tool_name=entry["name"], arguments=parsed_arguments))
        except json.JSONDecodeError:
            logger.warning(
                "OpenAI returned a malformed JSON string for tool call arguments. This tool call "
                "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. "
                "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
                _id=entry["id"],
                _name=entry["name"],
                _arguments=entry["arguments"],
            )

    # The finish_reason is not tied to a fixed chunk, so scan from the end and keep the last one that is set
    finish_reason = None
    for chunk in reversed(chunks):
        candidate = chunk.meta.get("finish_reason")
        if candidate is not None:
            finish_reason = candidate
            break

    first_chunk = chunks[0]
    last_chunk = chunks[-1]
    meta = {
        "model": last_chunk.meta.get("model"),
        "index": 0,
        "finish_reason": finish_reason,
        "completion_start_time": first_chunk.meta.get("received_at"),  # when the first chunk was received
        "usage": last_chunk.meta.get("usage"),  # the last chunk holds the final usage data if available
    }

    # An empty text is passed on as None
    return ChatMessage.from_assistant(text=text or None, tool_calls=tool_calls, meta=meta)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc