• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 10815083488

11 Sep 2024 03:45PM UTC coverage: 90.308% (-0.003%) from 90.311%
10815083488

Pull #8354

github

web-flow
Merge f2c88f653 into 3016c5ca9
Pull Request #8354: feat: Deprecate `max_loops_allowed` in favour of new argument `max_runs_per_component`

7184 of 7955 relevant lines covered (90.31%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.19
haystack/core/pipeline/base.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import importlib
1✔
6
import itertools
1✔
7
import warnings
1✔
8
from collections import defaultdict
1✔
9
from copy import copy, deepcopy
1✔
10
from datetime import datetime
1✔
11
from pathlib import Path
1✔
12
from typing import Any, Dict, Iterator, List, Optional, Set, TextIO, Tuple, Type, TypeVar, Union
1✔
13

14
import networkx  # type:ignore
1✔
15

16
from haystack import logging
1✔
17
from haystack.core.component import Component, InputSocket, OutputSocket, component
1✔
18
from haystack.core.errors import (
1✔
19
    DeserializationError,
20
    PipelineConnectError,
21
    PipelineDrawingError,
22
    PipelineError,
23
    PipelineUnmarshalError,
24
    PipelineValidationError,
25
)
26
from haystack.core.serialization import DeserializationCallbacks, component_from_dict, component_to_dict
1✔
27
from haystack.core.type_utils import _type_name, _types_are_compatible
1✔
28
from haystack.marshal import Marshaller, YamlMarshaller
1✔
29
from haystack.utils import is_in_jupyter
1✔
30

31
from .descriptions import find_pipeline_inputs, find_pipeline_outputs
1✔
32
from .draw import _to_mermaid_image
1✔
33
from .template import PipelineTemplate, PredefinedPipeline
1✔
34
from .utils import parse_connect_string
1✔
35

36
# Default serialization format used by dumps()/dump()/loads()/load() when no Marshaller is passed.
DEFAULT_MARSHALLER = YamlMarshaller()

# We use a generic type to annotate the return value of classmethods,
# so that static analyzers won't be confused when derived classes
# use those methods.
T = TypeVar("T", bound="PipelineBase")

logger = logging.getLogger(__name__)

# Shared deprecation text, emitted both when 'max_loops_allowed' is passed to __init__
# and when the deprecated 'max_loops_allowed' property is read or written.
_MAX_LOOPS_ALLOWED_DEPRECATION_MESSAGE = (
    "'max_loops_allowed' argument is deprecated and will be removed in version '2.7.0'. "
    "Use 'max_runs_per_component' instead."
)
49

50

51
class PipelineBase:
1✔
52
    """
53
    Components orchestration engine.
54

55
    Builds a graph of components and orchestrates their execution according to the execution graph.
56
    """
57

58
    def __init__(
        self,
        metadata: Optional[Dict[str, Any]] = None,
        max_loops_allowed: Optional[int] = None,
        debug_path: Union[Path, str] = Path(".haystack_debug/"),
        max_runs_per_component: int = 100,
    ):
        """
        Creates the Pipeline.

        :param metadata:
            Arbitrary dictionary to store metadata about this `Pipeline`. Make sure all the values contained in
            this dictionary can be serialized and deserialized if you wish to save this `Pipeline` to file.
        :param max_loops_allowed:
            How many times the `Pipeline` can run the same node before throwing an exception.
            This is deprecated and will be removed in version 2.7.0, use `max_runs_per_component` instead.
        :param debug_path:
            When debug is enabled in `run()`, where to save the debug data.
        :param max_runs_per_component:
            How many times the `Pipeline` can run the same Component.
            If this limit is reached a `PipelineMaxComponentRuns` exception is raised.
            If not set defaults to 100 runs per Component.
        """
        self._telemetry_runs = 0
        self._last_telemetry_sent: Optional[datetime] = None
        self.metadata = metadata or {}
        self.graph = networkx.MultiDiGraph()
        self._debug: Dict[int, Dict[str, Any]] = {}
        self._debug_path = Path(debug_path)

        if max_loops_allowed is not None:
            # The deprecated argument takes precedence when explicitly provided.
            # stacklevel=2 makes the DeprecationWarning point at the caller, not at this line.
            warnings.warn(_MAX_LOOPS_ALLOWED_DEPRECATION_MESSAGE, DeprecationWarning, stacklevel=2)
            self._max_runs_per_component = max_loops_allowed
        else:
            self._max_runs_per_component = max_runs_per_component
93

94
    @property
    def max_loops_allowed(self) -> int:
        """
        Returns the maximum number of runs per Component allowed in this Pipeline.

        This is a deprecated field, use `max_runs_per_component` instead.

        :return: Maximum number of runs per Component
        """
        # stacklevel=2 attributes the DeprecationWarning to the caller's access site.
        warnings.warn(_MAX_LOOPS_ALLOWED_DEPRECATION_MESSAGE, DeprecationWarning, stacklevel=2)
        return self._max_runs_per_component
105

106
    @max_loops_allowed.setter
    def max_loops_allowed(self, value: int):
        """
        Sets the maximum number of runs per Component allowed in this Pipeline.

        This is a deprecated property, use `max_runs_per_component` instead.

        :param value: Maximum number of runs per Component
        """
        # stacklevel=2 attributes the DeprecationWarning to the caller's assignment site.
        warnings.warn(_MAX_LOOPS_ALLOWED_DEPRECATION_MESSAGE, DeprecationWarning, stacklevel=2)
        self._max_runs_per_component = value
117

118
    def __eq__(self, other) -> bool:
1✔
119
        """
120
        Pipeline equality is defined by their type and the equality of their serialized form.
121

122
        Pipelines of the same type share every metadata, node and edge, but they're not required to use
123
        the same node instances: this allows pipeline saved and then loaded back to be equal to themselves.
124
        """
125
        if not isinstance(self, type(other)):
1✔
126
            return False
×
127
        return self.to_dict() == other.to_dict()
1✔
128

129
    def __repr__(self) -> str:
1✔
130
        """
131
        Returns a text representation of the Pipeline.
132
        """
133
        res = f"{object.__repr__(self)}\n"
1✔
134
        if self.metadata:
1✔
135
            res += "🧱 Metadata\n"
1✔
136
            for k, v in self.metadata.items():
1✔
137
                res += f"  - {k}: {v}\n"
1✔
138

139
        res += "🚅 Components\n"
1✔
140
        for name, instance in self.graph.nodes(data="instance"):  # type: ignore # type wrongly defined in networkx
1✔
141
            res += f"  - {name}: {instance.__class__.__name__}\n"
1✔
142

143
        res += "🛤️ Connections\n"
1✔
144
        for sender, receiver, edge_data in self.graph.edges(data=True):
1✔
145
            sender_socket = edge_data["from_socket"].name
1✔
146
            receiver_socket = edge_data["to_socket"].name
1✔
147
            res += f"  - {sender}.{sender_socket} -> {receiver}.{receiver_socket} ({edge_data['conn_type']})\n"
1✔
148

149
        return res
1✔
150

151
    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes the pipeline to a dictionary.

        This is meant to be an intermediate representation but it can be also used to save a pipeline to file.

        :returns:
            Dictionary with serialized data.
        """
        serialized_components = {
            name: component_to_dict(instance)
            for name, instance in self.graph.nodes(data="instance")  # type:ignore
        }

        serialized_connections = []
        for src, dst, data in self.graph.edges.data():
            out_name = data["from_socket"].name
            in_name = data["to_socket"].name
            serialized_connections.append({"sender": f"{src}.{out_name}", "receiver": f"{dst}.{in_name}"})

        return {
            "metadata": self.metadata,
            "max_runs_per_component": self._max_runs_per_component,
            "components": serialized_components,
            "connections": serialized_connections,
        }
175

176
    @classmethod
    def from_dict(
        cls: Type[T], data: Dict[str, Any], callbacks: Optional[DeserializationCallbacks] = None, **kwargs
    ) -> T:
        """
        Deserializes the pipeline from a dictionary.

        :param data:
            Dictionary to deserialize from.
        :param callbacks:
            Callbacks to invoke during deserialization.
        :param kwargs:
            `components`: a dictionary of {name: instance} to reuse instances of components instead of creating new
            ones.
        :returns:
            Deserialized component.
        :raises PipelineError:
            If a component is missing its 'type', its module cannot be imported, or a connection entry is
            missing its sender or receiver.
        :raises DeserializationError:
            If a component fails to deserialize from its serialized data.
        """
        data_copy = deepcopy(data)  # to prevent modification of original data
        metadata = data_copy.get("metadata", {})
        max_runs_per_component = data_copy.get("max_runs_per_component", 100)
        # 'max_loops_allowed' is deprecated: a non-None value triggers a DeprecationWarning in __init__.
        max_loops_allowed = data_copy.get("max_loops_allowed", None)
        debug_path = Path(data_copy.get("debug_path", ".haystack_debug/"))
        pipe = cls(
            metadata=metadata,
            max_loops_allowed=max_loops_allowed,
            max_runs_per_component=max_runs_per_component,
            debug_path=debug_path,
        )
        components_to_reuse = kwargs.get("components", {})
        for name, component_data in data_copy.get("components", {}).items():
            if name in components_to_reuse:
                # Reuse an instance
                instance = components_to_reuse[name]
            else:
                if "type" not in component_data:
                    raise PipelineError(f"Missing 'type' in component '{name}'")

                if component_data["type"] not in component.registry:
                    try:
                        # Import the module first...
                        module, _ = component_data["type"].rsplit(".", 1)
                        logger.debug("Trying to import module {module_name}", module_name=module)
                        importlib.import_module(module)
                        # ...then try again
                        if component_data["type"] not in component.registry:
                            raise PipelineError(
                                f"Successfully imported module {module} but can't find it in the component registry."
                                "This is unexpected and most likely a bug."
                            )
                    except (ImportError, PipelineError) as e:
                        raise PipelineError(f"Component '{component_data['type']}' not imported.") from e

                # Create a new one
                component_class = component.registry[component_data["type"]]

                try:
                    instance = component_from_dict(component_class, component_data, name, callbacks)
                except Exception as e:
                    msg = (
                        f"Couldn't deserialize component '{name}' of class '{component_class.__name__}' "
                        f"with the following data: {str(component_data)}. Possible reasons include "
                        "malformed serialized data, mismatch between the serialized component and the "
                        "loaded one (due to a breaking change, see "
                        "https://github.com/deepset-ai/haystack/releases), etc."
                    )
                    raise DeserializationError(msg) from e
            pipe.add_component(name=name, instance=instance)

        # Read connections from the copy as well, consistently with the components loop above.
        for connection in data_copy.get("connections", []):
            if "sender" not in connection:
                raise PipelineError(f"Missing sender in connection: {connection}")
            if "receiver" not in connection:
                raise PipelineError(f"Missing receiver in connection: {connection}")
            pipe.connect(sender=connection["sender"], receiver=connection["receiver"])

        return pipe
252

253
    def dumps(self, marshaller: Marshaller = DEFAULT_MARSHALLER) -> str:
        """
        Returns the string representation of this pipeline according to the format dictated by the `Marshaller` in use.

        :param marshaller:
            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
        :returns:
            A string representing the pipeline.
        """
        # Serialize to the intermediate dict form first, then let the marshaller render it.
        serialized = self.to_dict()
        return marshaller.marshal(serialized)
263

264
    def dump(self, fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER):
        """
        Writes the string representation of this pipeline to the file-like object passed in the `fp` argument.

        :param fp:
            A file-like object ready to be written to.
        :param marshaller:
            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
        """
        # Render first, then write in a single call.
        rendered = marshaller.marshal(self.to_dict())
        fp.write(rendered)
274

275
    @classmethod
    def loads(
        cls: Type[T],
        data: Union[str, bytes, bytearray],
        marshaller: Marshaller = DEFAULT_MARSHALLER,
        callbacks: Optional[DeserializationCallbacks] = None,
    ) -> T:
        """
        Creates a `Pipeline` object from the string representation passed in the `data` argument.

        :param data:
            The string representation of the pipeline, can be `str`, `bytes` or `bytearray`.
        :param marshaller:
            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
        :param callbacks:
            Callbacks to invoke during deserialization.
        :raises DeserializationError:
            If an error occurs during deserialization.
        :returns:
            A `Pipeline` object.
        """
        # Normalize any marshaller failure into a DeserializationError with a helpful hint.
        try:
            unmarshalled = marshaller.unmarshal(data)
        except Exception as e:
            msg = (
                "Error while unmarshalling serialized pipeline data. This is usually "
                "caused by malformed or invalid syntax in the serialized representation."
            )
            raise DeserializationError(msg) from e
        return cls.from_dict(unmarshalled, callbacks)
305

306
    @classmethod
    def load(
        cls: Type[T],
        fp: TextIO,
        marshaller: Marshaller = DEFAULT_MARSHALLER,
        callbacks: Optional[DeserializationCallbacks] = None,
    ) -> T:
        """
        Creates a `Pipeline` object a string representation.

        The string representation is read from the file-like object passed in the `fp` argument.

        :param fp:
            A file-like object ready to be read from.
        :param marshaller:
            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
        :param callbacks:
            Callbacks to invoke during deserialization.
        :raises DeserializationError:
            If an error occurs during deserialization.
        :returns:
            A `Pipeline` object.
        """
        # Read the whole stream, then delegate to loads() for parsing and construction.
        content = fp.read()
        return cls.loads(content, marshaller, callbacks)
331

332
    def add_component(self, name: str, instance: Component) -> None:
        """
        Add the given component to the pipeline.

        Components are not connected to anything by default: use `Pipeline.connect()` to connect components together.
        Component names must be unique, but component instances can be reused if needed.

        :param name:
            The name of the component to add.
        :param instance:
            The component instance to add.

        :raises ValueError:
            If a component with the same name already exists.
        :raises PipelineValidationError:
            If the given instance is not a Canals component.
        """
        # Reject duplicate names: every node in the graph is keyed by a unique name.
        if name in self.graph.nodes:
            raise ValueError(f"A component named '{name}' already exists in this pipeline: choose another name.")

        # '_debug' is reserved for the pipeline's own debug output.
        if name == "_debug":
            raise ValueError("'_debug' is a reserved name for debug output. Choose another name.")

        # Only objects created with the @component decorator qualify.
        if not isinstance(instance, Component):
            raise PipelineValidationError(
                f"'{type(instance)}' doesn't seem to be a component. Is this class decorated with @component?"
            )

        # A component instance may belong to at most one pipeline at a time.
        if getattr(instance, "__haystack_added_to_pipeline__", None):
            raise PipelineError(
                "Component has already been added in another Pipeline. Components can't be shared between Pipelines. "
                "Create a new instance instead."
            )

        setattr(instance, "__haystack_added_to_pipeline__", self)

        logger.debug("Adding component '{component_name}' ({component})", component_name=name, component=instance)

        # The node starts disconnected; the socket attributes are guaranteed to exist
        # on decorated components, hence the type ignores.
        self.graph.add_node(
            name,
            instance=instance,
            input_sockets=instance.__haystack_input__._sockets_dict,  # type: ignore[attr-defined]
            output_sockets=instance.__haystack_output__._sockets_dict,  # type: ignore[attr-defined]
            visits=0,
        )
382

383
    def remove_component(self, name: str) -> Component:
1✔
384
        """
385
        Remove and returns component from the pipeline.
386

387
        Remove an existing component from the pipeline by providing its name.
388
        All edges that connect to the component will also be deleted.
389

390
        :param name:
391
            The name of the component to remove.
392
        :returns:
393
            The removed Component instance.
394

395
        :raises ValueError:
396
            If there is no component with that name already in the Pipeline.
397
        """
398

399
        # Check that a component with that name is in the Pipeline
400
        try:
1✔
401
            instance = self.get_component(name)
1✔
402
        except ValueError as exc:
1✔
403
            raise ValueError(
1✔
404
                f"There is no component named '{name}' in the pipeline. The valid component names are: ",
405
                ", ".join(n for n in self.graph.nodes),
406
            ) from exc
407

408
        # Delete component from the graph, deleting all its connections
409
        self.graph.remove_node(name)
1✔
410

411
        # Reset the Component sockets' senders and receivers
412
        input_sockets = instance.__haystack_input__._sockets_dict  # type: ignore[attr-defined]
1✔
413
        for socket in input_sockets.values():
1✔
414
            socket.senders = []
1✔
415

416
        output_sockets = instance.__haystack_output__._sockets_dict  # type: ignore[attr-defined]
1✔
417
        for socket in output_sockets.values():
1✔
418
            socket.receivers = []
1✔
419

420
        # Reset the Component's pipeline reference
421
        setattr(instance, "__haystack_added_to_pipeline__", None)
1✔
422

423
        return instance
1✔
424

425
    def connect(self, sender: str, receiver: str) -> "PipelineBase":
        """
        Connects two components together.

        All components to connect must exist in the pipeline.
        If connecting to a component that has several output connections, specify the inputs and output names as
        'component_name.connections_name'.

        :param sender:
            The component that delivers the value. This can be either just a component name or can be
            in the format `component_name.connection_name` if the component has multiple outputs.
        :param receiver:
            The component that receives the value. This can be either just a component name or can be
            in the format `component_name.connection_name` if the component has multiple inputs.
        :returns:
            The Pipeline instance.

        :raises ValueError:
            If either component name is not present in the pipeline.
        :raises PipelineConnectError:
            If the two components cannot be connected (for example if one of the components is
            not present in the pipeline, or the connections don't match by type, and so on).
        """
        # Edges may be named explicitly by passing 'node_name.edge_name' to connect().
        sender_component_name, sender_socket_name = parse_connect_string(sender)
        receiver_component_name, receiver_socket_name = parse_connect_string(receiver)

        # Get the nodes data.
        # NOTE(review): missing components raise ValueError here, not PipelineConnectError.
        try:
            from_sockets = self.graph.nodes[sender_component_name]["output_sockets"]
        except KeyError as exc:
            raise ValueError(f"Component named {sender_component_name} not found in the pipeline.") from exc
        try:
            to_sockets = self.graph.nodes[receiver_component_name]["input_sockets"]
        except KeyError as exc:
            raise ValueError(f"Component named {receiver_component_name} not found in the pipeline.") from exc

        # If the name of either socket is given, get the socket
        sender_socket: Optional[OutputSocket] = None
        if sender_socket_name:
            sender_socket = from_sockets.get(sender_socket_name)
            if not sender_socket:
                raise PipelineConnectError(
                    f"'{sender} does not exist. "
                    f"Output connections of {sender_component_name} are: "
                    + ", ".join([f"{name} (type {_type_name(socket.type)})" for name, socket in from_sockets.items()])
                )

        receiver_socket: Optional[InputSocket] = None
        if receiver_socket_name:
            receiver_socket = to_sockets.get(receiver_socket_name)
            if not receiver_socket:
                raise PipelineConnectError(
                    f"'{receiver} does not exist. "
                    f"Input connections of {receiver_component_name} are: "
                    + ", ".join([f"{name} (type {_type_name(socket.type)})" for name, socket in to_sockets.items()])
                )

        # Look for a matching connection among the possible ones.
        # Note that if there is more than one possible connection but two sockets match by name, they're paired.
        # When no socket name was given, every socket of that component is a candidate.
        sender_socket_candidates: List[OutputSocket] = [sender_socket] if sender_socket else list(from_sockets.values())
        receiver_socket_candidates: List[InputSocket] = (
            [receiver_socket] if receiver_socket else list(to_sockets.values())
        )

        # Find all possible connections between these two components
        possible_connections = [
            (sender_sock, receiver_sock)
            for sender_sock, receiver_sock in itertools.product(sender_socket_candidates, receiver_socket_candidates)
            if _types_are_compatible(sender_sock.type, receiver_sock.type)
        ]

        # We need this status for error messages, since we might need it in multiple places we calculate it here
        status = _connections_status(
            sender_node=sender_component_name,
            sender_sockets=sender_socket_candidates,
            receiver_node=receiver_component_name,
            receiver_sockets=receiver_socket_candidates,
        )

        if not possible_connections:
            # There's no possible connection between these two components
            if len(sender_socket_candidates) == len(receiver_socket_candidates) == 1:
                msg = (
                    f"Cannot connect '{sender_component_name}.{sender_socket_candidates[0].name}' with "
                    f"'{receiver_component_name}.{receiver_socket_candidates[0].name}': "
                    f"their declared input and output types do not match.\n{status}"
                )
            else:
                msg = (
                    f"Cannot connect '{sender_component_name}' with '{receiver_component_name}': "
                    f"no matching connections available.\n{status}"
                )
            raise PipelineConnectError(msg)

        if len(possible_connections) == 1:
            # There's only one possible connection, use it
            sender_socket = possible_connections[0][0]
            receiver_socket = possible_connections[0][1]

        if len(possible_connections) > 1:
            # There are multiple possible connection, let's try to match them by name
            name_matches = [
                (out_sock, in_sock) for out_sock, in_sock in possible_connections if in_sock.name == out_sock.name
            ]
            if len(name_matches) != 1:
                # There's are either no matches or more than one, we can't pick one reliably
                msg = (
                    f"Cannot connect '{sender_component_name}' with "
                    f"'{receiver_component_name}': more than one connection is possible "
                    "between these components. Please specify the connection name, like: "
                    f"pipeline.connect('{sender_component_name}.{possible_connections[0][0].name}', "
                    f"'{receiver_component_name}.{possible_connections[0][1].name}').\n{status}"
                )
                raise PipelineConnectError(msg)

            # Get the only possible match
            sender_socket = name_matches[0][0]
            receiver_socket = name_matches[0][1]

        # Connection must be valid on both sender/receiver sides
        # NOTE(review): defensive check — with parse/lookup above succeeding, this branch
        # looks unreachable in practice; kept as a safety net.
        if not sender_socket or not receiver_socket or not sender_component_name or not receiver_component_name:
            if sender_component_name and sender_socket:
                sender_repr = f"{sender_component_name}.{sender_socket.name} ({_type_name(sender_socket.type)})"
            else:
                sender_repr = "input needed"

            if receiver_component_name and receiver_socket:
                receiver_repr = f"({_type_name(receiver_socket.type)}) {receiver_component_name}.{receiver_socket.name}"
            else:
                receiver_repr = "output"
            msg = f"Connection must have both sender and receiver: {sender_repr} -> {receiver_repr}"
            raise PipelineConnectError(msg)

        logger.debug(
            "Connecting '{sender_component}.{sender_socket_name}' to '{receiver_component}.{receiver_socket_name}'",
            sender_component=sender_component_name,
            sender_socket_name=sender_socket.name,
            receiver_component=receiver_component_name,
            receiver_socket_name=receiver_socket.name,
        )

        if receiver_component_name in sender_socket.receivers and sender_component_name in receiver_socket.senders:
            # This is already connected, nothing to do
            return self

        if receiver_socket.senders and not receiver_socket.is_variadic:
            # Only variadic input sockets can receive from multiple senders
            msg = (
                f"Cannot connect '{sender_component_name}.{sender_socket.name}' with "
                f"'{receiver_component_name}.{receiver_socket.name}': "
                f"{receiver_component_name}.{receiver_socket.name} is already connected to {receiver_socket.senders}.\n"
            )
            raise PipelineConnectError(msg)

        # Update the sockets with the new connection
        sender_socket.receivers.append(receiver_component_name)
        receiver_socket.senders.append(sender_component_name)

        # Create the new connection
        self.graph.add_edge(
            sender_component_name,
            receiver_component_name,
            key=f"{sender_socket.name}/{receiver_socket.name}",
            conn_type=_type_name(sender_socket.type),
            from_socket=sender_socket,
            to_socket=receiver_socket,
            mandatory=receiver_socket.is_mandatory,
        )
        return self
593

594
    def get_component(self, name: str) -> Component:
1✔
595
        """
596
        Get the component with the specified name from the pipeline.
597

598
        :param name:
599
            The name of the component.
600
        :returns:
601
            The instance of that component.
602

603
        :raises ValueError:
604
            If a component with that name is not present in the pipeline.
605
        """
606
        try:
1✔
607
            return self.graph.nodes[name]["instance"]
1✔
608
        except KeyError as exc:
1✔
609
            raise ValueError(f"Component named {name} not found in the pipeline.") from exc
1✔
610

611
    def get_component_name(self, instance: Component) -> str:
1✔
612
        """
613
        Returns the name of the Component instance if it has been added to this Pipeline or an empty string otherwise.
614

615
        :param instance:
616
            The Component instance to look for.
617
        :returns:
618
            The name of the Component instance.
619
        """
620
        for name, inst in self.graph.nodes(data="instance"):  # type: ignore # type wrongly defined in networkx
1✔
621
            if inst == instance:
1✔
622
                return name
1✔
623
        return ""
1✔
624

625
    def inputs(self, include_components_with_connected_inputs: bool = False) -> Dict[str, Dict[str, Any]]:
        """
        Returns a dictionary containing the inputs of a pipeline.

        Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
        the input sockets of that component, including their types and whether they are optional.

        :param include_components_with_connected_inputs:
            If `False`, only components that have disconnected input edges are
            included in the output.
        :returns:
            A dictionary where each key is a pipeline component name and each value is a dictionary of
            inputs sockets of that component.
        """
        inputs: Dict[str, Dict[str, Any]] = {}
        found = find_pipeline_inputs(self.graph, include_components_with_connected_inputs)
        for component_name, sockets in found.items():
            described: Dict[str, Any] = {}
            for sock in sockets:
                entry: Dict[str, Any] = {"type": sock.type, "is_mandatory": sock.is_mandatory}
                # Optional sockets also expose their default value.
                if not sock.is_mandatory:
                    entry["default_value"] = sock.default_value
                described[sock.name] = entry
            # Components without any described sockets are omitted entirely.
            if described:
                inputs[component_name] = described
        return inputs
650

651
    def outputs(self, include_components_with_connected_outputs: bool = False) -> Dict[str, Dict[str, Any]]:
        """
        Returns a dictionary containing the outputs of a pipeline.

        Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
        the output sockets of that component.

        :param include_components_with_connected_outputs:
            If `False`, only components that have disconnected output edges are
            included in the output.
        :returns:
            A dictionary where each key is a pipeline component name and each value is a dictionary of
            output sockets of that component.
        """
        result: Dict[str, Dict[str, Any]] = {}
        for name, sockets in find_pipeline_outputs(self.graph, include_components_with_connected_outputs).items():
            # Components with no output sockets are skipped entirely.
            if not sockets:
                continue
            result[name] = {sock.name: {"type": sock.type} for sock in sockets}
        return result
1✔
671

672
    def show(self) -> None:
        """
        If running in a Jupyter notebook, display an image representing this `Pipeline`.

        :raises PipelineDrawingError:
            When called outside of a Jupyter notebook.
        """
        # Guard clause: drawing inline only makes sense inside a notebook.
        if not is_in_jupyter():
            msg = "This method is only supported in Jupyter notebooks. Use Pipeline.draw() to save an image locally."
            raise PipelineDrawingError(msg)

        from IPython.display import Image, display  # type: ignore

        display(Image(_to_mermaid_image(self.graph)))
1✔
686

687
    def draw(self, path: Path) -> None:
        """
        Save an image representing this `Pipeline` to `path`.

        :param path:
            The path to save the image to.
        """
        # Render the graph to an image and persist it at the requested location.
        rendered = _to_mermaid_image(self.graph)
        Path(path).write_bytes(rendered)
1✔
698

699
    def walk(self) -> Iterator[Tuple[str, Component]]:
        """
        Visits each component in the pipeline exactly once and yields its name and instance.

        No guarantees are provided on the visiting order.

        :returns:
            An iterator of tuples of component name and component instance.
        """
        # networkx yields (node, attribute) pairs when `data` names a single attribute,
        # which is exactly the (name, instance) shape this method exposes.
        yield from self.graph.nodes(data="instance")  # type: ignore # type is wrong in networkx
1✔
710

711
    def warm_up(self):
        """
        Make sure all nodes are warm.

        It's the node's responsibility to make sure this method can be called at every `Pipeline.run()`
        without re-initializing everything.
        """
        for node_name in self.graph.nodes:
            instance = self.graph.nodes[node_name]["instance"]
            # Warm-up is opt-in: only components that define the hook are touched.
            if hasattr(instance, "warm_up"):
                logger.info("Warming up component {node}...", node=node_name)
                instance.warm_up()
×
722

723
    def _validate_input(self, data: Dict[str, Any]):
1✔
724
        """
725
        Validates pipeline input data.
726

727
        Validates that data:
728
        * Each Component name actually exists in the Pipeline
729
        * Each Component is not missing any input
730
        * Each Component has only one input per input socket, if not variadic
731
        * Each Component doesn't receive inputs that are already sent by another Component
732

733
        :param data:
734
            A dictionary of inputs for the pipeline's components. Each key is a component name.
735

736
        :raises ValueError:
737
            If inputs are invalid according to the above.
738
        """
739
        for component_name, component_inputs in data.items():
1✔
740
            if component_name not in self.graph.nodes:
1✔
741
                raise ValueError(f"Component named {component_name} not found in the pipeline.")
1✔
742
            instance = self.graph.nodes[component_name]["instance"]
1✔
743
            for socket_name, socket in instance.__haystack_input__._sockets_dict.items():
1✔
744
                if socket.senders == [] and socket.is_mandatory and socket_name not in component_inputs:
1✔
745
                    raise ValueError(f"Missing input for component {component_name}: {socket_name}")
1✔
746
            for input_name in component_inputs.keys():
1✔
747
                if input_name not in instance.__haystack_input__._sockets_dict:
1✔
748
                    raise ValueError(f"Input {input_name} not found in component {component_name}.")
1✔
749

750
        for component_name in self.graph.nodes:
1✔
751
            instance = self.graph.nodes[component_name]["instance"]
1✔
752
            for socket_name, socket in instance.__haystack_input__._sockets_dict.items():
1✔
753
                component_inputs = data.get(component_name, {})
1✔
754
                if socket.senders == [] and socket.is_mandatory and socket_name not in component_inputs:
1✔
755
                    raise ValueError(f"Missing input for component {component_name}: {socket_name}")
1✔
756
                if socket.senders and socket_name in component_inputs and not socket.is_variadic:
1✔
757
                    raise ValueError(
1✔
758
                        f"Input {socket_name} for component {component_name} is already sent by {socket.senders}."
759
                    )
760

761
    def _prepare_component_input_data(self, data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
1✔
762
        """
763
        Prepares input data for pipeline components.
764

765
        Organizes input data for pipeline components and identifies any inputs that are not matched to any
766
        component's input slots. Deep-copies data items to avoid sharing mutables across multiple components.
767

768
        This method processes a flat dictionary of input data, where each key-value pair represents an input name
769
        and its corresponding value. It distributes these inputs to the appropriate pipeline components based on
770
        their input requirements. Inputs that don't match any component's input slots are classified as unresolved.
771

772
        :param data:
773
            A dictionary potentially having input names as keys and input values as values.
774

775
        :returns:
776
            A dictionary mapping component names to their respective matched inputs.
777
        """
778
        # check whether the data is a nested dictionary of component inputs where each key is a component name
779
        # and each value is a dictionary of input parameters for that component
780
        is_nested_component_input = all(isinstance(value, dict) for value in data.values())
1✔
781
        if not is_nested_component_input:
1✔
782
            # flat input, a dict where keys are input names and values are the corresponding values
783
            # we need to convert it to a nested dictionary of component inputs and then run the pipeline
784
            # just like in the previous case
785
            pipeline_input_data: Dict[str, Dict[str, Any]] = defaultdict(dict)
1✔
786
            unresolved_kwargs = {}
1✔
787

788
            # Retrieve the input slots for each component in the pipeline
789
            available_inputs: Dict[str, Dict[str, Any]] = self.inputs()
1✔
790

791
            # Go through all provided to distribute them to the appropriate component inputs
792
            for input_name, input_value in data.items():
1✔
793
                resolved_at_least_once = False
1✔
794

795
                # Check each component to see if it has a slot for the current kwarg
796
                for component_name, component_inputs in available_inputs.items():
1✔
797
                    if input_name in component_inputs:
1✔
798
                        # If a match is found, add the kwarg to the component's input data
799
                        pipeline_input_data[component_name][input_name] = input_value
1✔
800
                        resolved_at_least_once = True
1✔
801

802
                if not resolved_at_least_once:
1✔
803
                    unresolved_kwargs[input_name] = input_value
1✔
804

805
            if unresolved_kwargs:
1✔
806
                logger.warning(
1✔
807
                    "Inputs {input_keys} were not matched to any component inputs, please check your run parameters.",
808
                    input_keys=list(unresolved_kwargs.keys()),
809
                )
810

811
            data = dict(pipeline_input_data)
1✔
812

813
        # deepcopying the inputs prevents the Pipeline run logic from being altered unexpectedly
814
        # when the same input reference is passed to multiple components.
815
        for component_name, component_inputs in data.items():
1✔
816
            data[component_name] = {k: deepcopy(v) for k, v in component_inputs.items()}
1✔
817

818
        return data
1✔
819

820
    def _init_inputs_state(self, data: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
        """
        Normalizes pipeline inputs before execution.

        Shallow-copies each input value and wraps values destined for variadic
        sockets into single-element lists.

        :param data:
            Inputs keyed by component name; keys that are not component names are skipped.
        :returns:
            A new dictionary with the normalized inputs.
        """
        for component_name, component_inputs in data.items():
            if component_name not in self.graph.nodes:
                # This is not a component name, it must be the name of one or more input sockets.
                # Those are handled in a different way, so we skip them here.
                continue
            instance = self.graph.nodes[component_name]["instance"]
            for component_input, input_value in component_inputs.items():
                # Handle mutable input data
                data[component_name][component_input] = copy(input_value)
                if instance.__haystack_input__._sockets_dict[component_input].is_variadic:
                    # Components that have variadic inputs need to receive lists as input.
                    # We don't want to force the user to always pass lists, so we convert single values to lists here.
                    # If it's already a list we assume the component takes a variadic input of lists, so we
                    # convert it in any case.
                    # NOTE(review): this wraps the original `input_value`, not the copy made just above,
                    # so for variadic sockets the shallow copy is discarded — confirm this is intended.
                    data[component_name][component_input] = [input_value]

        # Return a fresh top-level dict so callers don't alias the argument itself.
        return {**data}
1✔
838

839
    def _init_run_queue(self, pipeline_inputs: Dict[str, Any]) -> List[Tuple[str, Component]]:
        """
        Builds the initial queue of Components to run.

        For acyclic pipelines the order is a topological sort of the graph; otherwise components
        that can start immediately (no inputs, user-provided inputs, or disconnected/variadic
        sockets) are enqueued in graph insertion order.

        :param pipeline_inputs:
            Pipeline inputs keyed by component name, used to decide which components can run right away.
        :returns:
            An ordered list of (component name, component instance) pairs.
        """
        run_queue: List[Tuple[str, Component]] = []

        # HACK: Quick workaround for the issue of execution order not being
        # well-defined (NB - https://github.com/deepset-ai/haystack/issues/7985).
        # We should fix the original execution logic instead.
        if networkx.is_directed_acyclic_graph(self.graph):
            # If the Pipeline is linear we can easily determine the order of execution with
            # a topological sort.
            # So use that to get the run order.
            for node in networkx.topological_sort(self.graph):
                run_queue.append((node, self.graph.nodes[node]["instance"]))
            return run_queue

        for node_name in self.graph.nodes:
            component = self.graph.nodes[node_name]["instance"]

            if len(component.__haystack_input__._sockets_dict) == 0:
                # Component has no input, can run right away
                run_queue.append((node_name, component))
                continue

            if node_name in pipeline_inputs:
                # This component is in the input data, if it has enough inputs it can run right away
                run_queue.append((node_name, component))
                continue

            for socket in component.__haystack_input__._sockets_dict.values():
                if not socket.senders or socket.is_variadic:
                    # Component has at least one input not connected or is variadic, can run right away.
                    run_queue.append((node_name, component))
                    break

        return run_queue
1✔
873

874
    @classmethod
    def from_template(
        cls, predefined_pipeline: PredefinedPipeline, template_params: Optional[Dict[str, Any]] = None
    ) -> "PipelineBase":
        """
        Create a Pipeline from a predefined template. See `PredefinedPipeline` for available options.

        :param predefined_pipeline:
            The predefined pipeline to use.
        :param template_params:
            An optional dictionary of parameters to use when rendering the pipeline template.
        :returns:
            An instance of `Pipeline`.
        :raises PipelineUnmarshalError:
            If the rendered template cannot be loaded as a Pipeline.
        """
        tpl = PipelineTemplate.from_predefined(predefined_pipeline)
        # If tpl.render() fails, we let bubble up the original error
        rendered = tpl.render(template_params)

        # If there was a problem with the rendered version of the
        # template, we add it to the error stack for debugging
        try:
            return cls.loads(rendered)
        except Exception as e:
            msg = f"Error unmarshalling pipeline: {e}\n"
            msg += f"Source:\n{rendered}"
            # Chain the original exception explicitly so the real cause is preserved
            # in the traceback (was previously raised without `from e`).
            raise PipelineUnmarshalError(msg) from e
×
900

901
    def _init_graph(self):
1✔
902
        """Resets the visits count for each component"""
903
        for node in self.graph.nodes:
1✔
904
            self.graph.nodes[node]["visits"] = 0
1✔
905

906
    def _distribute_output(
        self,
        component_name: str,
        component_result: Dict[str, Any],
        components_inputs: Dict[str, Dict[str, Any]],
        run_queue: List[Tuple[str, Component]],
        waiting_queue: List[Tuple[str, Component]],
    ) -> Dict[str, Any]:
        """
        Distributes the output of a Component to the next Components that need it.

        This also updates the queues that keep track of which Components are ready to run and which are waiting for
        input.

        :param component_name: Name of the Component that created the output
        :param component_result: The output of the Component
        :param components_inputs: The current state of the inputs divided by Component name
        :param run_queue: Queue of Components to run
        :param waiting_queue: Queue of Components waiting for input

        :returns: The updated output of the Component without the keys that were distributed to other Components
        """
        # We keep track of which keys to remove from component_result at the end of the loop.
        # This is done after the output has been distributed to the next components, so that
        # we're sure all components that need this output have received it.
        to_remove_from_component_result = set()

        for _, receiver_name, connection in self.graph.edges(nbunch=component_name, data=True):
            sender_socket: OutputSocket = connection["from_socket"]
            receiver_socket: InputSocket = connection["to_socket"]

            if sender_socket.name not in component_result:
                # This output wasn't created by the sender, nothing we can do.
                #
                # Some Components might have conditional outputs, so we need to check if they actually returned
                # some output while iterating over their output sockets.
                #
                # A perfect example of this would be the ConditionalRouter, which will have an output for each
                # condition it has been initialized with.
                # Though it will return only one output at a time.
                continue

            if receiver_name not in components_inputs:
                components_inputs[receiver_name] = {}

            # We keep track of the keys that were distributed to other Components.
            # This key will be removed from component_result at the end of the loop.
            to_remove_from_component_result.add(sender_socket.name)

            value = component_result[sender_socket.name]

            if receiver_socket.is_variadic:
                # Usually Component inputs can only be received from one sender, the Variadic type allows
                # instead to receive inputs from multiple senders.
                #
                # To keep track of all the inputs received internally we always store them in a list.
                if receiver_socket.name not in components_inputs[receiver_name]:
                    # Create the list if it doesn't exist
                    components_inputs[receiver_name][receiver_socket.name] = []
                else:
                    # Check if the value is actually a list
                    assert isinstance(components_inputs[receiver_name][receiver_socket.name], list)
                components_inputs[receiver_name][receiver_socket.name].append(value)
            else:
                components_inputs[receiver_name][receiver_socket.name] = value

            receiver = self.graph.nodes[receiver_name]["instance"]
            pair = (receiver_name, receiver)

            is_greedy = getattr(receiver, "__haystack_is_greedy__", False)
            if receiver_socket.is_variadic:
                if is_greedy:
                    # If the receiver is greedy, we can run it as soon as possible.
                    # First we remove it from the status lists it's in if it's there or
                    # we risk running it multiple times.
                    if pair in run_queue:
                        run_queue.remove(pair)
                    if pair in waiting_queue:
                        waiting_queue.remove(pair)
                    run_queue.append(pair)
                else:
                    # If the receiver Component has a variadic input that is not greedy
                    # we put it in the waiting queue.
                    # This make sure that we don't run it earlier than necessary and we can collect
                    # as many inputs as we can before running it.
                    if pair not in waiting_queue:
                        waiting_queue.append(pair)

            if pair not in waiting_queue and pair not in run_queue:
                # Queue up the Component that received this input to run, only if it's not already waiting
                # for input or already ready to run.
                run_queue.append(pair)

        # Returns the output without the keys that were distributed to other Components
        return {k: v for k, v in component_result.items() if k not in to_remove_from_component_result}
1✔
1001

1002
    def _find_next_runnable_component(
        self, components_inputs: Dict[str, Dict[str, Any]], waiting_queue: List[Tuple[str, Component]]
    ) -> Tuple[str, Component]:
        """
        Finds the next Component that can be run and returns it.

        :param components_inputs: The current state of the inputs divided by Component name
        :param waiting_queue: Queue of Components waiting for input

        :returns: The name and the instance of the next Component that can be run
        """
        all_lazy_variadic = True
        all_with_default_inputs = True

        filtered_waiting_queue = []

        for name, comp in waiting_queue:
            if not _is_lazy_variadic(comp):
                # Components with variadic inputs that are not greedy must be removed only if there's nothing else to
                # run at this stage.
                # We need to wait as long as possible to run them, so we can collect as most inputs as we can.
                all_lazy_variadic = False

            if not _has_all_inputs_with_defaults(comp):
                # Components that have defaults for all their inputs must be treated the same identical way as we treat
                # lazy variadic components. If there are only components with defaults we can run them.
                # If we don't do this the order of execution of the Pipeline's Components will be affected cause we
                # enqueue the Components in `run_queue` at the start using the order they are added in the Pipeline.
                # If a Component A with defaults is added before a Component B that has no defaults, but in the Pipeline
                # logic A must be executed after B. However, B could run before A if we don't do this check.
                all_with_default_inputs = False

            if not _is_lazy_variadic(comp) and not _has_all_inputs_with_defaults(comp):
                # Keep track of the Components that are not lazy variadic and don't have all inputs with defaults.
                # We'll handle these later if necessary.
                filtered_waiting_queue.append((name, comp))

        # If all Components are lazy variadic or all Components have all inputs with defaults we can get one to run
        if all_lazy_variadic or all_with_default_inputs:
            # NOTE(review): if `waiting_queue` is empty both flags stay True and this raises
            # IndexError — callers appear to guarantee a non-empty queue; confirm.
            return waiting_queue[0]

        for name, comp in filtered_waiting_queue:
            # Find the first component that has all the inputs it needs to run
            has_enough_inputs = True
            for input_socket in comp.__haystack_input__._sockets_dict.values():  # type: ignore
                if input_socket.name not in components_inputs.get(name, {}) and input_socket.is_mandatory:
                    has_enough_inputs = False
                    break

            if has_enough_inputs:
                return name, comp

        # If we reach this point it means that we found no Component that has enough inputs to run.
        # Ideally we should never reach this point, though we can't raise an exception either as
        # existing use cases rely on this behavior.
        # So we return the last Component, that could be the last from waiting_queue or filtered_waiting_queue.
        # NOTE: `name`/`comp` deliberately carry the values left over from the loops above.
        return name, comp
1✔
1059

1060
    def _find_next_runnable_lazy_variadic_or_default_component(
        self, waiting_queue: List[Tuple[str, Component]]
    ) -> Tuple[str, Component]:
        """
        Finds the next Component that can be run and has a lazy variadic input or all inputs with default values.

        :param waiting_queue: Queue of Components waiting for input

        :returns: The name and the instance of the next Component that can be run
        """
        for candidate_name, candidate in waiting_queue:
            # A component qualifies if it either waits lazily on a variadic socket
            # or could run purely on its defaults.
            if _is_lazy_variadic(candidate) or _has_all_inputs_with_defaults(candidate):
                return candidate_name, candidate

        # If we reach this point it means that we found no Component that has a lazy variadic input or all inputs with
        # default values to run.
        # Similar to `_find_next_runnable_component` we might not find the Component we want, so we optimistically
        # return the last Component in the list.
        # We're probably stuck in a loop in this case, but we can't raise an exception as existing use cases might
        # rely on this behaviour.
        # The loop detection will be handled later on.
        return candidate_name, candidate
1✔
1084

1085
    def _find_components_that_will_receive_no_input(
        self, component_name: str, component_result: Dict[str, Any], components_inputs: Dict[str, Dict[str, Any]]
    ) -> Set[Tuple[str, Component]]:
        """
        Find all the Components that are connected to component_name and didn't receive any input from it.

        Components that have a Variadic input and received already some input from other Components
        but not from component_name won't be returned as they have enough inputs to run.

        This includes the descendants of the Components that didn't receive any input from component_name.
        That is necessary to avoid getting stuck into infinite loops waiting for inputs that will never arrive.

        :param component_name: Name of the Component that created the output
        :param component_result: Output of the Component
        :param components_inputs: The current state of the inputs divided by Component name
        :return: A set of Components that didn't receive any input from component_name
        """

        # Simplifies the check if a Component is Variadic and received some input from other Components.
        # NOTE: this closure reads `receiver` from the enclosing loop below at call time
        # (late binding) — it is only valid because it's invoked inside that loop.
        def is_variadic_with_existing_inputs(comp: Component) -> bool:
            for receiver_socket in comp.__haystack_input__._sockets_dict.values():  # type: ignore
                if component_name not in receiver_socket.senders:
                    continue
                if (
                    receiver_socket.is_variadic
                    and len(components_inputs.get(receiver, {}).get(receiver_socket.name, [])) > 0
                ):
                    # This Component already received some input to its Variadic socket from other Components.
                    # It should be able to run even if it doesn't receive any input from component_name.
                    return True
            return False

        components = set()
        instance: Component = self.graph.nodes[component_name]["instance"]
        # Walk the sender's output sockets: any socket absent from `component_result`
        # produced nothing this run, so its receivers got no input from us.
        for socket_name, socket in instance.__haystack_output__._sockets_dict.items():  # type: ignore
            if socket_name in component_result:
                continue
            for receiver in socket.receivers:
                receiver_instance: Component = self.graph.nodes[receiver]["instance"]

                if is_variadic_with_existing_inputs(receiver_instance):
                    continue

                components.add((receiver, receiver_instance))
                # Get the descendants too. When we remove a Component that received no input
                # it's extremely likely that its descendants will receive no input as well.
                # This is fine even if the Pipeline will merge back into a single Component
                # at a certain point. The merging Component will be put back into the run
                # queue at a later stage.
                components |= {(d, self.graph.nodes[d]["instance"]) for d in networkx.descendants(self.graph, receiver)}

        return components
1✔
1137

1138
    def _is_stuck_in_a_loop(self, waiting_queue: List[Tuple[str, Component]]) -> bool:
        """
        Checks if the Pipeline is stuck in a loop.

        :param waiting_queue: Queue of Components waiting for input

        :returns: True if the Pipeline is stuck in a loop, False otherwise
        """
        # Are we actually stuck, or is there still a lazy variadic component (or one whose
        # inputs all have defaults) that could make progress once picked?
        has_flexible_component = any(
            _is_lazy_variadic(comp) or _has_all_inputs_with_defaults(comp) for _, comp in waiting_queue
        )

        if not has_flexible_component:
            # We're stuck in a loop for real, we can't make any progress.
            # BAIL!
            return True

        # If we have a single component with no variadic input or only default inputs waiting for input
        # it means it has been waiting for input for at least 2 iterations.
        # This will never run.
        # BAIL!
        return len(waiting_queue) == 1
1✔
1166

1167
    def _component_has_enough_inputs_to_run(self, name: str, inputs: Dict[str, Dict[str, Any]]) -> bool:
1✔
1168
        """
1169
        Returns True if the Component has all the inputs it needs to run.
1170

1171
        :param name: Name of the Component as defined in the Pipeline.
1172
        :param inputs: The current state of the inputs divided by Component name.
1173

1174
        :return: Whether the Component can run or not.
1175
        """
1176
        instance: Component = self.graph.nodes[name]["instance"]
1✔
1177
        if name not in inputs:
1✔
1178
            return False
1✔
1179
        expected_inputs = instance.__haystack_input__._sockets_dict.keys()  # type: ignore
1✔
1180
        current_inputs = inputs[name].keys()
1✔
1181
        return expected_inputs == current_inputs
1✔
1182

1183

1184
def _connections_status(
    sender_node: str, receiver_node: str, sender_sockets: List[OutputSocket], receiver_sockets: List[InputSocket]
):
    """
    Lists the status of the sockets, for error messages.
    """
    # One line per sender socket: name and type.
    sender_sockets_list = "\n".join(
        f" - {socket.name}: {_type_name(socket.type)}" for socket in sender_sockets
    )

    # One line per receiver socket: name, type, and whether it's still free or already fed.
    receiver_entries = []
    for socket in receiver_sockets:
        sender_status = f"sent by {','.join(socket.senders)}" if socket.senders else "available"
        receiver_entries.append(f" - {socket.name}: {_type_name(socket.type)} ({sender_status})")
    receiver_sockets_list = "\n".join(receiver_entries)

    return f"'{sender_node}':\n{sender_sockets_list}\n'{receiver_node}':\n{receiver_sockets_list}"
1✔
1207

1208

1209
def _is_lazy_variadic(c: Component) -> bool:
1✔
1210
    """
1211
    Small utility function to check if a Component has a Variadic input that is not greedy
1212
    """
1213
    is_variadic = any(
1✔
1214
        socket.is_variadic
1215
        for socket in c.__haystack_input__._sockets_dict.values()  # type: ignore
1216
    )
1217
    if not is_variadic:
1✔
1218
        return False
1✔
1219
    return not getattr(c, "__haystack_is_greedy__", False)
1✔
1220

1221

1222
def _has_all_inputs_with_defaults(c: Component) -> bool:
1✔
1223
    """
1224
    Small utility function to check if a Component has all inputs with defaults.
1225
    """
1226
    return all(
1✔
1227
        not socket.is_mandatory
1228
        for socket in c.__haystack_input__._sockets_dict.values()  # type: ignore
1229
    )
1230

1231

1232
def _add_missing_input_defaults(name: str, comp: Component, components_inputs: Dict[str, Dict[str, Any]]):
1✔
1233
    """
1234
    Updates the inputs with the default values for the inputs that are missing
1235

1236
    :param name: Name of the Component
1237
    :param comp: Instance of the Component
1238
    :param components_inputs: The current state of the inputs divided by Component name
1239
    """
1240
    if name not in components_inputs:
1✔
1241
        components_inputs[name] = {}
1✔
1242

1243
    for input_socket in comp.__haystack_input__._sockets_dict.values():  # type: ignore
1✔
1244
        if input_socket.is_mandatory:
1✔
1245
            continue
1✔
1246

1247
        if input_socket.name not in components_inputs[name]:
1✔
1248
            components_inputs[name][input_socket.name] = input_socket.default_value
1✔
1249

1250

1251
def _enqueue_component(
1✔
1252
    component_pair: Tuple[str, Component],
1253
    run_queue: List[Tuple[str, Component]],
1254
    waiting_queue: List[Tuple[str, Component]],
1255
):
1256
    """
1257
    Append a Component in the queue of Components to run if not already in it.
1258

1259
    Remove it from the waiting list if it's there.
1260

1261
    :param component_pair: Tuple of Component name and instance
1262
    :param run_queue: Queue of Components to run
1263
    :param waiting_queue: Queue of Components waiting for input
1264
    """
1265
    if component_pair in waiting_queue:
1✔
1266
        waiting_queue.remove(component_pair)
1✔
1267

1268
    if component_pair not in run_queue:
1✔
1269
        run_queue.append(component_pair)
1✔
1270

1271

1272
def _dequeue_component(
1✔
1273
    component_pair: Tuple[str, Component],
1274
    run_queue: List[Tuple[str, Component]],
1275
    waiting_queue: List[Tuple[str, Component]],
1276
):
1277
    """
1278
    Removes a Component both from the queue of Components to run and the waiting list.
1279

1280
    :param component_pair: Tuple of Component name and instance
1281
    :param run_queue: Queue of Components to run
1282
    :param waiting_queue: Queue of Components waiting for input
1283
    """
1284
    if component_pair in waiting_queue:
1✔
1285
        waiting_queue.remove(component_pair)
1✔
1286

1287
    if component_pair in run_queue:
1✔
1288
        run_queue.remove(component_pair)
1✔
1289

1290

1291
def _enqueue_waiting_component(component_pair: Tuple[str, Component], waiting_queue: List[Tuple[str, Component]]):
1✔
1292
    """
1293
    Append a Component in the queue of Components that are waiting for inputs if not already in it.
1294

1295
    :param component_pair: Tuple of Component name and instance
1296
    :param waiting_queue: Queue of Components waiting for input
1297
    """
1298
    if component_pair not in waiting_queue:
1✔
1299
        waiting_queue.append(component_pair)
1✔
1300

1301

1302
def _dequeue_waiting_component(component_pair: Tuple[str, Component], waiting_queue: List[Tuple[str, Component]]):
1✔
1303
    """
1304
    Removes a Component from the queue of Components that are waiting for inputs.
1305

1306
    :param component_pair: Tuple of Component name and instance
1307
    :param waiting_queue: Queue of Components waiting for input
1308
    """
1309
    if component_pair in waiting_queue:
1✔
1310
        waiting_queue.remove(component_pair)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc