• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 17434976747

03 Sep 2025 01:26PM UTC coverage: 92.056% (-0.02%) from 92.079%
17434976747

Pull #9759

github

web-flow
Merge 7d9036b38 into f48789f5f
Pull Request #9759: feat: Add PipelineTool to streamline using Pipeline as Tools with Agent

12967 of 14086 relevant lines covered (92.06%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.53
haystack/core/pipeline/utils.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import heapq
1✔
6
from copy import deepcopy
1✔
7
from functools import wraps
1✔
8
from itertools import count
1✔
9
from typing import Any, Optional
1✔
10

11
from haystack import logging
1✔
12
from haystack.core.component import Component
1✔
13

14
# Module-level logger obtained from haystack's logging module, named after this module.
logger = logging.getLogger(__name__)
1✔
15

16

17
def _deepcopy_with_exceptions(obj: Any) -> Any:
    """
    Attempts to perform a deep copy of the given object.

    This function recursively handles common container types (lists, tuples, sets, and dicts) to ensure deep copies
    of nested structures. Lists, tuples and sets are rebuilt with their own type; named tuples are rebuilt through
    their `_make` constructor so that every field receives its copied value. Dicts are rebuilt as a plain `dict`
    whose keys are reused as-is and whose values are copied.

    For specific object types that are known to be problematic for deepcopying - such as instances of `Component`,
    `Tool`, or `Toolset` - the original object is returned as-is.
    If `deepcopy` fails for any other reason, the original object is returned and a log message is recorded.

    :param obj: The object to be deep-copied.

    :returns:
        A deep-copied version of the object, or the original object if deepcopying fails.
    """
    # Import here to avoid circular imports
    from haystack.tools.tool import Tool
    from haystack.tools.toolset import Toolset

    if isinstance(obj, (list, tuple, set)):
        copied_items = [_deepcopy_with_exceptions(v) for v in obj]
        # Named tuples take one positional argument per field, so handing their constructor a single iterable
        # either raises a TypeError or stores the iterable itself in the first field. `_make` builds the
        # instance from an iterable of field values instead.
        if isinstance(obj, tuple) and hasattr(type(obj), "_make"):
            return type(obj)._make(copied_items)
        return type(obj)(copied_items)

    if isinstance(obj, dict):
        return {k: _deepcopy_with_exceptions(v) for k, v in obj.items()}

    # Components and Tools often contain objects that we do not want to deepcopy or are not deepcopyable
    # (e.g. models, clients, etc.). In this case we return the object as-is.
    if isinstance(obj, (Component, Tool, Toolset)):
        return obj

    try:
        return deepcopy(obj)
    except Exception as e:
        # Best-effort copy: fall back to sharing the original object rather than failing the caller.
        logger.info(
            "Deepcopy failed for object of type '{obj_type}'. Error: {error}. Returning original object instead.",
            obj_type=type(obj).__name__,
            error=e,
        )
        return obj
1✔
55

56

57
def parse_connect_string(connection: str) -> tuple[str, Optional[str]]:
    """
    Splits a connect_to/from string into its component name and connection name.

    Only the first dot acts as the separator; any further dots stay in the connection name.

    :param connection:
        The connection string, either `"component"` or `"component.connection"`.
    :returns:
        A tuple `(component_name, connection_name)`. The connection name is `None` when the string
        contains no dot.
    """
    component_name, separator, connection_name = connection.partition(".")
    if not separator:
        # No dot present: the whole string is the component name.
        return connection, None
    return (component_name, connection_name)
1✔
70

71

72
class FIFOPriorityQueue:
    """
    A min-priority queue that breaks ties in insertion order.

    Entries are kept in a binary heap as `(priority, sequence_number, item)` tuples. The sequence number
    grows with every push, so two entries with the same priority are dequeued First-In-First-Out and the
    items themselves are never compared with each other.
    """

    def __init__(self) -> None:
        """
        Create an empty queue.
        """
        # Monotonic sequence numbers used as the tie-breaker between equal priorities
        self._counter = count()
        # Heap of (priority, sequence_number, item) tuples
        self._queue: list[tuple[int, int, Any]] = []

    def push(self, item: Any, priority: int) -> None:
        """
        Add an item to the queue under the given priority.

        Lower priority numbers are dequeued first; equal priorities keep insertion order.

        :param item:
            The item to insert into the queue.
        :param priority:
            Priority level for the item. Lower numbers indicate higher priority.
        """
        heapq.heappush(self._queue, (priority, next(self._counter), item))

    def pop(self) -> tuple[int, Any]:
        """
        Remove and return the entry with the lowest priority number.

        Among entries of equal priority, the one pushed first is returned.

        :returns:
            A tuple containing (priority, item) with the lowest priority number.
        :raises IndexError:
            If the queue is empty.
        """
        if len(self._queue) == 0:
            raise IndexError("pop from empty queue")
        head = heapq.heappop(self._queue)
        return head[0], head[2]

    def peek(self) -> tuple[int, Any]:
        """
        Return the entry with the lowest priority number without removing it.

        Among entries of equal priority, the one pushed first is returned.

        :returns:
            A tuple containing (priority, item) with the lowest priority number.
        :raises IndexError:
            If the queue is empty.
        """
        if len(self._queue) == 0:
            raise IndexError("peek at empty queue")
        # The heap invariant keeps the smallest entry at index 0
        head = self._queue[0]
        return head[0], head[2]

    def get(self) -> Optional[tuple[int, Any]]:
        """
        Remove and return the entry with the lowest priority number, or None if there is none.

        Behaves like pop(), except that an empty queue yields None instead of raising.

        :returns:
            A tuple containing (priority, item), or None if the queue is empty.
        """
        if self._queue:
            head = heapq.heappop(self._queue)
            return head[0], head[2]
        return None

    def __len__(self) -> int:
        """
        Return the number of items in the queue.

        :returns:
            The number of items currently in the queue.
        """
        size = len(self._queue)
        return size

    def __bool__(self) -> bool:
        """
        Tell whether the queue holds at least one item.

        :returns:
            True if the queue contains items, False otherwise.
        """
        return len(self._queue) > 0
1✔
170

171

172
def args_deprecated(func):
    """
    Decorator to warn about the use of positional arguments in a function.

    The wrapped function is always called first, so a call with a mismatching signature fails exactly as it
    would without the decorator. Afterwards a `DeprecationWarning` is emitted if any positional argument was
    passed. A leading argument whose class is named `Pipeline` or `PipelineBase` (the bound instance of a
    method call) is not counted as a positional argument.

    The warning is attributed to the code that called the decorated function, not to this module.

    Adapted from https://stackoverflow.com/questions/68432070/

    :param func:
        The function or method to wrap.
    :returns:
        A wrapper with the same signature and return value as `func`.
    """
    import warnings

    msg = (
        "Warning: In an upcoming release, this method will require keyword arguments for all parameters. "
        "Please update your code to use keyword arguments to ensure future compatibility. "
    )

    @wraps(func)
    def wrapper(*args, **kwargs):
        # call the function first, to make sure the signature matches
        ret_value = func(*args, **kwargs)

        # A Pipeline instance is always the first argument - remove it from the args to check for positional arguments
        # We check the class name as strings to avoid circular imports
        positional = args
        if positional and positional[0].__class__.__name__ in ("Pipeline", "PipelineBase"):
            positional = positional[1:]

        if positional:
            # stacklevel=2 makes the warning point at the caller of the decorated function. Warning from a
            # nested helper with the same stacklevel would point at this wrapper instead, and Python's default
            # filters hide DeprecationWarnings that are not attributed to the user's own code.
            warnings.warn(msg, DeprecationWarning, stacklevel=2)
        return ret_value

    return wrapper
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc