haystack/core/pipeline/base.py

# pylint: disable=too-many-lines
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import itertools
from collections import defaultdict
from datetime import datetime
from enum import IntEnum
from pathlib import Path
from typing import Any, ContextManager, Iterator, Mapping, Optional, TextIO, TypeVar, Union

import networkx

from haystack import logging, tracing
from haystack.core.component import Component, InputSocket, OutputSocket, component
from haystack.core.errors import (
    DeserializationError,
    PipelineComponentsBlockedError,
    PipelineConnectError,
    PipelineDrawingError,
    PipelineError,
    PipelineMaxComponentRuns,
    PipelineUnmarshalError,
    PipelineValidationError,
)
from haystack.core.pipeline.component_checks import (
    _NO_OUTPUT_PRODUCED,
    all_predecessors_executed,
    are_all_lazy_variadic_sockets_resolved,
    are_all_sockets_ready,
    can_component_run,
    is_any_greedy_socket_ready,
    is_socket_lazy_variadic,
)
from haystack.core.pipeline.utils import FIFOPriorityQueue, _deepcopy_with_exceptions, parse_connect_string
from haystack.core.serialization import DeserializationCallbacks, component_from_dict, component_to_dict
from haystack.core.type_utils import _type_name, _types_are_compatible
from haystack.marshal import Marshaller, YamlMarshaller
from haystack.utils import is_in_jupyter, type_serialization

from .descriptions import find_pipeline_inputs, find_pipeline_outputs
from .draw import _to_mermaid_image
from .template import PipelineTemplate, PredefinedPipeline

DEFAULT_MARSHALLER = YamlMarshaller()

# We use a generic type to annotate the return value of class methods,
# so that static analyzers won't be confused when derived classes
# use those methods.
T = TypeVar("T", bound="PipelineBase")

logger = logging.getLogger(__name__)


# Constants for tracing tags
_COMPONENT_INPUT = "haystack.component.input"
_COMPONENT_OUTPUT = "haystack.component.output"
_COMPONENT_VISITS = "haystack.component.visits"


class ComponentPriority(IntEnum):
    HIGHEST = 1
    READY = 2
    DEFER = 3
    DEFER_LAST = 4
    BLOCKED = 5


class PipelineBase:  # noqa: PLW1641
    """
    Components orchestration engine.

    Builds a graph of components and orchestrates their execution according to the execution graph.
    """

    def __init__(
        self,
        metadata: Optional[dict[str, Any]] = None,
        max_runs_per_component: int = 100,
        connection_type_validation: bool = True,
    ):
        """
        Creates the Pipeline.

        :param metadata:
            Arbitrary dictionary to store metadata about this `Pipeline`. Make sure all the values contained in
            this dictionary can be serialized and deserialized if you wish to save this `Pipeline` to file.
        :param max_runs_per_component:
            How many times the `Pipeline` can run the same Component.
            If this limit is reached, a `PipelineMaxComponentRuns` exception is raised.
            If not set, defaults to 100 runs per Component.
        :param connection_type_validation: Whether the pipeline will validate the types of the connections.
            Defaults to True.
        """
        self._telemetry_runs = 0
        self._last_telemetry_sent: Optional[datetime] = None
        self.metadata = metadata or {}
        self.graph = networkx.MultiDiGraph()
        self._max_runs_per_component = max_runs_per_component
        self._connection_type_validation = connection_type_validation
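
    # Illustrative sketch (not part of the original module): creating a pipeline
    # with custom metadata and a lower per-component run limit. `MyComponent` is a
    # hypothetical @component-decorated class.
    #
    #   pipeline = PipelineBase(metadata={"author": "me"}, max_runs_per_component=10)
    #   pipeline.add_component("my_component", MyComponent())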

    def __eq__(self, other: object) -> bool:
        """
        Pipeline equality is defined by their type and the equality of their serialized form.

        Pipelines of the same type share all metadata, nodes, and edges, but they're not required to use
        the same node instances: this allows a pipeline saved and then loaded back to be equal to itself.
        """
        if not isinstance(self, type(other)):
            return False
        assert isinstance(other, PipelineBase)
        return self.to_dict() == other.to_dict()

    def __repr__(self) -> str:
        """
        Returns a text representation of the Pipeline.
        """
        res = f"{object.__repr__(self)}\n"
        if self.metadata:
            res += "🧱 Metadata\n"
            for k, v in self.metadata.items():
                res += f"  - {k}: {v}\n"

        res += "🚅 Components\n"
        for name, instance in self.graph.nodes(data="instance"):
            res += f"  - {name}: {instance.__class__.__name__}\n"

        res += "🛤️ Connections\n"
        for sender, receiver, edge_data in self.graph.edges(data=True):
            sender_socket = edge_data["from_socket"].name
            receiver_socket = edge_data["to_socket"].name
            res += f"  - {sender}.{sender_socket} -> {receiver}.{receiver_socket} ({edge_data['conn_type']})\n"

        return res

    def to_dict(self) -> dict[str, Any]:
        """
        Serializes the pipeline to a dictionary.

        This is meant to be an intermediate representation, but it can also be used to save a pipeline to file.

        :returns:
            Dictionary with serialized data.
        """
        components = {}
        for name, instance in self.graph.nodes(data="instance"):
            components[name] = component_to_dict(instance, name)

        connections = []
        for sender, receiver, edge_data in self.graph.edges.data():
            sender_socket = edge_data["from_socket"].name
            receiver_socket = edge_data["to_socket"].name
            connections.append({"sender": f"{sender}.{sender_socket}", "receiver": f"{receiver}.{receiver_socket}"})
        return {
            "metadata": self.metadata,
            "max_runs_per_component": self._max_runs_per_component,
            "components": components,
            "connections": connections,
            "connection_type_validation": self._connection_type_validation,
        }

    @classmethod
    def from_dict(
        cls: type[T], data: dict[str, Any], callbacks: Optional[DeserializationCallbacks] = None, **kwargs: Any
    ) -> T:
        """
        Deserializes the pipeline from a dictionary.

        :param data:
            Dictionary to deserialize from.
        :param callbacks:
            Callbacks to invoke during deserialization.
        :param kwargs:
            `components`: a dictionary of `{name: instance}` to reuse instances of components instead of creating new
            ones.
        :returns:
            Deserialized pipeline.
        """
        data_copy = _deepcopy_with_exceptions(data)  # to prevent modification of original data
        metadata = data_copy.get("metadata", {})
        max_runs_per_component = data_copy.get("max_runs_per_component", 100)
        connection_type_validation = data_copy.get("connection_type_validation", True)
        pipe = cls(
            metadata=metadata,
            max_runs_per_component=max_runs_per_component,
            connection_type_validation=connection_type_validation,
        )
        components_to_reuse = kwargs.get("components", {})
        for name, component_data in data_copy.get("components", {}).items():
            if name in components_to_reuse:
                # Reuse an instance
                instance = components_to_reuse[name]
            else:
                if "type" not in component_data:
                    raise PipelineError(f"Missing 'type' in component '{name}'")

                if component_data["type"] not in component.registry:
                    try:
                        # Import the module first...
                        module, _ = component_data["type"].rsplit(".", 1)
                        logger.debug("Trying to import module {module_name}", module_name=module)
                        type_serialization.thread_safe_import(module)
                        # ...then try again
                        if component_data["type"] not in component.registry:
                            raise PipelineError(
                                f"Successfully imported module '{module}' but couldn't find "
                                f"'{component_data['type']}' in the component registry.\n"
                                f"The component might be registered under a different path. "
                                f"Here are the registered components:\n {list(component.registry.keys())}\n"
                            )
                    except (ImportError, PipelineError, ValueError) as e:
                        raise PipelineError(
                            f"Component '{component_data['type']}' (name: '{name}') not imported. Please "
                            f"check that the package is installed and the component path is correct."
                        ) from e

                # Create a new one
                component_class = component.registry[component_data["type"]]

                try:
                    instance = component_from_dict(component_class, component_data, name, callbacks)
                except Exception as e:
                    msg = (
                        f"Couldn't deserialize component '{name}' of class '{component_class.__name__}' "
                        f"with the following data: {str(component_data)}. Possible reasons include "
                        "malformed serialized data, mismatch between the serialized component and the "
                        "loaded one (due to a breaking change, see "
                        "https://github.com/deepset-ai/haystack/releases), etc."
                    )
                    raise DeserializationError(msg) from e
            pipe.add_component(name=name, instance=instance)

        for connection in data.get("connections", []):
            if "sender" not in connection:
                raise PipelineError(f"Missing sender in connection: {connection}")
            if "receiver" not in connection:
                raise PipelineError(f"Missing receiver in connection: {connection}")
            pipe.connect(sender=connection["sender"], receiver=connection["receiver"])

        return pipe
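
    # Illustrative sketch (not part of the original module): `to_dict()` and
    # `from_dict()` round-trip a pipeline through its dictionary form, and `__eq__`
    # compares exactly these serialized forms. `my_pipeline` is a hypothetical,
    # already-built pipeline.
    #
    #   serialized = my_pipeline.to_dict()
    #   restored = type(my_pipeline).from_dict(serialized)
    #   assert restored == my_pipeline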

    def dumps(self, marshaller: Marshaller = DEFAULT_MARSHALLER) -> str:
        """
        Returns the string representation of this pipeline according to the format dictated by the `Marshaller` in use.

        :param marshaller:
            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
        :returns:
            A string representing the pipeline.
        """
        return marshaller.marshal(self.to_dict())

    def dump(self, fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER) -> None:
        """
        Writes the string representation of this pipeline to the file-like object passed in the `fp` argument.

        :param fp:
            A file-like object ready to be written to.
        :param marshaller:
            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
        """
        fp.write(marshaller.marshal(self.to_dict()))

    @classmethod
    def loads(
        cls: type[T],
        data: Union[str, bytes, bytearray],
        marshaller: Marshaller = DEFAULT_MARSHALLER,
        callbacks: Optional[DeserializationCallbacks] = None,
    ) -> T:
        """
        Creates a `Pipeline` object from the string representation passed in the `data` argument.

        :param data:
            The string representation of the pipeline, can be `str`, `bytes` or `bytearray`.
        :param marshaller:
            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
        :param callbacks:
            Callbacks to invoke during deserialization.
        :raises DeserializationError:
            If an error occurs during deserialization.
        :returns:
            A `Pipeline` object.
        """
        try:
            deserialized_data = marshaller.unmarshal(data)
        except Exception as e:
            raise DeserializationError(
                "Error while unmarshalling serialized pipeline data. This is usually "
                "caused by malformed or invalid syntax in the serialized representation."
            ) from e

        return cls.from_dict(deserialized_data, callbacks)

    @classmethod
    def load(
        cls: type[T],
        fp: TextIO,
        marshaller: Marshaller = DEFAULT_MARSHALLER,
        callbacks: Optional[DeserializationCallbacks] = None,
    ) -> T:
        """
        Creates a `Pipeline` object from a string representation.

        The string representation is read from the file-like object passed in the `fp` argument.

        :param fp:
            A file-like object ready to be read from.
        :param marshaller:
            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
        :param callbacks:
            Callbacks to invoke during deserialization.
        :raises DeserializationError:
            If an error occurs during deserialization.
        :returns:
            A `Pipeline` object.
        """
        return cls.loads(fp.read(), marshaller, callbacks)
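
    # Illustrative sketch (not part of the original module): persisting a pipeline
    # to YAML (the default marshaller) and loading it back. `my_pipeline` is a
    # hypothetical, already-built pipeline.
    #
    #   with open("pipeline.yaml", "w") as fp:
    #       my_pipeline.dump(fp)
    #   with open("pipeline.yaml") as fp:
    #       restored = type(my_pipeline).load(fp)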

    def add_component(self, name: str, instance: Component) -> None:
        """
        Add the given component to the pipeline.

        Components are not connected to anything by default: use `Pipeline.connect()` to connect components together.
        Component names must be unique. A component instance can be added to a single pipeline only: to reuse the
        same logic in several pipelines, create a separate instance for each of them.

        :param name:
            The name of the component to add.
        :param instance:
            The component instance to add.

        :raises ValueError:
            If a component with the same name already exists.
        :raises PipelineValidationError:
            If the given instance is not a component.
        """
        # Component names are unique
        if name in self.graph.nodes:
            raise ValueError(f"A component named '{name}' already exists in this pipeline: choose another name.")

        # Components can't be named `_debug`
        if name == "_debug":
            raise ValueError("'_debug' is a reserved name for debug output. Choose another name.")

        # Component names can't contain "."
        if "." in name:
            raise ValueError(f"'{name}' is an invalid component name: it cannot contain '.' (dot) characters.")

        # Component instances must be components
        if not isinstance(instance, Component):
            raise PipelineValidationError(
                f"'{type(instance)}' doesn't seem to be a component. Is this class decorated with @component?"
            )

        if getattr(instance, "__haystack_added_to_pipeline__", None):
            msg = (
                "Component has already been added in another Pipeline. Components can't be shared between Pipelines. "
                "Create a new instance instead."
            )
            raise PipelineError(msg)

        setattr(instance, "__haystack_added_to_pipeline__", self)
        setattr(instance, "__component_name__", name)

        # Add the component to the graph, disconnected
        logger.debug("Adding component '{component_name}' ({component})", component_name=name, component=instance)
        # We're completely sure the fields exist so we ignore the type error
        self.graph.add_node(
            name,
            instance=instance,
            input_sockets=instance.__haystack_input__._sockets_dict,  # type: ignore[attr-defined]
            output_sockets=instance.__haystack_output__._sockets_dict,  # type: ignore[attr-defined]
            visits=0,
        )

    def remove_component(self, name: str) -> Component:
        """
        Removes and returns a component from the pipeline.

        Remove an existing component from the pipeline by providing its name.
        All edges that connect to the component will also be deleted.

        :param name:
            The name of the component to remove.
        :returns:
            The removed Component instance.

        :raises ValueError:
            If there is no component with that name already in the Pipeline.
        """

        # Check that a component with that name is in the Pipeline
        try:
            instance = self.get_component(name)
        except ValueError as exc:
            raise ValueError(
                f"There is no component named '{name}' in the pipeline. The valid component names are: "
                + ", ".join(self.graph.nodes)
            ) from exc

        # Delete the component from the graph, deleting all its connections
        self.graph.remove_node(name)

        # Reset the Component sockets' senders and receivers
        input_sockets = instance.__haystack_input__._sockets_dict  # type: ignore[attr-defined]
        for socket in input_sockets.values():
            socket.senders = []

        output_sockets = instance.__haystack_output__._sockets_dict  # type: ignore[attr-defined]
        for socket in output_sockets.values():
            socket.receivers = []

        # Reset the Component's pipeline reference
        setattr(instance, "__haystack_added_to_pipeline__", None)

        return instance

    def connect(self, sender: str, receiver: str) -> "PipelineBase":  # noqa: PLR0915 PLR0912 C901 pylint: disable=too-many-branches
        """
        Connects two components together.

        All components to connect must exist in the pipeline.
        If connecting to a component that has several input or output connections, specify the socket names
        explicitly as 'component_name.connection_name'.

        :param sender:
            The component that delivers the value. This can be either just a component name or can be
            in the format `component_name.connection_name` if the component has multiple outputs.
        :param receiver:
            The component that receives the value. This can be either just a component name or can be
            in the format `component_name.connection_name` if the component has multiple inputs.

        :returns:
            The Pipeline instance.

        :raises PipelineConnectError:
            If the two components cannot be connected (for example if one of the components is
            not present in the pipeline, or the connections don't match by type, and so on).
        """
        # Edges may be named explicitly by passing 'node_name.edge_name' to connect().
        sender_component_name, sender_socket_name = parse_connect_string(sender)
        receiver_component_name, receiver_socket_name = parse_connect_string(receiver)

        if sender_component_name == receiver_component_name:
            raise PipelineConnectError("Connecting a Component to itself is not supported.")

        # Get the nodes data.
        try:
            sender_sockets = self.graph.nodes[sender_component_name]["output_sockets"]
        except KeyError as exc:
            raise ValueError(f"Component named {sender_component_name} not found in the pipeline.") from exc
        try:
            receiver_sockets = self.graph.nodes[receiver_component_name]["input_sockets"]
        except KeyError as exc:
            raise ValueError(f"Component named {receiver_component_name} not found in the pipeline.") from exc

        if not sender_sockets:
            raise PipelineConnectError(
                f"'{sender_component_name}' does not have any output connections. "
                f"Please check that the output types of '{sender_component_name}.run' are set, "
                f"for example by using the '@component.output_types' decorator."
            )

        # If the name of either socket is given, get the socket
        sender_socket: Optional[OutputSocket] = None
        if sender_socket_name:
            sender_socket = sender_sockets.get(sender_socket_name)
            if not sender_socket:
                raise PipelineConnectError(
                    f"'{sender}' does not exist. "
                    f"Output connections of {sender_component_name} are: "
                    + ", ".join([f"{name} (type {_type_name(socket.type)})" for name, socket in sender_sockets.items()])
                )

        receiver_socket: Optional[InputSocket] = None
        if receiver_socket_name:
            receiver_socket = receiver_sockets.get(receiver_socket_name)
            if not receiver_socket:
                raise PipelineConnectError(
                    f"'{receiver}' does not exist. "
                    f"Input connections of {receiver_component_name} are: "
                    + ", ".join(
                        [f"{name} (type {_type_name(socket.type)})" for name, socket in receiver_sockets.items()]
                    )
                )

        # Look for a matching connection among the possible ones.
        # Note that if there is more than one possible connection but two sockets match by name, they're paired.
        sender_socket_candidates: list[OutputSocket] = (
            [sender_socket] if sender_socket else list(sender_sockets.values())
        )
        receiver_socket_candidates: list[InputSocket] = (
            [receiver_socket] if receiver_socket else list(receiver_sockets.values())
        )

        # Find all possible connections between these two components
        possible_connections = []
        for sender_sock, receiver_sock in itertools.product(sender_socket_candidates, receiver_socket_candidates):
            if _types_are_compatible(sender_sock.type, receiver_sock.type, self._connection_type_validation):
                possible_connections.append((sender_sock, receiver_sock))

        # We need this status for error messages; since we might need it in multiple places, we calculate it here
        status = _connections_status(
            sender_node=sender_component_name,
            sender_sockets=sender_socket_candidates,
            receiver_node=receiver_component_name,
            receiver_sockets=receiver_socket_candidates,
        )

        if not possible_connections:
            # There's no possible connection between these two components
            if len(sender_socket_candidates) == len(receiver_socket_candidates) == 1:
                msg = (
                    f"Cannot connect '{sender_component_name}.{sender_socket_candidates[0].name}' with "
                    f"'{receiver_component_name}.{receiver_socket_candidates[0].name}': "
                    f"their declared input and output types do not match.\n{status}"
                )
            else:
                msg = (
                    f"Cannot connect '{sender_component_name}' with '{receiver_component_name}': "
                    f"no matching connections available.\n{status}"
                )
            raise PipelineConnectError(msg)

        if len(possible_connections) == 1:
            # There's only one possible connection, use it
            sender_socket = possible_connections[0][0]
            receiver_socket = possible_connections[0][1]

        if len(possible_connections) > 1:
            # There are multiple possible connections, let's try to match them by name
            name_matches = [
                (out_sock, in_sock) for out_sock, in_sock in possible_connections if in_sock.name == out_sock.name
            ]
            if len(name_matches) != 1:
                # There are either no matches or more than one, so we can't pick one reliably
                msg = (
                    f"Cannot connect '{sender_component_name}' with "
                    f"'{receiver_component_name}': more than one connection is possible "
                    "between these components. Please specify the connection name, like: "
                    f"pipeline.connect('{sender_component_name}.{possible_connections[0][0].name}', "
                    f"'{receiver_component_name}.{possible_connections[0][1].name}').\n{status}"
                )
                raise PipelineConnectError(msg)

            # Get the only possible match
            sender_socket = name_matches[0][0]
            receiver_socket = name_matches[0][1]

        # Connection must be valid on both sender/receiver sides
        if not sender_socket or not receiver_socket or not sender_component_name or not receiver_component_name:
            if sender_component_name and sender_socket:
                sender_repr = f"{sender_component_name}.{sender_socket.name} ({_type_name(sender_socket.type)})"
            else:
                sender_repr = "input needed"

            if receiver_component_name and receiver_socket:
                receiver_repr = f"({_type_name(receiver_socket.type)}) {receiver_component_name}.{receiver_socket.name}"
            else:
                receiver_repr = "output"
            msg = f"Connection must have both sender and receiver: {sender_repr} -> {receiver_repr}"
            raise PipelineConnectError(msg)

        logger.debug(
            "Connecting '{sender_component}.{sender_socket_name}' to '{receiver_component}.{receiver_socket_name}'",
            sender_component=sender_component_name,
            sender_socket_name=sender_socket.name,
            receiver_component=receiver_component_name,
            receiver_socket_name=receiver_socket.name,
        )

        if receiver_component_name in sender_socket.receivers and sender_component_name in receiver_socket.senders:
            # This is already connected, nothing to do
            return self

        if receiver_socket.senders and not receiver_socket.is_variadic:
            # Only variadic input sockets can receive from multiple senders
            msg = (
                f"Cannot connect '{sender_component_name}.{sender_socket.name}' with "
                f"'{receiver_component_name}.{receiver_socket.name}': "
                f"{receiver_component_name}.{receiver_socket.name} is already connected to {receiver_socket.senders}.\n"
            )
            raise PipelineConnectError(msg)

        # Update the sockets with the new connection
        sender_socket.receivers.append(receiver_component_name)
        receiver_socket.senders.append(sender_component_name)

        # Create the new connection
        self.graph.add_edge(
            sender_component_name,
            receiver_component_name,
            key=f"{sender_socket.name}/{receiver_socket.name}",
            conn_type=_type_name(sender_socket.type),
            from_socket=sender_socket,
            to_socket=receiver_socket,
            mandatory=receiver_socket.is_mandatory,
        )
        return self
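
    # Illustrative sketch (not part of the original module): connecting components.
    # `DocumentRetriever` and `PromptBuilder` are hypothetical @component classes.
    # When exactly one type-compatible pairing exists the socket names can be
    # omitted; otherwise, use the explicit 'component_name.socket_name' form.
    #
    #   pipeline.add_component("retriever", DocumentRetriever())
    #   pipeline.add_component("builder", PromptBuilder())
    #   pipeline.connect("retriever.documents", "builder.documents")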

    def get_component(self, name: str) -> Component:
        """
        Get the component with the specified name from the pipeline.

        :param name:
            The name of the component.
        :returns:
            The instance of that component.

        :raises ValueError:
            If a component with that name is not present in the pipeline.
        """
        try:
            return self.graph.nodes[name]["instance"]
        except KeyError as exc:
            raise ValueError(f"Component named {name} not found in the pipeline.") from exc

    def get_component_name(self, instance: Component) -> str:
        """
        Returns the name of the Component instance if it has been added to this Pipeline or an empty string otherwise.

        :param instance:
            The Component instance to look for.
        :returns:
            The name of the Component instance.
        """
        for name, inst in self.graph.nodes(data="instance"):
            if inst == instance:
                return name
        return ""

    def inputs(self, include_components_with_connected_inputs: bool = False) -> dict[str, dict[str, Any]]:
        """
        Returns a dictionary containing the inputs of a pipeline.

        Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
        the input sockets of that component, including their types and whether they are optional.

        :param include_components_with_connected_inputs:
            If `False`, only components that have disconnected input edges are
            included in the output.
        :returns:
            A dictionary where each key is a pipeline component name and each value is a dictionary of
            input sockets of that component.
        """
        inputs: dict[str, dict[str, Any]] = {}
        for component_name, data in find_pipeline_inputs(self.graph, include_components_with_connected_inputs).items():
            sockets_description = {}
            for socket in data:
                sockets_description[socket.name] = {"type": socket.type, "is_mandatory": socket.is_mandatory}
                if not socket.is_mandatory:
                    sockets_description[socket.name]["default_value"] = socket.default_value

            if sockets_description:
                inputs[component_name] = sockets_description
        return inputs

    def outputs(self, include_components_with_connected_outputs: bool = False) -> dict[str, dict[str, Any]]:
        """
        Returns a dictionary containing the outputs of a pipeline.

        Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
        the output sockets of that component.

        :param include_components_with_connected_outputs:
            If `False`, only components that have disconnected output edges are
            included in the output.
        :returns:
            A dictionary where each key is a pipeline component name and each value is a dictionary of
            output sockets of that component.
        """
        outputs = {
            comp: {socket.name: {"type": socket.type} for socket in data}
            for comp, data in find_pipeline_outputs(self.graph, include_components_with_connected_outputs).items()
            if data
        }
        return outputs
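
    # Illustrative sketch (not part of the original module): introspecting the open
    # sockets of a pipeline. The component and socket names are hypothetical; the
    # value shapes are the ones documented above.
    #
    #   pipeline.inputs()
    #   # -> {"retriever": {"query": {"type": str, "is_mandatory": True}}}
    #   pipeline.outputs()
    #   # -> {"builder": {"prompt": {"type": str}}}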

    def show(
        self,
        *,
        server_url: str = "https://mermaid.ink",
        params: Optional[dict] = None,
        timeout: int = 30,
        super_component_expansion: bool = False,
    ) -> None:
        """
        Display an image representing this `Pipeline` in a Jupyter notebook.

        This function generates a diagram of the `Pipeline` using a Mermaid server and displays it directly in
        the notebook.

        :param server_url:
            The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
            See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
            info on how to set up your own Mermaid server.

        :param params:
            Dictionary of customization parameters to modify the output. Refer to the Mermaid documentation for
            more details. Supported keys:
                - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
                - type: Image type for the /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
                - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
                - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
                - width: Width of the output image (integer).
                - height: Height of the output image (integer).
                - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
                - fit: Whether to fit the diagram size to the page (PDF only, boolean).
                - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
                - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.

        :param timeout:
            Timeout in seconds for the request to the Mermaid server.

        :param super_component_expansion:
            If set to `True` and the pipeline contains SuperComponents, the diagram will show the internal
            structure of super-components as if they were components that are part of the pipeline, instead of
            as a "black box". Otherwise, only the super-component itself will be displayed.

        :raises PipelineDrawingError:
            If the function is called outside of a Jupyter notebook or if there is an issue with rendering.
        """

        if is_in_jupyter():
            from IPython.display import Image, display

            if super_component_expansion:
                graph, super_component_mapping = self._merge_super_component_pipelines()
            else:
                graph = self.graph
                super_component_mapping = None

            image_data = _to_mermaid_image(
                graph,
                server_url=server_url,
                params=params,
                timeout=timeout,
                super_component_mapping=super_component_mapping,
            )
            display(Image(image_data))
        else:
            msg = "This method is only supported in Jupyter notebooks. Use Pipeline.draw() to save an image locally."
            raise PipelineDrawingError(msg)

    def draw(
        self,
        *,
        path: Path,
        server_url: str = "https://mermaid.ink",
        params: Optional[dict] = None,
        timeout: int = 30,
        super_component_expansion: bool = False,
    ) -> None:
        """
        Save an image representing this `Pipeline` to the specified file path.

        This function generates a diagram of the `Pipeline` using the Mermaid server and saves it to the provided path.

        :param path:
            The file path where the generated image will be saved.

        :param server_url:
            The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
            See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
            info on how to set up your own Mermaid server.

        :param params:
            Dictionary of customization parameters to modify the output. Refer to the Mermaid documentation for
            more details. Supported keys:
                - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
                - type: Image type for the /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
                - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
                - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
                - width: Width of the output image (integer).
                - height: Height of the output image (integer).
                - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
                - fit: Whether to fit the diagram size to the page (PDF only, boolean).
                - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
                - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.

        :param timeout:
            Timeout in seconds for the request to the Mermaid server.

        :param super_component_expansion:
            If set to `True` and the pipeline contains SuperComponents, the diagram will show the internal
            structure of super-components as if they were components that are part of the pipeline, instead of
            as a "black box". Otherwise, only the super-component itself will be displayed.

        :raises PipelineDrawingError:
            If there is an issue with rendering or saving the image.
        """

        # Before drawing we edit the graph a bit; to avoid modifying the original, which is
        # used for running the pipeline, we copy it.
        if super_component_expansion:
            graph, super_component_mapping = self._merge_super_component_pipelines()
        else:
            graph = self.graph
            super_component_mapping = None

        image_data = _to_mermaid_image(
            graph,
            server_url=server_url,
            params=params,
            timeout=timeout,
            super_component_mapping=super_component_mapping,
        )
        Path(path).write_bytes(image_data)
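
    # Illustrative sketch (not part of the original module): rendering a diagram to
    # disk with a few of the supported Mermaid parameters.
    #
    #   pipeline.draw(
    #       path=Path("pipeline.png"),
    #       params={"format": "img", "type": "png", "theme": "neutral"},
    #   )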

    def walk(self) -> Iterator[tuple[str, Component]]:
        """
        Visits each component in the pipeline exactly once and yields its name and instance.

        No guarantees are provided on the visiting order.

        :returns:
            An iterator of tuples of component name and component instance.
        """
        for component_name, instance in self.graph.nodes(data="instance"):
            yield component_name, instance

    def warm_up(self) -> None:
        """
        Make sure all nodes are warm.

        It's the node's responsibility to make sure this method can be called at every `Pipeline.run()`
        without re-initializing everything.
        """
        for node in self.graph.nodes:
            if hasattr(self.graph.nodes[node]["instance"], "warm_up"):
                logger.info("Warming up component {node}...", node=node)
                self.graph.nodes[node]["instance"].warm_up()

    @staticmethod
    def _create_component_span(
        component_name: str, instance: Component, inputs: dict[str, Any], parent_span: Optional[tracing.Span] = None
    ) -> ContextManager[tracing.Span]:
        return tracing.tracer.trace(
            "haystack.component.run",
            tags={
                "haystack.component.name": component_name,
                "haystack.component.type": instance.__class__.__name__,
                "haystack.component.input_types": {k: type(v).__name__ for k, v in inputs.items()},
                "haystack.component.input_spec": {
                    key: {
                        "type": (value.type.__name__ if isinstance(value.type, type) else str(value.type)),
                        "senders": value.senders,
                    }
                    for key, value in instance.__haystack_input__._sockets_dict.items()  # type: ignore
                },
                "haystack.component.output_spec": {
                    key: {
                        "type": (value.type.__name__ if isinstance(value.type, type) else str(value.type)),
                        "receivers": value.receivers,
                    }
                    for key, value in instance.__haystack_output__._sockets_dict.items()  # type: ignore
                },
            },
            parent_span=parent_span,
        )

    def validate_input(self, data: dict[str, Any]) -> None:
        """
        Validates pipeline input data.

        Validates that:
        * Each Component name actually exists in the Pipeline
        * Each Component is not missing any input
        * Each Component has only one input per input socket, if not variadic
        * Each Component doesn't receive inputs that are already sent by another Component

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name.

        :raises ValueError:
            If inputs are invalid according to the above.
        """
        for component_name, component_inputs in data.items():
            if component_name not in self.graph.nodes:
                raise ValueError(f"Component named {component_name} not found in the pipeline.")
            instance = self.graph.nodes[component_name]["instance"]
            for socket_name, socket in instance.__haystack_input__._sockets_dict.items():
                if socket.senders == [] and socket.is_mandatory and socket_name not in component_inputs:
                    raise ValueError(f"Missing input for component {component_name}: {socket_name}")
            for input_name in component_inputs.keys():
                if input_name not in instance.__haystack_input__._sockets_dict:
                    raise ValueError(f"Input {input_name} not found in component {component_name}.")

        for component_name in self.graph.nodes:
            instance = self.graph.nodes[component_name]["instance"]
            for socket_name, socket in instance.__haystack_input__._sockets_dict.items():
                component_inputs = data.get(component_name, {})
                if socket.senders == [] and socket.is_mandatory and socket_name not in component_inputs:
                    raise ValueError(f"Missing input for component {component_name}: {socket_name}")
                if socket.senders and socket_name in component_inputs and not socket.is_variadic:
                    raise ValueError(
                        f"Input {socket_name} for component {component_name} is already sent by {socket.senders}."
                    )
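
    # Illustrative sketch (not part of the original module): the nested input shape
    # that `validate_input` expects. The component and socket names are hypothetical.
    #
    #   pipeline.validate_input({"retriever": {"query": "Who lives in Paris?"}})
    #   # Raises ValueError if "retriever" doesn't exist, if "query" is not one of
    #   # its input sockets, if a mandatory disconnected socket is missing, or if a
    #   # non-variadic socket already receives its value from another component.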

    def _prepare_component_input_data(self, data: dict[str, Any]) -> dict[str, dict[str, Any]]:
        """
        Prepares input data for pipeline components.

        Organizes input data for pipeline components and identifies any inputs that are not matched to any
        component's input slots. Deep-copies data items to avoid sharing mutables across multiple components.

        This method processes a flat dictionary of input data, where each key-value pair represents an input name
        and its corresponding value. It distributes these inputs to the appropriate pipeline components based on
        their input requirements. Inputs that don't match any component's input slots are classified as unresolved.

        :param data:
            A dictionary potentially having input names as keys and input values as values.

        :returns:
            A dictionary mapping component names to their respective matched inputs.
        """
        # Check whether the data is a nested dictionary of component inputs where each key is a component name
        # and each value is a dictionary of input parameters for that component
        is_nested_component_input = all(isinstance(value, dict) for value in data.values())
        if not is_nested_component_input:
            # Flat input, a dict where keys are input names and values are the corresponding values.
            # We need to convert it to a nested dictionary of component inputs and then run the pipeline
            # just like in the previous case.
            pipeline_input_data: dict[str, dict[str, Any]] = defaultdict(dict)
            unresolved_kwargs = {}

            # Retrieve the input slots for each component in the pipeline
            available_inputs: dict[str, dict[str, Any]] = self.inputs()

            # Go through all provided inputs to distribute them to the appropriate component inputs
            for input_name, input_value in data.items():
                resolved_at_least_once = False

                # Check each component to see if it has a slot for the current kwarg
                for component_name, component_inputs in available_inputs.items():
                    if input_name in component_inputs:
                        # If a match is found, add the kwarg to the component's input data
                        pipeline_input_data[component_name][input_name] = input_value
                        resolved_at_least_once = True

                if not resolved_at_least_once:
                    unresolved_kwargs[input_name] = input_value

            if unresolved_kwargs:
                logger.warning(
                    "Inputs {input_keys} were not matched to any component inputs, please check your run parameters.",
                    input_keys=list(unresolved_kwargs.keys()),
                )

            data = dict(pipeline_input_data)

        # Deep-copying the inputs prevents the Pipeline run logic from being altered unexpectedly
        # when the same input reference is passed to multiple components.
        for component_name, component_inputs in data.items():
            data[component_name] = {k: _deepcopy_with_exceptions(v) for k, v in component_inputs.items()}

        return data
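
    # Illustrative sketch (not part of the original module): a flat input dict is
    # fanned out to every component that declares a matching input slot. The
    # component and socket names are hypothetical.
    #
    #   pipeline._prepare_component_input_data({"query": "Who lives in Paris?"})
    #   # -> {"retriever": {"query": "Who lives in Paris?"},
    #   #     "prompt_builder": {"query": "Who lives in Paris?"}}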

    @classmethod
    def from_template(
        cls, predefined_pipeline: PredefinedPipeline, template_params: Optional[dict[str, Any]] = None
    ) -> "PipelineBase":
        """
        Create a Pipeline from a predefined template. See `PredefinedPipeline` for available options.

        :param predefined_pipeline:
            The predefined pipeline to use.
        :param template_params:
            An optional dictionary of parameters to use when rendering the pipeline template.
        :returns:
            An instance of `Pipeline`.
        """
        tpl = PipelineTemplate.from_predefined(predefined_pipeline)
        # If tpl.render() fails, we let the original error bubble up
        rendered = tpl.render(template_params)

        # If there was a problem with the rendered version of the
        # template, we add it to the error stack for debugging
        try:
            return cls.loads(rendered)
        except Exception as e:
            msg = f"Error unmarshalling pipeline: {e}\n"
            msg += f"Source:\n{rendered}"
            raise PipelineUnmarshalError(msg)
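
    # Illustrative sketch (not part of the original module): building a pipeline
    # from a predefined template. The template name and its parameters shown here
    # are hypothetical; see `PredefinedPipeline` for the real options.
    #
    #   pipeline = PipelineBase.from_template(
    #       PredefinedPipeline.INDEXING, template_params={"use_pdf_file_converter": True}
    #   )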

    def _find_receivers_from(self, component_name: str) -> list[tuple[str, OutputSocket, InputSocket]]:
        """
        Utility function to find all Components that receive input from `component_name`.

        :param component_name:
            Name of the sender Component.

        :returns:
            List of tuples containing the name of the receiver Component and the sender OutputSocket
            and receiver InputSocket instances.
        """
        res = []
        for _, receiver_name, connection in self.graph.edges(nbunch=component_name, data=True):
            sender_socket: OutputSocket = connection["from_socket"]
            receiver_socket: InputSocket = connection["to_socket"]
            res.append((receiver_name, sender_socket, receiver_socket))
        return res

    @staticmethod
    def _convert_to_internal_format(pipeline_inputs: dict[str, Any]) -> dict[str, dict[str, list]]:
        """
        Converts the inputs to the pipeline to the format that is needed for the internal `Pipeline.run` logic.

        Example Input:
        {'prompt_builder': {'question': 'Who lives in Paris?'}, 'retriever': {'query': 'Who lives in Paris?'}}
        Example Output:
        {'prompt_builder': {'question': [{'sender': None, 'value': 'Who lives in Paris?'}]},
         'retriever': {'query': [{'sender': None, 'value': 'Who lives in Paris?'}]}}

        :param pipeline_inputs: Inputs to the pipeline.
        :returns: Converted inputs that can be used by the internal `Pipeline.run` logic.
        """
        inputs: dict[str, dict[str, list[dict[str, Any]]]] = {}
        for component_name, socket_dict in pipeline_inputs.items():
            inputs[component_name] = {}
            for socket_name, value in socket_dict.items():
                inputs[component_name][socket_name] = [{"sender": None, "value": value}]

        return inputs

    @staticmethod
    def _consume_component_inputs(
        component_name: str, component: dict, inputs: dict, is_resume: bool = False
    ) -> dict[str, Any]:
        """
        Extracts the inputs needed to run the component and removes them from the global inputs state.

        :param component_name: The name of a component.
        :param component: Component with component metadata.
        :param inputs: Global inputs state.
        :param is_resume: Whether the component is being resumed; in that case its inputs were already consumed.
        :returns: The inputs for the component.
        """
        component_inputs = inputs.get(component_name, {})
        consumed_inputs = {}
        greedy_inputs_to_remove = set()
        for socket_name, socket in component["input_sockets"].items():
            socket_inputs = component_inputs.get(socket_name, [])
            socket_inputs = [sock["value"] for sock in socket_inputs if sock["value"] is not _NO_OUTPUT_PRODUCED]

            # If we are resuming a component, the inputs are already consumed, so we just return the first input
            if is_resume:
                consumed_inputs[socket_name] = socket_inputs[0]
                continue
            if socket_inputs:
                if not socket.is_variadic:
                    # We only care about the first input provided to the socket.
                    consumed_inputs[socket_name] = socket_inputs[0]
                elif socket.is_greedy:
                    # We need to keep track of greedy inputs because we always remove them, even if they come from
                    # outside the pipeline. Otherwise, a greedy input from the user would trigger a pipeline to run
                    # indefinitely.
                    greedy_inputs_to_remove.add(socket_name)
                    consumed_inputs[socket_name] = [socket_inputs[0]]
                elif is_socket_lazy_variadic(socket):
                    # We use all inputs provided to the socket on a lazy variadic socket.
                    consumed_inputs[socket_name] = socket_inputs

        # We prune all inputs except for those that were provided from outside the pipeline (e.g. user inputs).
        pruned_inputs = {
            socket_name: [
                sock for sock in socket if sock["sender"] is None and socket_name not in greedy_inputs_to_remove
            ]
            for socket_name, socket in component_inputs.items()
        }
        pruned_inputs = {socket_name: socket for socket_name, socket in pruned_inputs.items() if len(socket) > 0}

        inputs[component_name] = pruned_inputs

        return consumed_inputs

    def _fill_queue(
        self, component_names: list[str], inputs: dict[str, Any], component_visits: dict[str, int]
    ) -> FIFOPriorityQueue:
        """
        Calculates the execution priority for each component and inserts it into the priority queue.

        :param component_names: Names of the components to put into the queue.
        :param inputs: Inputs to the components.
        :param component_visits: Current state of component visits.
        :returns: A prioritized queue of component names.
        """
        priority_queue = FIFOPriorityQueue()
        for component_name in component_names:
            component = self._get_component_with_graph_metadata_and_visits(
                component_name, component_visits[component_name]
            )
            priority = self._calculate_priority(component, inputs.get(component_name, {}))
            priority_queue.push(component_name, priority)

        return priority_queue

    @staticmethod
    def _calculate_priority(component: dict, inputs: dict) -> ComponentPriority:
        """
        Calculates the execution priority for a component depending on the component's inputs.

        :param component: Component metadata and component instance.
        :param inputs: Inputs to the component.
        :returns: Priority value for the component.
        """
        if not can_component_run(component, inputs):
            return ComponentPriority.BLOCKED
        elif is_any_greedy_socket_ready(component, inputs) and are_all_sockets_ready(component, inputs):
            return ComponentPriority.HIGHEST
        elif all_predecessors_executed(component, inputs):
            return ComponentPriority.READY
        elif are_all_lazy_variadic_sockets_resolved(component, inputs):
            return ComponentPriority.DEFER
        else:
            return ComponentPriority.DEFER_LAST

1119
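    # Rough summary of the ladder above (see component_checks for the precise conditions):
    #
    #     BLOCKED    -> can_component_run() is False; the component must wait
    #     HIGHEST    -> a greedy socket is ready and all sockets are ready; run immediately
    #     READY      -> all predecessors have executed, so the component's inputs are final
    #     DEFER      -> all lazy variadic sockets are resolved, but other inputs may still arrive
    #     DEFER_LAST -> some lazy variadic socket may still receive inputs; run only as a last resort
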
    def _get_component_with_graph_metadata_and_visits(self, component_name: str, visits: int) -> dict[str, Any]:
        """
        Returns the component instance alongside input/output-socket metadata from the graph and adds current visits.

        We can't store visits in the pipeline graph because this would prevent reentrance / thread-safe execution.

        :param component_name: The name of the component.
        :param visits: Number of visits for the component.
        :returns: Dict including component instance, input/output-sockets and visits.
        """
        comp_dict = self.graph.nodes[component_name]
        comp_dict = {**comp_dict, "visits": visits}
        return comp_dict

    def _get_next_runnable_component(
        self, priority_queue: FIFOPriorityQueue, component_visits: dict[str, int]
    ) -> Union[tuple[ComponentPriority, str, dict[str, Any]], None]:
        """
        Returns the next runnable component alongside its metadata from the priority queue.

        :param priority_queue: Priority queue of component names.
        :param component_visits: Current state of component visits.
        :returns: The priority, the component name, and the component's metadata,
            or None if no component in the queue can run.
        :raises PipelineMaxComponentRuns: If the next runnable component has exceeded the maximum number of runs.
        """
        priority_and_component_name: Union[tuple[ComponentPriority, str], None] = (
            None if (item := priority_queue.get()) is None else (ComponentPriority(item[0]), str(item[1]))
        )

        if priority_and_component_name is None:
            return None

        priority, component_name = priority_and_component_name
        comp = self._get_component_with_graph_metadata_and_visits(component_name, component_visits[component_name])
        if comp["visits"] > self._max_runs_per_component:
            msg = f"Maximum run count {self._max_runs_per_component} reached for component '{component_name}'"
            raise PipelineMaxComponentRuns(msg)
        return priority, component_name, comp

    @staticmethod
    def _add_missing_input_defaults(
        component_inputs: dict[str, Any], component_input_sockets: dict[str, InputSocket]
    ) -> dict[str, Any]:
        """
        Updates the inputs with the default values for the inputs that are missing.

        :param component_inputs: Inputs for the component.
        :param component_input_sockets: Input sockets of the component.
        :returns: The updated inputs, with defaults filled in for missing optional sockets.
        """
        for name, socket in component_input_sockets.items():
            if not socket.is_mandatory and name not in component_inputs:
                if socket.is_variadic:
                    component_inputs[name] = [socket.default_value]
                else:
                    component_inputs[name] = socket.default_value

        return component_inputs

    def _tiebreak_waiting_components(
        self,
        component_name: str,
        priority: ComponentPriority,
        priority_queue: FIFOPriorityQueue,
        topological_sort: Union[dict[str, int], None],
    ) -> tuple[str, Union[dict[str, int], None]]:
        """
        Decides which component to run when multiple components are waiting for inputs with the same priority.

        :param component_name: The name of the component.
        :param priority: Priority of the component.
        :param priority_queue: Priority queue of component names.
        :param topological_sort: Cached topological sort of all components in the pipeline.
        :returns: The chosen component name and the (possibly newly computed) topological sort.
        """
        components_with_same_priority = [component_name]

        while len(priority_queue) > 0:
            next_priority, next_component_name = priority_queue.peek()
            if next_priority == priority:
                priority_queue.pop()  # actually remove the component
                components_with_same_priority.append(next_component_name)
            else:
                break

        if len(components_with_same_priority) > 1:
            if topological_sort is None:
                if networkx.is_directed_acyclic_graph(self.graph):
                    topological_sort = networkx.lexicographical_topological_sort(self.graph)
                    topological_sort = {node: idx for idx, node in enumerate(topological_sort)}
                else:
                    condensed = networkx.condensation(self.graph)
                    condensed_sorted = {node: idx for idx, node in enumerate(networkx.topological_sort(condensed))}
                    topological_sort = {
                        component_name: condensed_sorted[node]
                        for component_name, node in condensed.graph["mapping"].items()
                    }

            components_with_same_priority = sorted(
                components_with_same_priority, key=lambda comp_name: (topological_sort[comp_name], comp_name.lower())
            )

            component_name = components_with_same_priority[0]

        return component_name, topological_sort

    @staticmethod
    def _write_component_outputs(
        component_name: str,
        component_outputs: Mapping[str, Any],
        inputs: dict[str, Any],
        receivers: list[tuple],
        include_outputs_from: set[str],
    ) -> Mapping[str, Any]:
        """
        Distributes the outputs of a component to the input sockets that it is connected to.

        :param component_name: The name of the component.
        :param component_outputs: The outputs of the component.
        :param inputs: The current global input state.
        :param receivers: List of components that receive inputs from the component.
        :param include_outputs_from: Set of component names whose outputs should always be included in the
            pipeline's final outputs.
        :returns: The component outputs that were not consumed by any receiver (or all outputs, if the
            component is listed in `include_outputs_from`).
        """
        for receiver_name, sender_socket, receiver_socket in receivers:
            # We either get the value that was produced by the component or we use the _NO_OUTPUT_PRODUCED sentinel
            # class to indicate that the sender did not produce an output for this socket.
            # This allows us to track if a predecessor already ran but did not produce an output.
            value = component_outputs.get(sender_socket.name, _NO_OUTPUT_PRODUCED)

            if receiver_name not in inputs:
                inputs[receiver_name] = {}

            if is_socket_lazy_variadic(receiver_socket):
                # If the receiver socket is lazy variadic, we append the new input.
                # Lazy variadic sockets can collect multiple inputs.
                _write_to_lazy_variadic_socket(
                    inputs=inputs,
                    receiver_name=receiver_name,
                    receiver_socket_name=receiver_socket.name,
                    component_name=component_name,
                    value=value,
                )
            else:
                # If the receiver socket is not lazy variadic, it is greedy variadic or non-variadic.
                # We overwrite with the new input if it's not _NO_OUTPUT_PRODUCED or if the current value is None.
                _write_to_standard_socket(
                    inputs=inputs,
                    receiver_name=receiver_name,
                    receiver_socket_name=receiver_socket.name,
                    component_name=component_name,
                    value=value,
                )

        # If we want to include all outputs from this component in the final outputs, we don't need to prune any
        # consumed outputs.
        if component_name in include_outputs_from:
            return component_outputs

        # We prune outputs that were consumed by any receiving sockets.
        # All remaining outputs will be added to the final outputs of the pipeline.
        consumed_outputs = {sender_socket.name for _, sender_socket, __ in receivers}
        pruned_outputs = {key: value for key, value in component_outputs.items() if key not in consumed_outputs}

        return pruned_outputs

    @staticmethod
    def _is_queue_stale(priority_queue: FIFOPriorityQueue) -> bool:
        """
        Checks if the priority queue needs to be recomputed because the priorities might have changed.

        :param priority_queue: Priority queue of component names.
        :returns: True if the queue is empty or its highest-priority component is not yet ready to run.
        """
        return len(priority_queue) == 0 or priority_queue.peek()[0] > ComponentPriority.READY

    @staticmethod
    def validate_pipeline(priority_queue: FIFOPriorityQueue) -> None:
        """
        Validate the pipeline to check if it is blocked or has no valid entry point.

        :param priority_queue: Priority queue of component names.
        :raises PipelineComponentsBlockedError:
            If every component in the queue is blocked and the pipeline therefore has no valid entry point.
        """
        if len(priority_queue) == 0:
            return

        candidate = priority_queue.peek()
        if candidate is not None and candidate[0] == ComponentPriority.BLOCKED:
            raise PipelineComponentsBlockedError()

    def _find_super_components(self) -> list[tuple[str, Component]]:
        """
        Find all SuperComponents in the pipeline.

        :returns:
            List of tuples containing (component_name, component_instance) representing a SuperComponent.
        """
        super_components = []
        for comp_name, comp in self.walk():
            # A SuperComponent has a "pipeline" attribute which is itself a Pipeline instance.
            # We don't test against SuperComponent directly because doing so would lead to circular imports.
            if hasattr(comp, "pipeline") and isinstance(comp.pipeline, self.__class__):
                super_components.append((comp_name, comp))
        return super_components

    def _merge_super_component_pipelines(self) -> tuple["networkx.MultiDiGraph", dict[str, str]]:
        """
        Merge the internal pipelines of SuperComponents into the main pipeline graph structure.

        This creates a new networkx.MultiDiGraph containing all the components from both the main pipeline
        and all the internal SuperComponents' pipelines. The SuperComponents are removed and their internal
        components are connected to corresponding input and output sockets of the main pipeline.

        :returns:
            A tuple containing:
            - A networkx.MultiDiGraph with the expanded structure of the main pipeline and all its SuperComponents
            - A dictionary mapping each internal component name to the name of the SuperComponent it came from
        """
        merged_graph = self.graph.copy()
        super_component_mapping: dict[str, str] = {}

        for super_name, super_component in self._find_super_components():
            internal_pipeline = super_component.pipeline  # type: ignore
            internal_graph = internal_pipeline.graph.copy()

            # Mark all components in the internal pipeline as being part of a SuperComponent
            for node in internal_graph.nodes():
                super_component_mapping[node] = super_name

            # Edges connected to the SuperComponent
            incoming_edges = list(merged_graph.in_edges(super_name, data=True))
            outgoing_edges = list(merged_graph.out_edges(super_name, data=True))

            # Merge the SuperComponent graph into the main graph and remove the SuperComponent node,
            # since its components are now part of the main graph.
            merged_graph = networkx.compose(merged_graph, internal_graph)
            merged_graph.remove_node(super_name)

            # Get the entry and exit points of the SuperComponent's internal pipeline
            entry_points = [n for n in internal_graph.nodes() if internal_graph.in_degree(n) == 0]
            exit_points = [n for n in internal_graph.nodes() if internal_graph.out_degree(n) == 0]

            # Connect the incoming edges to entry points
            for sender, _, edge_data in incoming_edges:
                sender_socket = edge_data["from_socket"]
                for entry_point in entry_points:
                    # find a matching input socket in the entry point
                    entry_point_sockets = internal_graph.nodes[entry_point]["input_sockets"]
                    for socket_name, socket in entry_point_sockets.items():
                        if _types_are_compatible(sender_socket.type, socket.type, self._connection_type_validation):
                            merged_graph.add_edge(
                                sender,
                                entry_point,
                                key=f"{sender_socket.name}/{socket_name}",
                                conn_type=_type_name(sender_socket.type),
                                from_socket=sender_socket,
                                to_socket=socket,
                                mandatory=socket.is_mandatory,
                            )

            # Connect outgoing edges from exit points
            for _, receiver, edge_data in outgoing_edges:
                receiver_socket = edge_data["to_socket"]
                for exit_point in exit_points:
                    # find a matching output socket in the exit point
                    exit_point_sockets = internal_graph.nodes[exit_point]["output_sockets"]
                    for socket_name, socket in exit_point_sockets.items():
                        if _types_are_compatible(socket.type, receiver_socket.type, self._connection_type_validation):
                            merged_graph.add_edge(
                                exit_point,
                                receiver,
                                key=f"{socket_name}/{receiver_socket.name}",
                                conn_type=_type_name(socket.type),
                                from_socket=socket,
                                to_socket=receiver_socket,
                                mandatory=receiver_socket.is_mandatory,
                            )

        return merged_graph, super_component_mapping

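    # Illustrative sketch (hypothetical names): if SuperComponent "sc" wraps an internal pipeline
    # retriever -> ranker, an existing edge embedder -> sc is re-attached as embedder -> retriever
    # (the entry point), and sc -> reader becomes ranker -> reader (the exit point), provided the
    # socket types are compatible.
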
    def _is_pipeline_possibly_blocked(self, current_pipeline_outputs: dict[str, Any]) -> bool:
        """
        Heuristically determines whether the pipeline is possibly blocked based on its current outputs.

        This method checks if the pipeline has produced any of the expected outputs.
        - If no outputs are expected (i.e., `self.outputs()` returns an empty list), the method assumes the pipeline
        is not blocked.
        - If at least one expected output is present in `current_pipeline_outputs`, the pipeline is also assumed to not
        be blocked.
        - If none of the expected outputs are present, the pipeline is considered to be possibly blocked.

        Note: this check is not definitive; it is intended as a best-effort guess to detect a stalled or misconfigured
        pipeline when there are no more runnable components.

        :param current_pipeline_outputs: A dictionary of outputs currently produced by the pipeline.
        :returns:
            bool: True if the pipeline is possibly blocked (i.e., expected outputs are missing), False otherwise.
        """
        expected_outputs = self.outputs()
        return bool(expected_outputs) and not any(k in current_pipeline_outputs for k in expected_outputs)


def _connections_status(
    sender_node: str, receiver_node: str, sender_sockets: list[OutputSocket], receiver_sockets: list[InputSocket]
) -> str:
    """
    Lists the status of the sockets, for error messages.
    """
    sender_sockets_entries = []
    for sender_socket in sender_sockets:
        sender_sockets_entries.append(f" - {sender_socket.name}: {_type_name(sender_socket.type)}")
    sender_sockets_list = "\n".join(sender_sockets_entries)

    receiver_sockets_entries = []
    for receiver_socket in receiver_sockets:
        if receiver_socket.senders:
            sender_status = f"sent by {','.join(receiver_socket.senders)}"
        else:
            sender_status = "available"
        receiver_sockets_entries.append(
            f" - {receiver_socket.name}: {_type_name(receiver_socket.type)} ({sender_status})"
        )
    receiver_sockets_list = "\n".join(receiver_sockets_entries)

    return f"'{sender_node}':\n{sender_sockets_list}\n'{receiver_node}':\n{receiver_sockets_list}"


# Utility functions for writing to sockets


def _write_to_lazy_variadic_socket(
    inputs: dict[str, Any], receiver_name: str, receiver_socket_name: str, component_name: str, value: Any
) -> None:
    """
    Write to a lazy variadic socket.

    Lazy variadic sockets collect all incoming values, so the new value is appended. Mutates inputs in place.
    """
    if not inputs[receiver_name].get(receiver_socket_name):
        inputs[receiver_name][receiver_socket_name] = []

    inputs[receiver_name][receiver_socket_name].append({"sender": component_name, "value": value})


def _write_to_standard_socket(
    inputs: dict[str, Any], receiver_name: str, receiver_socket_name: str, component_name: str, value: Any
) -> None:
    """
    Write to a greedy variadic or non-variadic socket.

    Mutates inputs in place.
    """
    current_value = inputs[receiver_name].get(receiver_socket_name)

    # Only overwrite if there's no existing value, or we have a new value to provide
    if current_value is None or value is not _NO_OUTPUT_PRODUCED:
        inputs[receiver_name][receiver_socket_name] = [{"sender": component_name, "value": value}]
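

# Minimal usage sketch for the socket-writing helpers above, using hypothetical component and socket
# names. It is guarded so it only runs when this module is executed directly: lazy variadic sockets
# accumulate entries, while standard sockets keep a single entry and a later _NO_OUTPUT_PRODUCED
# never overwrites a real value.
if __name__ == "__main__":
    state: dict[str, Any] = {"joiner": {}, "reader": {}}

    # Lazy variadic socket: both senders' values are collected.
    _write_to_lazy_variadic_socket(state, "joiner", "documents", "retriever_a", ["doc1"])
    _write_to_lazy_variadic_socket(state, "joiner", "documents", "retriever_b", ["doc2"])
    assert len(state["joiner"]["documents"]) == 2

    # Standard socket: the first real value wins; _NO_OUTPUT_PRODUCED does not overwrite it.
    _write_to_standard_socket(state, "reader", "query", "preprocessor", "what is haystack?")
    _write_to_standard_socket(state, "reader", "query", "other_sender", _NO_OUTPUT_PRODUCED)
    assert state["reader"]["query"] == [{"sender": "preprocessor", "value": "what is haystack?"}]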