• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 18124993824

30 Sep 2025 09:17AM UTC coverage: 91.987% (-0.07%) from 92.055%
18124993824

Pull #9849

github

web-flow
Merge 88d2656a0 into 34aa66ecc
Pull Request #9849: Add Tool/Toolset warm_up

13237 of 14390 relevant lines covered (91.99%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.8
haystack/core/pipeline/utils.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import heapq
1✔
6
from collections.abc import Iterable
1✔
7
from copy import deepcopy
1✔
8
from functools import wraps
1✔
9
from itertools import count
1✔
10
from typing import Any, Optional
1✔
11

12
from haystack import logging
1✔
13
from haystack.core.component import Component
1✔
14

15
logger = logging.getLogger(__name__)
1✔
16

17

18
def warm_tools_on_component(component: Any, field_name: Optional[str] = None) -> None:
    """
    Warm any Tool or Toolset instances reachable from a component.

    The lookup is best-effort: attributes that cannot be read, attribute values that fail while being
    iterated, and tools whose `warm_up()` raises are logged at debug level and skipped, so a single
    misbehaving attribute never aborts the scan of the remaining ones.

    :param component: The component to search for tools
    :param field_name: Optional specific field name to check instead of searching all attributes
    """
    # Import locally to avoid circular dependencies
    from haystack.tools.tool import Tool
    from haystack.tools.toolset import Toolset

    component_name = component.__class__.__name__
    attributes = [field_name] if field_name else dir(component)

    for attr_name in attributes:
        try:
            attr_value = getattr(component, attr_name)
        except Exception as exc:  # pragma: no cover - defensive
            logger.debug(
                "Failed to access attribute {attr_name} on component {component}: {exc}",
                attr_name=attr_name,
                component=component_name,
                exc=exc,
            )
            continue

        # Methods and unset attributes cannot hold tools
        if attr_value is None or callable(attr_value):
            continue

        # The outer try guards the iteration itself: an iterable attribute may raise from
        # `__iter__`/`__next__`, which must not interrupt the scan of the remaining attributes.
        try:
            for candidate in _iter_tool_candidates(attr_value):
                # The inner try keeps one failing tool from preventing its siblings from being warmed up
                try:
                    if isinstance(candidate, (Tool, Toolset)):
                        logger.debug("Warming up tools for component {component}", component=component_name)
                        candidate.warm_up()
                except Exception as exc:  # pragma: no cover - defensive
                    logger.debug(
                        "Failed to warm tool candidate from attribute {attr_name} on component {component}: {exc}",
                        attr_name=attr_name,
                        component=component_name,
                        exc=exc,
                    )
        except Exception as exc:  # pragma: no cover - defensive
            logger.debug(
                "Failed to iterate over attribute {attr_name} on component {component}: {exc}",
                attr_name=attr_name,
                component=component_name,
                exc=exc,
            )
58

59

60
def _iter_tool_candidates(value: Any) -> Iterable[Any]:
    """
    Return the objects within `value` that might be Tool or Toolset instances.

    :param value: An attribute value taken from a component.

    :returns:
        A one-element tuple when `value` itself is a Tool or Toolset, `value` unchanged when it is an
        iterable other than `str`, `bytes` or `dict`, and an empty tuple in every other case.
    """
    # Import locally to avoid circular dependencies
    from haystack.tools.tool import Tool
    from haystack.tools.toolset import Toolset

    if isinstance(value, (Tool, Toolset)):
        candidates: Iterable[Any] = (value,)
    elif isinstance(value, (str, bytes, dict)) or not isinstance(value, Iterable):
        # Strings, bytes and mappings are iterable but are never treated as tool containers
        candidates = ()
    else:
        candidates = value
    return candidates
1✔
72

73

74
def _deepcopy_with_exceptions(obj: Any) -> Any:
    """
    Attempts to perform a deep copy of the given object.

    This function recursively handles common container types (lists, tuples, sets, and dicts) to ensure deep copies
    of nested structures. Named tuples are rebuilt through their `_make` constructor so that they keep their type.
    For specific object types that are known to be problematic for deepcopying - such as instances of `Component`,
    `Tool`, or `Toolset` - the original object is returned as-is.
    If `deepcopy` fails for any other reason, the original object is returned and a log message is recorded.

    :param obj: The object to be deep-copied.

    :returns:
        A deep-copied version of the object, or the original object if deepcopying fails.
    """
    # Import here to avoid circular imports
    from haystack.tools.tool import Tool
    from haystack.tools.toolset import Toolset

    if isinstance(obj, (list, tuple, set)):
        copied_items = (_deepcopy_with_exceptions(v) for v in obj)
        # Named tuples take their fields as positional arguments, so `type(obj)(iterable)` would raise a
        # TypeError (or mis-assign the fields). `_make` is their documented "build from iterable" constructor.
        make = getattr(type(obj), "_make", None)
        if isinstance(obj, tuple) and callable(make):
            return make(copied_items)
        return type(obj)(copied_items)

    if isinstance(obj, dict):
        return {k: _deepcopy_with_exceptions(v) for k, v in obj.items()}

    # Components and Tools often contain objects that we do not want to deepcopy or are not deepcopyable
    # (e.g. models, clients, etc.). In this case we return the object as-is.
    if isinstance(obj, (Component, Tool, Toolset)):
        return obj

    try:
        return deepcopy(obj)
    except Exception as e:
        logger.info(
            "Deepcopy failed for object of type '{obj_type}'. Error: {error}. Returning original object instead.",
            obj_type=type(obj).__name__,
            error=e,
        )
        return obj
1✔
112

113

114
def parse_connect_string(connection: str) -> tuple[str, Optional[str]]:
    """
    Splits a connect_to/from string into its component name and connection name.

    Only the first dot acts as the separator; any later dots stay in the connection name.

    :param connection:
        The connection string, either `"component"` or `"component.connection"`.
    :returns:
        A tuple `(component_name, connection_name)`. The connection name is `None` when the string
        contains no dot.
    """
    component_name, separator, connection_name = connection.partition(".")
    if not separator:
        # No dot at all: the whole string is the component name
        return connection, None
    return (component_name, connection_name)
1✔
127

128

129
class FIFOPriorityQueue:
    """
    A min-priority queue whose equal-priority items leave in insertion order.

    Entries are kept in a binary heap as `(priority, sequence_number, item)` tuples. The sequence number
    grows with every push, so ties on priority are resolved First-In-First-Out and the items themselves
    are never compared with each other.
    """

    def __init__(self) -> None:
        """
        Create an empty queue.
        """
        # Heap of (priority, sequence_number, item) entries
        self._queue: list[tuple[int, int, Any]] = []
        # Source of increasing sequence numbers used to break priority ties
        self._counter = count()

    def push(self, item: Any, priority: int) -> None:
        """
        Add an item to the queue under the given priority.

        Lower priority numbers are dequeued first; items sharing a priority keep their insertion order.

        :param item:
            The item to insert into the queue.
        :param priority:
            Priority level for the item. Lower numbers indicate higher priority.
        """
        heapq.heappush(self._queue, (priority, next(self._counter), item))

    def pop(self) -> tuple[int, Any]:
        """
        Remove and return the entry with the lowest priority number.

        Among entries with equal priority, the one inserted first is returned.

        :returns:
            A tuple containing (priority, item) with the lowest priority number.
        :raises IndexError:
            If the queue is empty.
        """
        if self._queue:
            head = heapq.heappop(self._queue)
            return head[0], head[2]
        raise IndexError("pop from empty queue")

    def peek(self) -> tuple[int, Any]:
        """
        Return the entry with the lowest priority number without removing it.

        Among entries with equal priority, the one inserted first is returned.

        :returns:
            A tuple containing (priority, item) with the lowest priority number.
        :raises IndexError:
            If the queue is empty.
        """
        if self._queue:
            # The heap invariant keeps the smallest entry at index 0
            head = self._queue[0]
            return head[0], head[2]
        raise IndexError("peek at empty queue")

    def get(self) -> Optional[tuple[int, Any]]:
        """
        Remove and return the entry with the lowest priority number, if there is one.

        Among entries with equal priority, the one inserted first is returned.
        Unlike pop(), an empty queue yields None instead of raising an exception.

        :returns:
            A tuple containing (priority, item), or None if the queue is empty.
        """
        if self._queue:
            head = heapq.heappop(self._queue)
            return head[0], head[2]
        return None

    def __len__(self) -> int:
        """
        Return the number of items in the queue.

        :returns:
            The number of items currently in the queue.
        """
        return len(self._queue)

    def __bool__(self) -> bool:
        """
        Tell whether the queue holds at least one item.

        :returns:
            True if the queue contains items, False otherwise.
        """
        return bool(self._queue)
1✔
227

228

229
def args_deprecated(func):
    """
    Decorator to warn about the use of positional arguments in a function.

    The wrapped function is always called first, so that a call with a mismatching signature raises its usual
    error before any warning is emitted. A `DeprecationWarning` is then issued if positional arguments were passed.
    A leading instance of `Pipeline`/`PipelineBase` (or of one of their subclasses) is treated as `self` and is
    not counted as a positional argument.

    Adapted from https://stackoverflow.com/questions/68432070/

    :param func:
        The function or method to wrap.
    :returns:
        The wrapped callable. It returns whatever `func` returns.
    """

    def _positional_arg_warning() -> None:
        """
        Triggers a warning message if positional arguments are used in a function
        """
        import warnings

        msg = (
            "Warning: In an upcoming release, this method will require keyword arguments for all parameters. "
            "Please update your code to use keyword arguments to ensure future compatibility. "
        )
        # stacklevel=3 skips this helper and `wrapper`, so the warning points at the caller's own line
        warnings.warn(msg, DeprecationWarning, stacklevel=3)

    def _is_pipeline_instance(candidate) -> bool:
        """
        Tells whether `candidate` is an instance of a Pipeline class or of one of its subclasses
        """
        # We check the class names as strings to avoid circular imports. Walking the MRO makes sure that
        # subclasses of the pipeline classes are recognised too.
        return any(cls.__name__ in ("Pipeline", "PipelineBase") for cls in type(candidate).__mro__)

    @wraps(func)
    def wrapper(*args, **kwargs):
        # call the function first, to make sure the signature matches
        ret_value = func(*args, **kwargs)

        # A Pipeline instance is always the first argument - remove it from the args to check for positional arguments
        positional_args = args[1:] if args and _is_pipeline_instance(args[0]) else args

        if positional_args:
            _positional_arg_warning()
        return ret_value

    return wrapper
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc