• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 18818043546

26 Oct 2025 12:38PM UTC coverage: 92.24% (+0.02%) from 92.219%
18818043546

Pull #9942

github

web-flow
Merge 9ca93ecfb into 554616981
Pull Request #9942: feat: Add warm_up() method to ChatGenerators for tool initialization

13491 of 14626 relevant lines covered (92.24%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.15
haystack/components/generators/chat/fallback.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
from __future__ import annotations
1✔
6

7
import asyncio
1✔
8
from typing import Any, Optional, Union
1✔
9

10
from haystack import component, default_from_dict, default_to_dict, logging
1✔
11
from haystack.components.generators.chat.types import ChatGenerator
1✔
12
from haystack.dataclasses import ChatMessage, StreamingCallbackT
1✔
13
from haystack.tools import ToolsType
1✔
14
from haystack.utils.deserialization import deserialize_component_inplace
1✔
15

16
logger = logging.getLogger(__name__)
1✔
17

18

19
@component
class FallbackChatGenerator:
    """
    A chat generator wrapper that tries multiple chat generators sequentially.

    It forwards all parameters transparently to the underlying chat generators and returns the first successful result.
    Calls chat generators sequentially until one succeeds. Falls back on any exception raised by a generator.
    If all chat generators fail, it raises a RuntimeError with details.

    Timeout enforcement is fully delegated to the underlying chat generators. The fallback mechanism will only
    work correctly if the underlying chat generators implement proper timeout handling and raise exceptions
    when timeouts occur. For predictable latency guarantees, ensure your chat generators:
    - Support a `timeout` parameter in their initialization
    - Implement timeout as total wall-clock time (shared deadline for both streaming and non-streaming)
    - Raise timeout exceptions (e.g., TimeoutError, asyncio.TimeoutError, httpx.TimeoutException) when exceeded

    Note: Most well-implemented chat generators (OpenAI, Anthropic, Cohere, etc.) support timeout parameters
    with consistent semantics. For HTTP-based LLM providers, a single timeout value (e.g., `timeout=30`)
    typically applies to all connection phases: connection setup, read, write, and pool. For streaming
    responses, read timeout is the maximum gap between chunks. For non-streaming, it's the time limit for
    receiving the complete response.

    Failover is automatically triggered when a generator raises any exception, including:
    - Timeout errors (if the generator implements and raises them)
    - Rate limit errors (429)
    - Authentication errors (401)
    - Context length errors (400)
    - Server errors (500+)
    - Any other exception
    """

    def __init__(self, chat_generators: list[ChatGenerator]):
        """
        Creates an instance of FallbackChatGenerator.

        :param chat_generators: A non-empty list of chat generator components to try in order.
        :raises ValueError: If `chat_generators` is empty or None.
        """
        if not chat_generators:
            msg = "'chat_generators' must be a non-empty list"
            raise ValueError(msg)

        # Defensive copy so later mutation of the caller's list does not affect us.
        self.chat_generators = list(chat_generators)

    def to_dict(self) -> dict[str, Any]:
        """
        Serialize the component, including nested chat generators when they support serialization.

        Generators without a `to_dict` method are silently skipped, so a round trip through
        `to_dict`/`from_dict` only preserves serializable generators.
        """
        return default_to_dict(
            self, chat_generators=[gen.to_dict() for gen in self.chat_generators if hasattr(gen, "to_dict")]
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> FallbackChatGenerator:
        """
        Rebuild the component from a serialized representation, restoring nested chat generators.

        :param data: Dictionary produced by `to_dict`.
        :returns: A new FallbackChatGenerator with its nested generators deserialized.
        """
        # Work on shallow copies so the caller's input dict is not mutated.
        init_params = dict(data.get("init_parameters", {}))
        serialized = init_params.get("chat_generators") or []
        deserialized: list[Any] = []
        for gen_dict in serialized:
            # Use the generic component deserializer available in Haystack.
            holder = {"component": gen_dict}
            deserialize_component_inplace(holder, key="component")
            deserialized.append(holder["component"])
        init_params["chat_generators"] = deserialized
        data = {**data, "init_parameters": init_params}
        return default_from_dict(cls, data)

    def warm_up(self) -> None:
        """
        Warm up all underlying chat generators.

        This method calls warm_up() on each underlying generator that supports it.
        """
        for gen in self.chat_generators:
            if hasattr(gen, "warm_up") and callable(gen.warm_up):
                gen.warm_up()

    def _success_result(
        self, result: dict[str, Any], idx: int, gen_name: str, failed: list[str]
    ) -> dict[str, Any]:
        """Build the output dict for a successful attempt, enriching meta with fallback bookkeeping."""
        meta = dict(result.get("meta", {}))
        meta.update(
            {
                "successful_chat_generator_index": idx,
                "successful_chat_generator_class": gen_name,
                "total_attempts": idx + 1,
                # Copy so callers cannot observe later internal mutation.
                "failed_chat_generators": list(failed),
            }
        )
        return {"replies": result.get("replies", []), "meta": meta}

    def _all_failed_error(self, failed: list[str], last_error: Union[BaseException, None]) -> RuntimeError:
        """Build the RuntimeError raised when every configured generator has failed."""
        failed_names = ", ".join(failed)
        msg = (
            f"All {len(self.chat_generators)} chat generators failed. "
            f"Last error: {last_error}. Failed chat generators: [{failed_names}]"
        )
        return RuntimeError(msg)

    def _run_single_sync(  # pylint: disable=too-many-positional-arguments
        self,
        gen: Any,
        messages: list[ChatMessage],
        generation_kwargs: Union[dict[str, Any], None],
        tools: Optional[ToolsType],
        streaming_callback: Union[StreamingCallbackT, None],
    ) -> dict[str, Any]:
        """Invoke one generator synchronously, forwarding all parameters unchanged."""
        return gen.run(
            messages=messages, generation_kwargs=generation_kwargs, tools=tools, streaming_callback=streaming_callback
        )

    async def _run_single_async(  # pylint: disable=too-many-positional-arguments
        self,
        gen: Any,
        messages: list[ChatMessage],
        generation_kwargs: Union[dict[str, Any], None],
        tools: Optional[ToolsType],
        streaming_callback: Union[StreamingCallbackT, None],
    ) -> dict[str, Any]:
        """Invoke one generator asynchronously, using native run_async when available, a thread otherwise."""
        if hasattr(gen, "run_async") and callable(gen.run_async):
            return await gen.run_async(
                messages=messages,
                generation_kwargs=generation_kwargs,
                tools=tools,
                streaming_callback=streaming_callback,
            )
        # Fall back to running the sync API off the event loop to avoid blocking it.
        return await asyncio.to_thread(
            gen.run,
            messages=messages,
            generation_kwargs=generation_kwargs,
            tools=tools,
            streaming_callback=streaming_callback,
        )

    @component.output_types(replies=list[ChatMessage], meta=dict[str, Any])
    def run(
        self,
        messages: list[ChatMessage],
        generation_kwargs: Union[dict[str, Any], None] = None,
        tools: Optional[ToolsType] = None,
        streaming_callback: Union[StreamingCallbackT, None] = None,
    ) -> dict[str, Any]:
        """
        Execute chat generators sequentially until one succeeds.

        :param messages: The conversation history as a list of ChatMessage instances.
        :param generation_kwargs: Optional parameters for the chat generator (e.g., temperature, max_tokens).
        :param tools: A list of Tool and/or Toolset objects, or a single Toolset for function calling capabilities.
        :param streaming_callback: Optional callable for handling streaming responses.
        :returns: A dictionary with:
            - "replies": Generated ChatMessage instances from the first successful generator.
            - "meta": Execution metadata including successful_chat_generator_index, successful_chat_generator_class,
              total_attempts, failed_chat_generators, plus any metadata from the successful generator.
        :raises RuntimeError: If all chat generators fail.
        """
        failed: list[str] = []
        last_error: Union[BaseException, None] = None

        for idx, gen in enumerate(self.chat_generators):
            gen_name = gen.__class__.__name__
            try:
                result = self._run_single_sync(gen, messages, generation_kwargs, tools, streaming_callback)
                return self._success_result(result, idx, gen_name, failed)
            except Exception as e:  # noqa: BLE001 - fallback logic should handle any exception
                logger.warning(
                    "ChatGenerator {chat_generator} failed with error: {error}", chat_generator=gen_name, error=e
                )
                failed.append(gen_name)
                last_error = e

        raise self._all_failed_error(failed, last_error)

    @component.output_types(replies=list[ChatMessage], meta=dict[str, Any])
    async def run_async(
        self,
        messages: list[ChatMessage],
        generation_kwargs: Union[dict[str, Any], None] = None,
        tools: Optional[ToolsType] = None,
        streaming_callback: Union[StreamingCallbackT, None] = None,
    ) -> dict[str, Any]:
        """
        Asynchronously execute chat generators sequentially until one succeeds.

        :param messages: The conversation history as a list of ChatMessage instances.
        :param generation_kwargs: Optional parameters for the chat generator (e.g., temperature, max_tokens).
        :param tools: A list of Tool and/or Toolset objects, or a single Toolset for function calling capabilities.
        :param streaming_callback: Optional callable for handling streaming responses.
        :returns: A dictionary with:
            - "replies": Generated ChatMessage instances from the first successful generator.
            - "meta": Execution metadata including successful_chat_generator_index, successful_chat_generator_class,
              total_attempts, failed_chat_generators, plus any metadata from the successful generator.
        :raises RuntimeError: If all chat generators fail.
        """
        failed: list[str] = []
        last_error: Union[BaseException, None] = None

        for idx, gen in enumerate(self.chat_generators):
            gen_name = gen.__class__.__name__
            try:
                result = await self._run_single_async(gen, messages, generation_kwargs, tools, streaming_callback)
                return self._success_result(result, idx, gen_name, failed)
            except Exception as e:  # noqa: BLE001 - fallback logic should handle any exception
                logger.warning(
                    "ChatGenerator {chat_generator} failed with error: {error}", chat_generator=gen_name, error=e
                )
                failed.append(gen_name)
                last_error = e

        raise self._all_failed_error(failed, last_error)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc