• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 10815083488

11 Sep 2024 03:45PM UTC coverage: 90.308% (-0.003%) from 90.311%
10815083488

Pull #8354

github

web-flow
Merge f2c88f653 into 3016c5ca9
Pull Request #8354: feat: Deprecate `max_loops_allowed` in favour of new argument `max_runs_per_component`

7184 of 7955 relevant lines covered (90.31%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.55
haystack/core/pipeline/pipeline.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
from copy import deepcopy
1✔
6
from typing import Any, Dict, List, Mapping, Optional, Set, Tuple
1✔
7
from warnings import warn
1✔
8

9
from haystack import logging, tracing
1✔
10
from haystack.core.component import Component
1✔
11
from haystack.core.errors import PipelineMaxComponentRuns, PipelineRuntimeError
1✔
12
from haystack.core.pipeline.base import (
1✔
13
    _dequeue_component,
14
    _dequeue_waiting_component,
15
    _enqueue_component,
16
    _enqueue_waiting_component,
17
)
18
from haystack.telemetry import pipeline_running
1✔
19

20
from .base import PipelineBase, _add_missing_input_defaults, _is_lazy_variadic
1✔
21

22
# Module-level logger; `Pipeline._run_component` emits an info message through it each time a component runs.
logger = logging.getLogger(__name__)
1✔
23

24

25
class Pipeline(PipelineBase):
    """
    Synchronous version of the orchestration engine.

    Orchestrates component execution according to the execution graph, one after the other.
    """

    def _run_component(self, name: str, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """
        Runs a Component with the given inputs.

        Once the Component has run, its `visits` counter in the graph is incremented.
        Every variadic input present in `inputs` is then reset to an empty list, so that
        a later run of the same Component (e.g. inside a loop) does not see stale values.

        :param name: Name of the Component as defined in the Pipeline.
        :param inputs: Inputs for the Component. Mutated in place: consumed variadic inputs are reset to `[]`.
        :raises PipelineRuntimeError: If Component doesn't return a dictionary.
        :return: The output of the Component.
        """
        instance: Component = self.graph.nodes[name]["instance"]

        with tracing.tracer.trace(
            "haystack.component.run",
            tags={
                "haystack.component.name": name,
                "haystack.component.type": instance.__class__.__name__,
                "haystack.component.input_types": {k: type(v).__name__ for k, v in inputs.items()},
                "haystack.component.input_spec": {
                    key: {
                        "type": (value.type.__name__ if isinstance(value.type, type) else str(value.type)),
                        "senders": value.senders,
                    }
                    for key, value in instance.__haystack_input__._sockets_dict.items()  # type: ignore
                },
                "haystack.component.output_spec": {
                    key: {
                        "type": (value.type.__name__ if isinstance(value.type, type) else str(value.type)),
                        "receivers": value.receivers,
                    }
                    for key, value in instance.__haystack_output__._sockets_dict.items()  # type: ignore
                },
            },
        ) as span:
            span.set_content_tag("haystack.component.input", inputs)
            logger.info("Running component {component_name}", component_name=name)
            res: Dict[str, Any] = instance.run(**inputs)
            self.graph.nodes[name]["visits"] += 1

            # After a Component that has variadic inputs is run, we need to reset the variadic inputs that were consumed
            for socket in instance.__haystack_input__._sockets_dict.values():  # type: ignore
                if socket.name not in inputs:
                    continue
                if socket.is_variadic:
                    inputs[socket.name] = []

            if not isinstance(res, Mapping):
                raise PipelineRuntimeError(
                    f"Component '{name}' didn't return a dictionary. "
                    "Components must always return dictionaries: check the documentation."
                )
            span.set_tag("haystack.component.visits", self.graph.nodes[name]["visits"])
            span.set_content_tag("haystack.component.output", res)

            return res

    def run(  # noqa: PLR0915
        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None
    ) -> Dict[str, Any]:
        """
        Runs the pipeline with given input data.

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```

        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises PipelineRuntimeError:
            If a component fails or returns unexpected output.
        :raises PipelineMaxComponentRuns:
            If a component is about to run after exceeding the configured maximum
            run count (`max_runs_per_component`).

        If the pipeline detects that it is stuck waiting for inputs that will never
        arrive, it emits a `RuntimeWarning` and returns the partial outputs collected so far.

        Example a - Using named components:
        Consider a 'Hello' component that takes a 'word' input and outputs a greeting.

        ```python
        @component
        class Hello:
            @component.output_types(output=str)
            def run(self, word: str):
                return {"output": f"Hello, {word}!"}
        ```

        Create a pipeline with two 'Hello' components connected together:

        ```python
        pipeline = Pipeline()
        pipeline.add_component("hello", Hello())
        pipeline.add_component("hello2", Hello())
        pipeline.connect("hello.output", "hello2.word")
        result = pipeline.run(data={"hello": {"word": "world"}})
        ```

        This runs the pipeline with the specified input for 'hello', yielding
        {'hello2': {'output': 'Hello, Hello, world!!'}}.

        Example b - Using flat inputs:
        You can also pass inputs directly without specifying component names:

        ```python
        result = pipeline.run(data={"word": "world"})
        ```

        The pipeline resolves inputs to the correct components, returning
        {'hello2': {'output': 'Hello, Hello, world!!'}}.
        """
        pipeline_running(self)

        # Reset the visits count for each component
        self._init_graph()

        # TODO: Remove this warmup once we can check reliably whether a component has been warmed up or not
        # As of now it's here to make sure we don't have failing tests that assume warm_up() is called in run()
        self.warm_up()

        # normalize `data`
        data = self._prepare_component_input_data(data)

        # Raise if input is malformed in some way
        self._validate_input(data)

        # Initialize the inputs state
        components_inputs: Dict[str, Dict[str, Any]] = self._init_inputs_state(data)

        # Take all components that:
        # - have no inputs
        # - receive input from the user
        # - have at least one input not connected
        # - have at least one input that is variadic
        run_queue: List[Tuple[str, Component]] = self._init_run_queue(data)

        # These variables are used to detect when we're stuck in a loop.
        # Stuck loops can happen when one or more components are waiting for input but
        # no other component is going to run.
        # This can happen when a whole branch of the graph is skipped, for example.
        # When two consecutive iterations of the loop leave the waiting_queue unchanged,
        # we know we're stuck in a loop and can't make any progress.
        #
        # They track the previous two states of the waiting_queue: if the current state is n,
        # before_last_waiting_queue holds state n-2 and last_waiting_queue holds state n-1.
        # When we run a component, we reset both.
        before_last_waiting_queue: Optional[Set[str]] = None
        last_waiting_queue: Optional[Set[str]] = None

        # waiting_queue keeps track of components that are waiting for input.
        waiting_queue: List[Tuple[str, Component]] = []

        include_outputs_from = set() if include_outputs_from is None else include_outputs_from

        # This is what we'll return at the end
        final_outputs: Dict[Any, Any] = {}

        with tracing.tracer.trace(
            "haystack.pipeline.run",
            tags={
                "haystack.pipeline.input_data": data,
                "haystack.pipeline.output_data": final_outputs,
                "haystack.pipeline.metadata": self.metadata,
                # The deprecated `max_loops_allowed` tag mirrors `max_runs_per_component`,
                # presumably so existing trace consumers keep working.
                "haystack.pipeline.max_loops_allowed": self._max_runs_per_component,
                "haystack.pipeline.max_runs_per_component": self._max_runs_per_component,
            },
        ):
            # Cache for extra outputs, if enabled.
            extra_outputs: Dict[Any, Any] = {}

            while len(run_queue) > 0:
                name, comp = run_queue.pop(0)

                if _is_lazy_variadic(comp) and not all(_is_lazy_variadic(queued) for _, queued in run_queue):
                    # We run Components with lazy variadic inputs only if there are only Components
                    # with lazy variadic inputs left to run
                    _enqueue_waiting_component((name, comp), waiting_queue)
                    continue

                if self._component_has_enough_inputs_to_run(name, components_inputs):
                    # NOTE(review): `visits` counts completed runs (incremented in `_run_component`),
                    # so with `>` a component may run `max_runs_per_component + 1` times before this
                    # raises. Kept as-is to preserve existing behaviour — confirm this is intended.
                    if self.graph.nodes[name]["visits"] > self._max_runs_per_component:
                        msg = f"Maximum run count {self._max_runs_per_component} reached for component '{name}'"
                        raise PipelineMaxComponentRuns(msg)

                    res: Dict[str, Any] = self._run_component(name, components_inputs[name])

                    if name in include_outputs_from:
                        # Deepcopy the outputs to prevent downstream nodes from modifying them
                        # We don't care about loops - Always store the last output.
                        extra_outputs[name] = deepcopy(res)

                    # Reset the waiting for input previous states, we managed to run a component
                    before_last_waiting_queue = None
                    last_waiting_queue = None

                    # We managed to run this component that was in the waiting list, we can remove it.
                    # This happens when a component was put in the waiting list but we reached it from another edge.
                    _dequeue_waiting_component((name, comp), waiting_queue)

                    for pair in self._find_components_that_will_receive_no_input(name, res, components_inputs):
                        _dequeue_component(pair, run_queue, waiting_queue)
                    res = self._distribute_output(name, res, components_inputs, run_queue, waiting_queue)

                    if len(res) > 0:
                        final_outputs[name] = res
                else:
                    # This component doesn't have enough inputs so we can't run it yet
                    _enqueue_waiting_component((name, comp), waiting_queue)

                if len(run_queue) == 0 and len(waiting_queue) > 0:
                    # Check if we're stuck in a loop.
                    # It's important to check whether previous waitings are None as it could be that no
                    # Component has actually been run yet.
                    if (
                        before_last_waiting_queue is not None
                        and last_waiting_queue is not None
                        and before_last_waiting_queue == last_waiting_queue
                    ):
                        if self._is_stuck_in_a_loop(waiting_queue):
                            # We're stuck! We can't make any progress.
                            msg = (
                                "Pipeline is stuck running in a loop. Partial outputs will be returned. "
                                "Check the Pipeline graph for possible issues."
                            )
                            warn(RuntimeWarning(msg))
                            break

                        (name, comp) = self._find_next_runnable_lazy_variadic_or_default_component(waiting_queue)
                        _add_missing_input_defaults(name, comp, components_inputs)
                        _enqueue_component((name, comp), run_queue, waiting_queue)
                        continue

                    before_last_waiting_queue = last_waiting_queue.copy() if last_waiting_queue is not None else None
                    last_waiting_queue = {item[0] for item in waiting_queue}

                    (name, comp) = self._find_next_runnable_component(components_inputs, waiting_queue)
                    _add_missing_input_defaults(name, comp, components_inputs)
                    _enqueue_component((name, comp), run_queue, waiting_queue)

            if len(include_outputs_from) > 0:
                for name, output in extra_outputs.items():
                    inner = final_outputs.get(name)
                    if inner is None:
                        final_outputs[name] = output
                    else:
                        # Let's not override any keys that are already
                        # in the final_outputs as they might be different
                        # from what we cached in extra_outputs, e.g. when loops
                        # are involved.
                        for k, v in output.items():
                            if k not in inner:
                                inner[k] = v

            return final_outputs
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc