• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 15277232880

27 May 2025 01:55PM UTC coverage: 90.41% (+0.02%) from 90.388%
15277232880

push

github

web-flow
refactor: Refactor hf api chat generator (#9449)

* Refactor HFAPI Chat Generator

* Add component info to generators

* Fix type hint

* Add reno

* Fix unit tests

* Remove incorrect dev comment

* Move _convert_streaming_chunks_to_chat_message to utils file

11464 of 12680 relevant lines covered (90.41%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.0
haystack/components/generators/utils.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import json
1✔
6
from typing import Any, Dict, List
1✔
7

8
from openai.types.chat.chat_completion_chunk import ChoiceDeltaToolCall
1✔
9

10
from haystack import logging
1✔
11
from haystack.dataclasses import ChatMessage, StreamingChunk, ToolCall
1✔
12

13
logger = logging.getLogger(__name__)
1✔
14

15

16
def print_streaming_chunk(chunk: StreamingChunk) -> None:
    """
    Streaming callback that writes a `StreamingChunk` to stdout as it arrives.

    For every chunk, the following is printed, in this order and only when present:
    - Tool call deltas found under `chunk.meta["tool_calls"]`: a `[TOOL CALL]` header with the tool name when the
      delta carries a name, followed by whatever argument fragment the delta carries.
    - The tool result found under `chunk.meta["tool_result"]`, preceded by a `[TOOL RESULT]` header.
    - The chunk's `content`.
    - Two newlines when `chunk.meta["finish_reason"]` is set, to separate consecutive assistant messages.

    Nothing is appended after the printed text and stdout is flushed after each write so the output shows up
    immediately while streaming.

    :param chunk: A chunk of streaming data containing content and optional metadata, such as tool calls and
        tool results.
    """

    def _emit(text: Any) -> None:
        # Write without a trailing newline and flush right away so tokens appear as they stream in
        print(text, flush=True, end="")

    chunk_meta = chunk.meta

    # Tool call deltas (from ChatGenerator)
    for raw_call in chunk_meta.get("tool_calls") or []:
        # Deltas arrive either as ChoiceDeltaToolCall objects or as plain dicts; work on a dict in both cases
        if isinstance(raw_call, ChoiceDeltaToolCall):
            call_as_dict: Dict[str, Any] = raw_call.to_dict()
        else:
            call_as_dict = raw_call

        function_part = call_as_dict.get("function")
        if not function_part:
            continue

        tool_name = function_part.get("name")
        if tool_name:
            _emit("\n\n[TOOL CALL]\n")
            _emit(f"Tool: {tool_name} ")
            _emit("\nArguments: ")

        argument_fragment = function_part.get("arguments")
        if argument_fragment:
            _emit(argument_fragment)

    # Tool call results (from ToolInvoker)
    result = chunk_meta.get("tool_result")
    if result:
        _emit(f"\n\n[TOOL RESULT]\n{result}")

    # Main content of the chunk (from ChatGenerator)
    body = chunk.content
    if body:
        _emit(body)

    # A finish reason marks the end of an LLM assistant message; the blank lines keep
    # multiple LLM messages (e.g. Agent) visually apart
    if chunk_meta.get("finish_reason") is not None:
        _emit("\n\n")
×
60

61

62
def _convert_streaming_chunks_to_chat_message(chunks: List[StreamingChunk]) -> ChatMessage:
    """
    Connects the streaming chunks into a single ChatMessage.

    The text of the message is the concatenation of the `content` of all chunks. Tool call deltas found under the
    `tool_calls` key of a chunk's `meta` are merged per tool call index and their accumulated arguments are parsed
    as JSON. A tool call whose arguments are not valid JSON is logged and skipped; a tool call that streamed no
    arguments at all is kept with empty arguments.

    :param chunks: The list of all `StreamingChunk` objects. If it is empty, the returned message has no text, no
        tool calls and `None` for every metadata value that is read from the chunks.

    :returns: The ChatMessage.
    """
    text = "".join(chunk.content for chunk in chunks)

    # Merge tool call deltas across chunks.
    # We use the index of the tool call to track it across chunks since the ID is not always provided.
    tool_call_data: Dict[int, Dict[str, str]] = {}
    for chunk in chunks:
        for delta in chunk.meta.get("tool_calls") or []:
            entry = tool_call_data.setdefault(delta.index, {"id": "", "name": "", "arguments": ""})

            # Save the ID if present
            if delta.id is not None:
                entry["id"] = delta.id

            if delta.function is not None:
                if delta.function.name is not None:
                    entry["name"] += delta.function.name
                if delta.function.arguments is not None:
                    entry["arguments"] += delta.function.arguments

    # Convert accumulated tool call data into ToolCall objects
    tool_calls = []
    for call_data in tool_call_data.values():
        raw_arguments = call_data["arguments"]
        try:
            # A tool call may stream no arguments at all. An empty string is not valid JSON, so it is mapped
            # to empty arguments instead of being reported as malformed and dropped.
            arguments = json.loads(raw_arguments) if raw_arguments.strip() else {}
        except json.JSONDecodeError:
            logger.warning(
                "OpenAI returned a malformed JSON string for tool call arguments. This tool call "
                "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. "
                "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
                _id=call_data["id"],
                _name=call_data["name"],
                _arguments=raw_arguments,
            )
            continue
        tool_calls.append(ToolCall(id=call_data["id"], tool_name=call_data["name"], arguments=arguments))

    # finish_reason can appear in different places so we look for the last one
    finish_reason = next(
        (reason for chunk in reversed(chunks) if (reason := chunk.meta.get("finish_reason")) is not None), None
    )

    # Guard against an empty list of chunks so that indexing the first/last chunk cannot raise an IndexError
    first_meta: Dict[str, Any] = chunks[0].meta if chunks else {}
    last_meta: Dict[str, Any] = chunks[-1].meta if chunks else {}

    meta = {
        "model": last_meta.get("model"),
        "index": 0,
        "finish_reason": finish_reason,
        "completion_start_time": first_meta.get("received_at"),  # first chunk received
        "usage": last_meta.get("usage"),  # last chunk has the final usage data if available
    }

    return ChatMessage.from_assistant(text=text or None, tool_calls=tool_calls, meta=meta)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc