• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 19114177728

05 Nov 2025 07:41PM UTC coverage: 92.248%. Remained the same
19114177728

Pull #9932

github

web-flow
Merge 3db96ab24 into 510d06361
Pull Request #9932: fix: prompt-builder - jinja2 template set vars still shows required

13531 of 14668 relevant lines covered (92.25%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.77
haystack/dataclasses/streaming_chunk.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
from dataclasses import asdict, dataclass, field
1✔
6
from typing import Any, Awaitable, Callable, Literal, Optional, Union, overload
1✔
7

8
from haystack.core.component import Component
1✔
9
from haystack.dataclasses.chat_message import ReasoningContent, ToolCallResult
1✔
10
from haystack.utils.asynchronous import is_callable_async_compatible
1✔
11

12
# Closed set of values accepted for `StreamingChunk.finish_reason`.
# The first four follow OpenAI's convention; "tool_call_results" is a
# Haystack-specific addition.
FinishReason = Literal[
    "stop",
    "length",
    "tool_calls",
    "content_filter",
    "tool_call_results",
]
15

16

17
@dataclass
class ToolCallDelta:
    """
    Represents a Tool call prepared by the model, usually contained in an assistant message.

    :param index: The index of the Tool call in the list of Tool calls.
    :param tool_name: The name of the Tool to call.
    :param arguments: Either the full arguments in JSON format or a delta of the arguments.
    :param id: The ID of the Tool call.
    :param extra: Dictionary of extra information about the Tool call. Use to store provider-specific
        information. To avoid serialization issues, values should be JSON serializable.
    """

    index: int
    tool_name: Optional[str] = field(default=None)
    arguments: Optional[str] = field(default=None)
    id: Optional[str] = field(default=None)  # noqa: A003
    extra: Optional[dict[str, Any]] = field(default=None)

    def to_dict(self) -> dict[str, Any]:
        """
        Returns a dictionary representation of the ToolCallDelta.

        :returns: A dictionary with keys 'index', 'tool_name', 'arguments', 'id', and 'extra'.
        """
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ToolCallDelta":
        """
        Creates a ToolCallDelta from a serialized representation.

        :param data: Dictionary containing ToolCallDelta's attributes. Its keys are passed to the
            constructor as keyword arguments, so 'index' is required and unknown keys raise `TypeError`.
        :returns: An instance of the class the method is called on.
        """
        # Build through `cls` so that subclasses deserialize to their own type.
        return cls(**data)
53

54

55
@dataclass
class ComponentInfo:
    """
    The `ComponentInfo` class encapsulates information about a component.

    :param type: The type of the component.
    :param name: The name of the component assigned when adding it to a pipeline.
    """

    type: str
    name: Optional[str] = field(default=None)

    @classmethod
    def from_component(cls, component: Component) -> "ComponentInfo":
        """
        Create a `ComponentInfo` object from a `Component` instance.

        :param component:
            The `Component` instance.
        :returns:
            The `ComponentInfo` object with the type and name of the given component.
        """
        # The type is the fully qualified class path: "<module>.<class name>".
        component_type = f"{component.__class__.__module__}.{component.__class__.__name__}"
        # `__component_name__` may be absent on the instance; the name is then None.
        component_name = getattr(component, "__component_name__", None)
        return cls(type=component_type, name=component_name)

    def to_dict(self) -> dict[str, Any]:
        """
        Returns a dictionary representation of ComponentInfo.

        :returns: A dictionary with keys 'type' and 'name'.
        """
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ComponentInfo":
        """
        Creates a ComponentInfo from a serialized representation.

        :param data: Dictionary containing ComponentInfo's attributes. Its keys are passed to the
            constructor as keyword arguments, so 'type' is required and unknown keys raise `TypeError`.
        :returns: An instance of the class the method is called on.
        """
        # Build through `cls`, as `from_component` does, so subclasses deserialize to their own type.
        return cls(**data)
99

100

101
@dataclass
class StreamingChunk:
    """
    The `StreamingChunk` class encapsulates a segment of streamed content along with associated metadata.

    This structure facilitates the handling and processing of streamed data in a systematic manner.

    :param content: The content of the message chunk as a string.
    :param meta: A dictionary containing metadata related to the message chunk.
    :param component_info: A `ComponentInfo` object containing information about the component that generated the chunk,
        such as the component name and type.
    :param index: An optional integer index representing which content block this chunk belongs to.
    :param tool_calls: An optional list of ToolCallDelta objects representing the tool calls associated with the
        message chunk.
    :param tool_call_result: An optional ToolCallResult object representing the result of a tool call.
    :param start: A boolean indicating whether this chunk marks the start of a content block.
    :param finish_reason: An optional value indicating the reason the generation finished.
        Standard values follow OpenAI's convention: "stop", "length", "tool_calls", "content_filter",
        plus Haystack-specific value "tool_call_results".
    :param reasoning: An optional ReasoningContent object representing the reasoning content associated
        with the message chunk.
    """

    content: str
    meta: dict[str, Any] = field(default_factory=dict, hash=False)
    component_info: Optional[ComponentInfo] = field(default=None)
    index: Optional[int] = field(default=None)
    tool_calls: Optional[list[ToolCallDelta]] = field(default=None)
    tool_call_result: Optional[ToolCallResult] = field(default=None)
    start: bool = field(default=False)
    finish_reason: Optional[FinishReason] = field(default=None)
    reasoning: Optional[ReasoningContent] = field(default=None)

    def __post_init__(self):
        """
        Validates the combination of payload fields after initialization.

        :raises ValueError: If more than one of `content`, `tool_calls`, `tool_call_result` and `reasoning`
            is truthy, or if `tool_calls`, `tool_call_result` or `reasoning` is set while `index` is None.
        """
        # Truthiness is what counts here: an empty string or an empty list is treated as "not set".
        fields_set = sum(bool(x) for x in (self.content, self.tool_calls, self.tool_call_result, self.reasoning))
        if fields_set > 1:
            raise ValueError(
                "Only one of `content`, `tool_call`, `tool_call_result` or `reasoning` may be set in a StreamingChunk. "
                f"Got content: '{self.content}', tool_call: '{self.tool_calls}', "
                f"tool_call_result: '{self.tool_call_result}', reasoning: '{self.reasoning}'."
            )

        # NOTE: We don't enforce this for self.content otherwise it would be a breaking change
        if (self.tool_calls or self.tool_call_result or self.reasoning) and self.index is None:
            raise ValueError("If `tool_call`, `tool_call_result` or `reasoning` is set, `index` must also be set.")

    def to_dict(self) -> dict[str, Any]:
        """
        Returns a dictionary representation of the StreamingChunk.

        Nested objects are serialized through their own `to_dict` methods; unset (falsy) nested
        values are emitted as None. `meta` is returned as the same dictionary object, not a copy.

        :returns: Serialized dictionary representation of the calling object.
        """
        return {
            "content": self.content,
            "meta": self.meta,
            "component_info": self.component_info.to_dict() if self.component_info else None,
            "index": self.index,
            "tool_calls": [tc.to_dict() for tc in self.tool_calls] if self.tool_calls else None,
            "tool_call_result": self.tool_call_result.to_dict() if self.tool_call_result else None,
            "start": self.start,
            "finish_reason": self.finish_reason,
            "reasoning": self.reasoning.to_dict() if self.reasoning else None,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "StreamingChunk":
        """
        Creates a deserialized StreamingChunk instance from a serialized representation.

        :param data: Dictionary containing the StreamingChunk's attributes. Only `content` is required;
            every other key falls back to the field's default when missing.
        :returns: An instance of the class the method is called on.
        :raises ValueError: If `content` is missing from `data`, or if the resulting field combination
            is rejected by `__post_init__`.
        """
        if "content" not in data:
            raise ValueError("Missing required field `content` in StreamingChunk deserialization.")

        component_info = data.get("component_info")
        tool_calls = data.get("tool_calls")
        tool_call_result = data.get("tool_call_result")
        reasoning = data.get("reasoning")

        # Build through `cls` so that subclasses deserialize to their own type.
        return cls(
            content=data["content"],
            # An explicit `"meta": None` is normalized to an empty dict so `meta` is always a dict.
            meta=data.get("meta") or {},
            component_info=ComponentInfo.from_dict(component_info) if component_info else None,
            index=data.get("index"),
            tool_calls=[ToolCallDelta.from_dict(tc) for tc in tool_calls] if tool_calls else None,
            tool_call_result=ToolCallResult.from_dict(tool_call_result) if tool_call_result else None,
            start=data.get("start", False),
            finish_reason=data.get("finish_reason"),
            reasoning=ReasoningContent.from_dict(reasoning) if reasoning else None,
        )
189

190

191
# A synchronous callback receives each StreamingChunk and returns nothing.
SyncStreamingCallbackT = Callable[[StreamingChunk], None]

# An asynchronous callback receives each StreamingChunk and returns an awaitable.
AsyncStreamingCallbackT = Callable[[StreamingChunk], Awaitable[None]]

# Either flavor of streaming callback.
StreamingCallbackT = Union[SyncStreamingCallbackT, AsyncStreamingCallbackT]
195

196

197
def _validate_streaming_callback(callback: StreamingCallbackT, label: str, requires_async: bool) -> None:
    """Raise ValueError if `callback`'s sync/async flavor does not match `requires_async`."""
    async_compatible = is_callable_async_compatible(callback)
    if requires_async and not async_compatible:
        raise ValueError(f"The {label} callback must be async compatible.")
    if not requires_async and async_compatible:
        raise ValueError(f"The {label} callback cannot be a coroutine.")


@overload
def select_streaming_callback(
    init_callback: Optional[StreamingCallbackT],
    runtime_callback: Optional[StreamingCallbackT],
    requires_async: Literal[False],
) -> Optional[SyncStreamingCallbackT]: ...
@overload
def select_streaming_callback(
    init_callback: Optional[StreamingCallbackT],
    runtime_callback: Optional[StreamingCallbackT],
    requires_async: Literal[True],
) -> Optional[AsyncStreamingCallbackT]: ...


def select_streaming_callback(
    init_callback: Optional[StreamingCallbackT], runtime_callback: Optional[StreamingCallbackT], requires_async: bool
) -> Optional[StreamingCallbackT]:
    """
    Picks the correct streaming callback given an optional initial and runtime callback.

    The runtime callback takes precedence over the initial callback. Both callbacks are validated
    whenever they are provided, even if the other one ends up being selected.

    :param init_callback:
        The initial callback.
    :param runtime_callback:
        The runtime callback.
    :param requires_async:
        Whether the selected callback must be async compatible.
    :returns:
        The selected callback, or None if neither callback was provided.
    :raises ValueError:
        If a provided callback is not async compatible while `requires_async` is True, or is async
        compatible while `requires_async` is False.
    """
    # Validation order (init first, then runtime) determines which error surfaces when both are invalid.
    if init_callback is not None:
        _validate_streaming_callback(init_callback, "init", requires_async)

    if runtime_callback is not None:
        _validate_streaming_callback(runtime_callback, "runtime", requires_async)

    # Select on `is not None` rather than truthiness: a callable object that defines `__bool__` or
    # `__len__` may be falsy and must not be silently replaced by the init callback.
    return runtime_callback if runtime_callback is not None else init_callback
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc