deepset-ai / haystack / 15049844454

15 May 2025 04:07PM UTC coverage: 90.446% (+0.04%) from 90.41%

Pull Request #9345: feat: add serialization to `State` / move `State` to utils
Merge 9e4071f83 into 2a64cd4e9

10981 of 12141 relevant lines covered (90.45%)
0.9 hits per line
Source File: haystack/core/pipeline/base.py (93.24% of lines covered)

# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import itertools
from collections import defaultdict
from datetime import datetime
from enum import IntEnum
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Set, TextIO, Tuple, Type, TypeVar, Union

import networkx  # type:ignore

from haystack import logging, tracing
from haystack.core.component import Component, InputSocket, OutputSocket, component
from haystack.core.errors import (
    DeserializationError,
    PipelineComponentsBlockedError,
    PipelineConnectError,
    PipelineDrawingError,
    PipelineError,
    PipelineMaxComponentRuns,
    PipelineUnmarshalError,
    PipelineValidationError,
)
from haystack.core.pipeline.component_checks import (
    _NO_OUTPUT_PRODUCED,
    all_predecessors_executed,
    are_all_lazy_variadic_sockets_resolved,
    are_all_sockets_ready,
    can_component_run,
    is_any_greedy_socket_ready,
    is_socket_lazy_variadic,
)
from haystack.core.pipeline.utils import FIFOPriorityQueue, _deepcopy_with_exceptions, parse_connect_string
from haystack.core.serialization import DeserializationCallbacks, component_from_dict, component_to_dict
from haystack.core.type_utils import _type_name, _types_are_compatible
from haystack.marshal import Marshaller, YamlMarshaller
from haystack.utils import is_in_jupyter, type_serialization

from .descriptions import find_pipeline_inputs, find_pipeline_outputs
from .draw import _to_mermaid_image
from .template import PipelineTemplate, PredefinedPipeline

DEFAULT_MARSHALLER = YamlMarshaller()

# We use a generic type to annotate the return value of class methods,
# so that static analyzers won't be confused when derived classes
# use those methods.
T = TypeVar("T", bound="PipelineBase")

logger = logging.getLogger(__name__)


# Constants for tracing tags
_COMPONENT_INPUT = "haystack.component.input"
_COMPONENT_OUTPUT = "haystack.component.output"
_COMPONENT_VISITS = "haystack.component.visits"


class ComponentPriority(IntEnum):
    HIGHEST = 1
    READY = 2
    DEFER = 3
    DEFER_LAST = 4
    BLOCKED = 5


class PipelineBase:
    """
    Components orchestration engine.

    Builds a graph of components and orchestrates their execution according to the execution graph.
    """

    def __init__(
        self,
        metadata: Optional[Dict[str, Any]] = None,
        max_runs_per_component: int = 100,
        connection_type_validation: bool = True,
    ):
        """
        Creates the Pipeline.

        :param metadata:
            Arbitrary dictionary to store metadata about this `Pipeline`. Make sure all the values contained in
            this dictionary can be serialized and deserialized if you wish to save this `Pipeline` to file.
        :param max_runs_per_component:
            How many times the `Pipeline` can run the same Component.
            If this limit is reached, a `PipelineMaxComponentRuns` exception is raised.
            If not set, it defaults to 100 runs per Component.
        :param connection_type_validation: Whether the pipeline will validate the types of the connections.
            Defaults to True.
        """
        self._telemetry_runs = 0
        self._last_telemetry_sent: Optional[datetime] = None
        self.metadata = metadata or {}
        self.graph = networkx.MultiDiGraph()
        self._max_runs_per_component = max_runs_per_component
        self._connection_type_validation = connection_type_validation

    def __eq__(self, other) -> bool:
        """
        Pipeline equality is defined by their type and the equality of their serialized form.

        Pipelines of the same type share every metadata, node, and edge, but they're not required to use
        the same node instances: this allows a pipeline saved and then loaded back to be equal to itself.
        """
        if not isinstance(self, type(other)):
            return False
        return self.to_dict() == other.to_dict()

    def __repr__(self) -> str:
        """
        Returns a text representation of the Pipeline.
        """
        res = f"{object.__repr__(self)}\n"
        if self.metadata:
            res += "🧱 Metadata\n"
            for k, v in self.metadata.items():
                res += f"  - {k}: {v}\n"

        res += "🚅 Components\n"
        for name, instance in self.graph.nodes(data="instance"):  # type: ignore # type wrongly defined in networkx
            res += f"  - {name}: {instance.__class__.__name__}\n"

        res += "🛤️ Connections\n"
        for sender, receiver, edge_data in self.graph.edges(data=True):
            sender_socket = edge_data["from_socket"].name
            receiver_socket = edge_data["to_socket"].name
            res += f"  - {sender}.{sender_socket} -> {receiver}.{receiver_socket} ({edge_data['conn_type']})\n"

        return res

    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes the pipeline to a dictionary.

        This is meant to be an intermediate representation, but it can also be used to save a pipeline to file.

        :returns:
            Dictionary with serialized data.
        """
        components = {}
        for name, instance in self.graph.nodes(data="instance"):  # type:ignore
            components[name] = component_to_dict(instance, name)

        connections = []
        for sender, receiver, edge_data in self.graph.edges.data():
            sender_socket = edge_data["from_socket"].name
            receiver_socket = edge_data["to_socket"].name
            connections.append({"sender": f"{sender}.{sender_socket}", "receiver": f"{receiver}.{receiver_socket}"})
        return {
            "metadata": self.metadata,
            "max_runs_per_component": self._max_runs_per_component,
            "components": components,
            "connections": connections,
            "connection_type_validation": self._connection_type_validation,
        }
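
    # Illustrative sketch (not part of this module): the shape of the `to_dict()`
    # output for a small pipeline. The component name and instance below are
    # hypothetical; `Pipeline` is Haystack's concrete subclass of `PipelineBase`.
    #
    #   pipe = Pipeline(metadata={"author": "me"})
    #   pipe.add_component("fetcher", my_fetcher_component)
    #   pipe.to_dict()
    #   # {
    #   #     "metadata": {"author": "me"},
    #   #     "max_runs_per_component": 100,
    #   #     "components": {"fetcher": {...}},
    #   #     "connections": [],
    #   #     "connection_type_validation": True,
    #   # }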

    @classmethod
    def from_dict(
        cls: Type[T], data: Dict[str, Any], callbacks: Optional[DeserializationCallbacks] = None, **kwargs
    ) -> T:
        """
        Deserializes the pipeline from a dictionary.

        :param data:
            Dictionary to deserialize from.
        :param callbacks:
            Callbacks to invoke during deserialization.
        :param kwargs:
            `components`: a dictionary of {name: instance} to reuse instances of components instead of creating new
            ones.
        :returns:
            Deserialized pipeline.
        """
        data_copy = _deepcopy_with_exceptions(data)  # to prevent modification of original data
        metadata = data_copy.get("metadata", {})
        max_runs_per_component = data_copy.get("max_runs_per_component", 100)
        connection_type_validation = data_copy.get("connection_type_validation", True)
        pipe = cls(
            metadata=metadata,
            max_runs_per_component=max_runs_per_component,
            connection_type_validation=connection_type_validation,
        )
        components_to_reuse = kwargs.get("components", {})
        for name, component_data in data_copy.get("components", {}).items():
            if name in components_to_reuse:
                # Reuse an instance
                instance = components_to_reuse[name]
            else:
                if "type" not in component_data:
                    raise PipelineError(f"Missing 'type' in component '{name}'")

                if component_data["type"] not in component.registry:
                    try:
                        # Import the module first...
                        module, _ = component_data["type"].rsplit(".", 1)
                        logger.debug("Trying to import module {module_name}", module_name=module)
                        type_serialization.thread_safe_import(module)
                        # ...then try again
                        if component_data["type"] not in component.registry:
                            raise PipelineError(
                                f"Successfully imported module '{module}' but couldn't find "
                                f"'{component_data['type']}' in the component registry.\n"
                                f"The component might be registered under a different path. "
                                f"Here are the registered components:\n {list(component.registry.keys())}\n"
                            )
                    except (ImportError, PipelineError, ValueError) as e:
                        raise PipelineError(
                            f"Component '{component_data['type']}' (name: '{name}') not imported. Please "
                            f"check that the package is installed and the component path is correct."
                        ) from e

                # Create a new one
                component_class = component.registry[component_data["type"]]

                try:
                    instance = component_from_dict(component_class, component_data, name, callbacks)
                except Exception as e:
                    msg = (
                        f"Couldn't deserialize component '{name}' of class '{component_class.__name__}' "
                        f"with the following data: {str(component_data)}. Possible reasons include "
                        "malformed serialized data, mismatch between the serialized component and the "
                        "loaded one (due to a breaking change, see "
                        "https://github.com/deepset-ai/haystack/releases), etc."
                    )
                    raise DeserializationError(msg) from e
            pipe.add_component(name=name, instance=instance)

        for connection in data.get("connections", []):
            if "sender" not in connection:
                raise PipelineError(f"Missing sender in connection: {connection}")
            if "receiver" not in connection:
                raise PipelineError(f"Missing receiver in connection: {connection}")
            pipe.connect(sender=connection["sender"], receiver=connection["receiver"])

        return pipe
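
    # Illustrative sketch: reusing an already-built component instance during
    # deserialization via the `components` kwarg handled above. Names are hypothetical.
    #
    #   restored = Pipeline.from_dict(data, components={"fetcher": my_fetcher_component})
    #   # "fetcher" is taken from the `components` mapping; every other entry in
    #   # data["components"] is rebuilt through the component registry.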

    def dumps(self, marshaller: Marshaller = DEFAULT_MARSHALLER) -> str:
        """
        Returns the string representation of this pipeline according to the format dictated by the `Marshaller` in use.

        :param marshaller:
            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
        :returns:
            A string representing the pipeline.
        """
        return marshaller.marshal(self.to_dict())

    def dump(self, fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER):
        """
        Writes the string representation of this pipeline to the file-like object passed in the `fp` argument.

        :param fp:
            A file-like object ready to be written to.
        :param marshaller:
            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
        """
        fp.write(marshaller.marshal(self.to_dict()))
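
    # Illustrative sketch: both methods serialize through `to_dict()` and the
    # default `YamlMarshaller`, so the file below contains YAML.
    #
    #   yaml_str = pipe.dumps()
    #   with open("pipeline.yaml", "w") as fp:
    #       pipe.dump(fp)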

    @classmethod
    def loads(
        cls: Type[T],
        data: Union[str, bytes, bytearray],
        marshaller: Marshaller = DEFAULT_MARSHALLER,
        callbacks: Optional[DeserializationCallbacks] = None,
    ) -> T:
        """
        Creates a `Pipeline` object from the string representation passed in the `data` argument.

        :param data:
            The string representation of the pipeline, can be `str`, `bytes` or `bytearray`.
        :param marshaller:
            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
        :param callbacks:
            Callbacks to invoke during deserialization.
        :raises DeserializationError:
            If an error occurs during deserialization.
        :returns:
            A `Pipeline` object.
        """
        try:
            deserialized_data = marshaller.unmarshal(data)
        except Exception as e:
            raise DeserializationError(
                "Error while unmarshalling serialized pipeline data. This is usually "
                "caused by malformed or invalid syntax in the serialized representation."
            ) from e

        return cls.from_dict(deserialized_data, callbacks)

    @classmethod
    def load(
        cls: Type[T],
        fp: TextIO,
        marshaller: Marshaller = DEFAULT_MARSHALLER,
        callbacks: Optional[DeserializationCallbacks] = None,
    ) -> T:
        """
        Creates a `Pipeline` object from a string representation.

        The string representation is read from the file-like object passed in the `fp` argument.

        :param fp:
            A file-like object ready to be read from.
        :param marshaller:
            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
        :param callbacks:
            Callbacks to invoke during deserialization.
        :raises DeserializationError:
            If an error occurs during deserialization.
        :returns:
            A `Pipeline` object.
        """
        return cls.loads(fp.read(), marshaller, callbacks)
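
    # Illustrative sketch: loading back the YAML written by `dump()` above.
    #
    #   with open("pipeline.yaml") as fp:
    #       pipe = Pipeline.load(fp)
    #   # `load(fp)` is equivalent to `loads(fp.read())`.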

    def add_component(self, name: str, instance: Component) -> None:
        """
        Add the given component to the pipeline.

        Components are not connected to anything by default: use `Pipeline.connect()` to connect components together.
        Component names must be unique, but component instances can be reused if needed.

        :param name:
            The name of the component to add.
        :param instance:
            The component instance to add.

        :raises ValueError:
            If a component with the same name already exists.
        :raises PipelineValidationError:
            If the given instance is not a component.
        """
        # Component names are unique
        if name in self.graph.nodes:
            raise ValueError(f"A component named '{name}' already exists in this pipeline: choose another name.")

        # Components can't be named `_debug`
        if name == "_debug":
            raise ValueError("'_debug' is a reserved name for debug output. Choose another name.")

        # Component names can't have "."
        if "." in name:
            raise ValueError(f"{name} is an invalid component name, cannot contain '.' (dot) characters.")

        # Component instances must be components
        if not isinstance(instance, Component):
            raise PipelineValidationError(
                f"'{type(instance)}' doesn't seem to be a component. Is this class decorated with @component?"
            )

        if getattr(instance, "__haystack_added_to_pipeline__", None):
            msg = (
                "Component has already been added in another Pipeline. Components can't be shared between Pipelines. "
                "Create a new instance instead."
            )
            raise PipelineError(msg)

        setattr(instance, "__haystack_added_to_pipeline__", self)

        # Add component to the graph, disconnected
        logger.debug("Adding component '{component_name}' ({component})", component_name=name, component=instance)
        # We're completely sure the fields exist so we ignore the type error
        self.graph.add_node(
            name,
            instance=instance,
            input_sockets=instance.__haystack_input__._sockets_dict,  # type: ignore[attr-defined]
            output_sockets=instance.__haystack_output__._sockets_dict,  # type: ignore[attr-defined]
            visits=0,
        )
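
    # Illustrative sketch of the name rules enforced above. Names are hypothetical.
    #
    #   pipe.add_component("embedder", my_embedder)   # ok
    #   pipe.add_component("embedder", other)         # ValueError: name already used
    #   pipe.add_component("my.embedder", other)      # ValueError: '.' not allowed
    #   pipe.add_component("_debug", other)           # ValueError: reserved name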

    def remove_component(self, name: str) -> Component:
        """
        Removes and returns a component from the pipeline.

        Remove an existing component from the pipeline by providing its name.
        All edges that connect to the component will also be deleted.

        :param name:
            The name of the component to remove.
        :returns:
            The removed Component instance.

        :raises ValueError:
            If there is no component with that name already in the Pipeline.
        """

        # Check that a component with that name is in the Pipeline
        try:
            instance = self.get_component(name)
        except ValueError as exc:
            raise ValueError(
                f"There is no component named '{name}' in the pipeline. The valid component names are: ",
                ", ".join(n for n in self.graph.nodes),
            ) from exc

        # Delete component from the graph, deleting all its connections
        self.graph.remove_node(name)

        # Reset the Component sockets' senders and receivers
        input_sockets = instance.__haystack_input__._sockets_dict  # type: ignore[attr-defined]
        for socket in input_sockets.values():
            socket.senders = []

        output_sockets = instance.__haystack_output__._sockets_dict  # type: ignore[attr-defined]
        for socket in output_sockets.values():
            socket.receivers = []

        # Reset the Component's pipeline reference
        setattr(instance, "__haystack_added_to_pipeline__", None)

        return instance

    def connect(self, sender: str, receiver: str) -> "PipelineBase":  # noqa: PLR0915 PLR0912
        """
        Connects two components together.

        All components to connect must exist in the pipeline.
        If connecting to a component that has several output connections, specify the input and output names as
        'component_name.connection_name'.

        :param sender:
            The component that delivers the value. This can be either just a component name or can be
            in the format `component_name.connection_name` if the component has multiple outputs.
        :param receiver:
            The component that receives the value. This can be either just a component name or can be
            in the format `component_name.connection_name` if the component has multiple inputs.
        :returns:
            The Pipeline instance.

        :raises PipelineConnectError:
            If the two components cannot be connected (for example if one of the components is
            not present in the pipeline, or the connections don't match by type, and so on).
        """
        # Edges may be named explicitly by passing 'node_name.edge_name' to connect().
        sender_component_name, sender_socket_name = parse_connect_string(sender)
        receiver_component_name, receiver_socket_name = parse_connect_string(receiver)

        if sender_component_name == receiver_component_name:
            raise PipelineConnectError("Connecting a Component to itself is not supported.")

        # Get the nodes data.
        try:
            sender_sockets = self.graph.nodes[sender_component_name]["output_sockets"]
        except KeyError as exc:
            raise ValueError(f"Component named {sender_component_name} not found in the pipeline.") from exc
        try:
            receiver_sockets = self.graph.nodes[receiver_component_name]["input_sockets"]
        except KeyError as exc:
            raise ValueError(f"Component named {receiver_component_name} not found in the pipeline.") from exc

        # If the name of either socket is given, get the socket
        sender_socket: Optional[OutputSocket] = None
        if sender_socket_name:
            sender_socket = sender_sockets.get(sender_socket_name)
            if not sender_socket:
                raise PipelineConnectError(
                    f"'{sender}' does not exist. "
                    f"Output connections of {sender_component_name} are: "
                    + ", ".join([f"{name} (type {_type_name(socket.type)})" for name, socket in sender_sockets.items()])
                )

        receiver_socket: Optional[InputSocket] = None
        if receiver_socket_name:
            receiver_socket = receiver_sockets.get(receiver_socket_name)
            if not receiver_socket:
                raise PipelineConnectError(
                    f"'{receiver}' does not exist. "
                    f"Input connections of {receiver_component_name} are: "
                    + ", ".join(
                        [f"{name} (type {_type_name(socket.type)})" for name, socket in receiver_sockets.items()]
                    )
                )

        # Look for a matching connection among the possible ones.
        # Note that if there is more than one possible connection but two sockets match by name, they're paired.
        sender_socket_candidates: List[OutputSocket] = (
            [sender_socket] if sender_socket else list(sender_sockets.values())
        )
        receiver_socket_candidates: List[InputSocket] = (
            [receiver_socket] if receiver_socket else list(receiver_sockets.values())
        )

        # Find all possible connections between these two components
        possible_connections = []
        for sender_sock, receiver_sock in itertools.product(sender_socket_candidates, receiver_socket_candidates):
            if _types_are_compatible(sender_sock.type, receiver_sock.type, self._connection_type_validation):
                possible_connections.append((sender_sock, receiver_sock))

        # We need this status for error messages; since we might need it in multiple places, we calculate it here
        status = _connections_status(
            sender_node=sender_component_name,
            sender_sockets=sender_socket_candidates,
            receiver_node=receiver_component_name,
            receiver_sockets=receiver_socket_candidates,
        )

        if not possible_connections:
            # There's no possible connection between these two components
            if len(sender_socket_candidates) == len(receiver_socket_candidates) == 1:
                msg = (
                    f"Cannot connect '{sender_component_name}.{sender_socket_candidates[0].name}' with "
                    f"'{receiver_component_name}.{receiver_socket_candidates[0].name}': "
                    f"their declared input and output types do not match.\n{status}"
                )
            else:
                msg = (
                    f"Cannot connect '{sender_component_name}' with '{receiver_component_name}': "
                    f"no matching connections available.\n{status}"
                )
            raise PipelineConnectError(msg)

        if len(possible_connections) == 1:
            # There's only one possible connection, use it
            sender_socket = possible_connections[0][0]
            receiver_socket = possible_connections[0][1]

        if len(possible_connections) > 1:
            # There are multiple possible connections, let's try to match them by name
            name_matches = [
                (out_sock, in_sock) for out_sock, in_sock in possible_connections if in_sock.name == out_sock.name
            ]
            if len(name_matches) != 1:
                # There are either no matches or more than one; we can't pick one reliably
                msg = (
                    f"Cannot connect '{sender_component_name}' with "
                    f"'{receiver_component_name}': more than one connection is possible "
                    "between these components. Please specify the connection name, like: "
                    f"pipeline.connect('{sender_component_name}.{possible_connections[0][0].name}', "
                    f"'{receiver_component_name}.{possible_connections[0][1].name}').\n{status}"
                )
                raise PipelineConnectError(msg)

            # Get the only possible match
            sender_socket = name_matches[0][0]
            receiver_socket = name_matches[0][1]

        # Connection must be valid on both sender/receiver sides
        if not sender_socket or not receiver_socket or not sender_component_name or not receiver_component_name:
            if sender_component_name and sender_socket:
                sender_repr = f"{sender_component_name}.{sender_socket.name} ({_type_name(sender_socket.type)})"
            else:
                sender_repr = "input needed"

            if receiver_component_name and receiver_socket:
                receiver_repr = f"({_type_name(receiver_socket.type)}) {receiver_component_name}.{receiver_socket.name}"
            else:
                receiver_repr = "output"
            msg = f"Connection must have both sender and receiver: {sender_repr} -> {receiver_repr}"
            raise PipelineConnectError(msg)

        logger.debug(
            "Connecting '{sender_component}.{sender_socket_name}' to '{receiver_component}.{receiver_socket_name}'",
            sender_component=sender_component_name,
            sender_socket_name=sender_socket.name,
            receiver_component=receiver_component_name,
            receiver_socket_name=receiver_socket.name,
        )

        if receiver_component_name in sender_socket.receivers and sender_component_name in receiver_socket.senders:
            # This is already connected, nothing to do
            return self

        if receiver_socket.senders and not receiver_socket.is_variadic:
            # Only variadic input sockets can receive from multiple senders
            msg = (
                f"Cannot connect '{sender_component_name}.{sender_socket.name}' with "
                f"'{receiver_component_name}.{receiver_socket.name}': "
                f"{receiver_component_name}.{receiver_socket.name} is already connected to {receiver_socket.senders}.\n"
            )
            raise PipelineConnectError(msg)

        # Update the sockets with the new connection
        sender_socket.receivers.append(receiver_component_name)
        receiver_socket.senders.append(sender_component_name)

        # Create the new connection
        self.graph.add_edge(
            sender_component_name,
            receiver_component_name,
            key=f"{sender_socket.name}/{receiver_socket.name}",
            conn_type=_type_name(sender_socket.type),
            from_socket=sender_socket,
            to_socket=receiver_socket,
            mandatory=receiver_socket.is_mandatory,
        )
        return self
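
    # Illustrative sketch: connecting by component name alone succeeds when exactly
    # one type-compatible socket pair exists (or one pair matches by name); with
    # several candidates the socket names must be spelled out. Names are hypothetical.
    #
    #   pipe.connect("retriever", "prompt_builder")                      # unambiguous
    #   pipe.connect("retriever.documents", "prompt_builder.documents")  # explicit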

    def get_component(self, name: str) -> Component:
        """
        Get the component with the specified name from the pipeline.

        :param name:
            The name of the component.
        :returns:
            The instance of that component.

        :raises ValueError:
            If a component with that name is not present in the pipeline.
        """
        try:
            return self.graph.nodes[name]["instance"]
        except KeyError as exc:
            raise ValueError(f"Component named {name} not found in the pipeline.") from exc

    def get_component_name(self, instance: Component) -> str:
        """
        Returns the name of the Component instance if it has been added to this Pipeline or an empty string otherwise.

        :param instance:
            The Component instance to look for.
        :returns:
            The name of the Component instance.
        """
        for name, inst in self.graph.nodes(data="instance"):  # type: ignore # type wrongly defined in networkx
            if inst == instance:
                return name
        return ""

    def inputs(self, include_components_with_connected_inputs: bool = False) -> Dict[str, Dict[str, Any]]:
        """
        Returns a dictionary containing the inputs of a pipeline.

        Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
        the input sockets of that component, including their types and whether they are optional.

        :param include_components_with_connected_inputs:
            If `False`, only components that have disconnected input edges are
            included in the output.
        :returns:
            A dictionary where each key is a pipeline component name and each value is a dictionary of
            input sockets of that component.
        """
        inputs: Dict[str, Dict[str, Any]] = {}
        for component_name, data in find_pipeline_inputs(self.graph, include_components_with_connected_inputs).items():
            sockets_description = {}
            for socket in data:
                sockets_description[socket.name] = {"type": socket.type, "is_mandatory": socket.is_mandatory}
                if not socket.is_mandatory:
                    sockets_description[socket.name]["default_value"] = socket.default_value

            if sockets_description:
                inputs[component_name] = sockets_description
        return inputs

    def outputs(self, include_components_with_connected_outputs: bool = False) -> Dict[str, Dict[str, Any]]:
        """
        Returns a dictionary containing the outputs of a pipeline.

        Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
        the output sockets of that component.

        :param include_components_with_connected_outputs:
            If `False`, only components that have disconnected output edges are
            included in the output.
        :returns:
            A dictionary where each key is a pipeline component name and each value is a dictionary of
            output sockets of that component.
        """
        outputs = {
            comp: {socket.name: {"type": socket.type} for socket in data}
            for comp, data in find_pipeline_outputs(self.graph, include_components_with_connected_outputs).items()
            if data
        }
        return outputs
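
    # Illustrative sketch: inspecting the open sockets of a pipeline. The component
    # and socket names below are hypothetical.
    #
    #   pipe.inputs()
    #   # {"retriever": {"query": {"type": str, "is_mandatory": True}}}
    #   pipe.outputs()
    #   # {"prompt_builder": {"prompt": {"type": str}}}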

    def show(self, server_url: str = "https://mermaid.ink", params: Optional[dict] = None, timeout: int = 30) -> None:
        """
        Display an image representing this `Pipeline` in a Jupyter notebook.

        This function generates a diagram of the `Pipeline` using a Mermaid server and displays it directly in
        the notebook.

        :param server_url:
            The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
            See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
            info on how to set up your own Mermaid server.

        :param params:
            Dictionary of customization parameters to modify the output. Refer to the Mermaid documentation for more
            details. Supported keys:
                - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
                - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
                - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
                - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
                - width: Width of the output image (integer).
                - height: Height of the output image (integer).
                - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
                - fit: Whether to fit the diagram size to the page (PDF only, boolean).
                - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
                - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.

        :param timeout:
            Timeout in seconds for the request to the Mermaid server.

        :raises PipelineDrawingError:
            If the function is called outside of a Jupyter notebook or if there is an issue with rendering.
        """
        if is_in_jupyter():
            from IPython.display import Image, display  # type: ignore

            image_data = _to_mermaid_image(self.graph, server_url=server_url, params=params, timeout=timeout)
            display(Image(image_data))
        else:
            msg = "This method is only supported in Jupyter notebooks. Use Pipeline.draw() to save an image locally."
            raise PipelineDrawingError(msg)

    def draw(
        self, path: Path, server_url: str = "https://mermaid.ink", params: Optional[dict] = None, timeout: int = 30
    ) -> None:
        """
        Save an image representing this `Pipeline` to the specified file path.

        This function generates a diagram of the `Pipeline` using the Mermaid server and saves it to the provided path.

        :param path:
            The file path where the generated image will be saved.
        :param server_url:
            The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
            See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
            info on how to set up your own Mermaid server.
        :param params:
            Dictionary of customization parameters to modify the output. Refer to the Mermaid documentation for more
            details. Supported keys:
                - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
                - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
                - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
                - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
                - width: Width of the output image (integer).
                - height: Height of the output image (integer).
                - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
                - fit: Whether to fit the diagram size to the page (PDF only, boolean).
                - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
                - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.

        :param timeout:
            Timeout in seconds for the request to the Mermaid server.

        :raises PipelineDrawingError:
            If there is an issue with rendering or saving the image.
        """
        # Before drawing we edit the graph a bit; to avoid modifying the original
        # that is used for running the pipeline, we copy it.
        image_data = _to_mermaid_image(self.graph, server_url=server_url, params=params, timeout=timeout)
        Path(path).write_bytes(image_data)

    def walk(self) -> Iterator[Tuple[str, Component]]:
        """
        Visits each component in the pipeline exactly once and yields its name and instance.

        No guarantees are provided on the visiting order.

        :returns:
            An iterator of tuples of component name and component instance.
        """
        for component_name, instance in self.graph.nodes(data="instance"):  # type: ignore # type is wrong in networkx
            yield component_name, instance

    def warm_up(self):
        """
        Make sure all nodes are warm.

        It's the node's responsibility to make sure this method can be called at every `Pipeline.run()`
        without re-initializing everything.
        """
        for node in self.graph.nodes:
            if hasattr(self.graph.nodes[node]["instance"], "warm_up"):
                logger.info("Warming up component {node}...", node=node)
                self.graph.nodes[node]["instance"].warm_up()

    @staticmethod
    def _create_component_span(
        component_name: str, instance: Component, inputs: Dict[str, Any], parent_span: Optional[tracing.Span] = None
    ):
        return tracing.tracer.trace(
            "haystack.component.run",
            tags={
                "haystack.component.name": component_name,
                "haystack.component.type": instance.__class__.__name__,
                "haystack.component.input_types": {k: type(v).__name__ for k, v in inputs.items()},
                "haystack.component.input_spec": {
                    key: {
                        "type": (value.type.__name__ if isinstance(value.type, type) else str(value.type)),
                        "senders": value.senders,
                    }
                    for key, value in instance.__haystack_input__._sockets_dict.items()  # type: ignore
                },
                "haystack.component.output_spec": {
                    key: {
                        "type": (value.type.__name__ if isinstance(value.type, type) else str(value.type)),
                        "receivers": value.receivers,
                    }
                    for key, value in instance.__haystack_output__._sockets_dict.items()  # type: ignore
                },
            },
            parent_span=parent_span,
        )

    def _validate_input(self, data: Dict[str, Any]):
        """
        Validates pipeline input data.

        Validates that:
        * Each Component name actually exists in the Pipeline
        * Each Component is not missing any input
        * Each Component has only one input per input socket, if not variadic
        * Each Component doesn't receive inputs that are already sent by another Component

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name.

        :raises ValueError:
            If inputs are invalid according to the above.
        """
        for component_name, component_inputs in data.items():
            if component_name not in self.graph.nodes:
                raise ValueError(f"Component named {component_name} not found in the pipeline.")
            instance = self.graph.nodes[component_name]["instance"]
            for socket_name, socket in instance.__haystack_input__._sockets_dict.items():
                if socket.senders == [] and socket.is_mandatory and socket_name not in component_inputs:
                    raise ValueError(f"Missing input for component {component_name}: {socket_name}")
            for input_name in component_inputs.keys():
                if input_name not in instance.__haystack_input__._sockets_dict:
                    raise ValueError(f"Input {input_name} not found in component {component_name}.")

        for component_name in self.graph.nodes:
            instance = self.graph.nodes[component_name]["instance"]
            for socket_name, socket in instance.__haystack_input__._sockets_dict.items():
                component_inputs = data.get(component_name, {})
                if socket.senders == [] and socket.is_mandatory and socket_name not in component_inputs:
                    raise ValueError(f"Missing input for component {component_name}: {socket_name}")
                if socket.senders and socket_name in component_inputs and not socket.is_variadic:
                    raise ValueError(
                        f"Input {socket_name} for component {component_name} is already sent by {socket.senders}."
                    )

    def _prepare_component_input_data(self, data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
        """
        Prepares input data for pipeline components.

        Organizes input data for pipeline components and identifies any inputs that are not matched to any
        component's input slots. Deep-copies data items to avoid sharing mutables across multiple components.

        This method processes a flat dictionary of input data, where each key-value pair represents an input name
        and its corresponding value. It distributes these inputs to the appropriate pipeline components based on
        their input requirements. Inputs that don't match any component's input slots are classified as unresolved.

        :param data:
            A dictionary potentially having input names as keys and input values as values.

        :returns:
            A dictionary mapping component names to their respective matched inputs.
        """
        # Check whether the data is a nested dictionary of component inputs where each key is a component name
        # and each value is a dictionary of input parameters for that component
        is_nested_component_input = all(isinstance(value, dict) for value in data.values())
        if not is_nested_component_input:
            # Flat input, a dict where keys are input names and values are the corresponding values.
            # We need to convert it to a nested dictionary of component inputs and then run the pipeline
            # just like in the previous case.
            pipeline_input_data: Dict[str, Dict[str, Any]] = defaultdict(dict)
            unresolved_kwargs = {}

            # Retrieve the input slots for each component in the pipeline
            available_inputs: Dict[str, Dict[str, Any]] = self.inputs()

            # Go through all provided inputs to distribute them to the appropriate component inputs
            for input_name, input_value in data.items():
                resolved_at_least_once = False

                # Check each component to see if it has a slot for the current kwarg
                for component_name, component_inputs in available_inputs.items():
                    if input_name in component_inputs:
                        # If a match is found, add the kwarg to the component's input data
                        pipeline_input_data[component_name][input_name] = input_value
                        resolved_at_least_once = True

                if not resolved_at_least_once:
                    unresolved_kwargs[input_name] = input_value

            if unresolved_kwargs:
                logger.warning(
                    "Inputs {input_keys} were not matched to any component inputs, please check your run parameters.",
                    input_keys=list(unresolved_kwargs.keys()),
                )

            data = dict(pipeline_input_data)

        # Deepcopying the inputs prevents the Pipeline run logic from being altered unexpectedly
        # when the same input reference is passed to multiple components.
        for component_name, component_inputs in data.items():
            data[component_name] = {k: _deepcopy_with_exceptions(v) for k, v in component_inputs.items()}

        return data
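
    # Illustrative sketch: how a flat input dict is distributed above. Component
    # and socket names are hypothetical.
    #
    #   self._prepare_component_input_data({"query": "Who lives in Paris?"})
    #   # -> {"retriever": {"query": "Who lives in Paris?"},
    #   #     "prompt_builder": {"query": "Who lives in Paris?"}}
    #   # An input matching no component socket is logged as a warning and dropped.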

    @classmethod
    def from_template(
        cls, predefined_pipeline: PredefinedPipeline, template_params: Optional[Dict[str, Any]] = None
    ) -> "PipelineBase":
        """
        Create a Pipeline from a predefined template. See `PredefinedPipeline` for available options.

        :param predefined_pipeline:
            The predefined pipeline to use.
        :param template_params:
            An optional dictionary of parameters to use when rendering the pipeline template.
        :returns:
            An instance of `Pipeline`.
        """
        tpl = PipelineTemplate.from_predefined(predefined_pipeline)
        # If tpl.render() fails, we let the original error bubble up
        rendered = tpl.render(template_params)

        # If there was a problem with the rendered version of the
        # template, we add it to the error stack for debugging
        try:
            return cls.loads(rendered)
        except Exception as e:
            msg = f"Error unmarshalling pipeline: {e}\n"
            msg += f"Source:\n{rendered}"
            raise PipelineUnmarshalError(msg)
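
    # Illustrative sketch: the template member name below is hypothetical; see
    # `PredefinedPipeline` for the actual options.
    #
    #   pipe = Pipeline.from_template(PredefinedPipeline.INDEXING)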

    def _find_receivers_from(self, component_name: str) -> List[Tuple[str, OutputSocket, InputSocket]]:
        """
        Utility function to find all Components that receive input from `component_name`.

        :param component_name:
            Name of the sender Component

        :returns:
            List of tuples containing the receiver Component name and the sender OutputSocket
            and receiver InputSocket instances
        """
        res = []
        for _, receiver_name, connection in self.graph.edges(nbunch=component_name, data=True):
            sender_socket: OutputSocket = connection["from_socket"]
            receiver_socket: InputSocket = connection["to_socket"]
            res.append((receiver_name, sender_socket, receiver_socket))
        return res

    @staticmethod
    def _convert_to_internal_format(pipeline_inputs: Dict[str, Any]) -> Dict[str, Dict[str, List]]:
        """
        Converts the inputs to the pipeline to the format that is needed for the internal `Pipeline.run` logic.

        Example Input:
        {'prompt_builder': {'question': 'Who lives in Paris?'}, 'retriever': {'query': 'Who lives in Paris?'}}
        Example Output:
        {'prompt_builder': {'question': [{'sender': None, 'value': 'Who lives in Paris?'}]},
         'retriever': {'query': [{'sender': None, 'value': 'Who lives in Paris?'}]}}

        :param pipeline_inputs: Inputs to the pipeline.
        :returns: Converted inputs that can be used by the internal `Pipeline.run` logic.
        """
        inputs: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
        for component_name, socket_dict in pipeline_inputs.items():
            inputs[component_name] = {}
            for socket_name, value in socket_dict.items():
                inputs[component_name][socket_name] = [{"sender": None, "value": value}]

        return inputs

    @staticmethod
    def _consume_component_inputs(component_name: str, component: Dict, inputs: Dict) -> Dict[str, Any]:
        """
        Extracts the inputs needed to run the component and removes them from the global inputs state.

        :param component_name: The name of a component.
        :param component: Component with component metadata.
        :param inputs: Global inputs state.
        :returns: The inputs for the component.
        """
        component_inputs = inputs.get(component_name, {})
        consumed_inputs = {}
        greedy_inputs_to_remove = set()
        for socket_name, socket in component["input_sockets"].items():
            socket_inputs = component_inputs.get(socket_name, [])
            socket_inputs = [sock["value"] for sock in socket_inputs if sock["value"] is not _NO_OUTPUT_PRODUCED]
            if socket_inputs:
                if not socket.is_variadic:
                    # We only care about the first input provided to the socket.
                    consumed_inputs[socket_name] = socket_inputs[0]
                elif socket.is_greedy:
                    # We need to keep track of greedy inputs because we always remove them, even if they come from
                    # outside the pipeline. Otherwise, a greedy input from the user would trigger a pipeline to run
                    # indefinitely.
                    greedy_inputs_to_remove.add(socket_name)
                    consumed_inputs[socket_name] = [socket_inputs[0]]
                elif is_socket_lazy_variadic(socket):
                    # We use all inputs provided to the socket on a lazy variadic socket.
                    consumed_inputs[socket_name] = socket_inputs

        # We prune all inputs except for those that were provided from outside the pipeline (e.g. user inputs).
        pruned_inputs = {
            socket_name: [
                sock for sock in socket if sock["sender"] is None and socket_name not in greedy_inputs_to_remove
            ]
            for socket_name, socket in component_inputs.items()
        }
        pruned_inputs = {socket_name: socket for socket_name, socket in pruned_inputs.items() if len(socket) > 0}

        inputs[component_name] = pruned_inputs

        return consumed_inputs

    def _fill_queue(
        self, component_names: List[str], inputs: Dict[str, Any], component_visits: Dict[str, int]
    ) -> FIFOPriorityQueue:
        """
        Calculates the execution priority for each component and inserts it into the priority queue.

        :param component_names: Names of the components to put into the queue.
        :param inputs: Inputs to the components.
        :param component_visits: Current state of component visits.
        :returns: A prioritized queue of component names.
        """
        priority_queue = FIFOPriorityQueue()
        for component_name in component_names:
            component = self._get_component_with_graph_metadata_and_visits(
                component_name, component_visits[component_name]
            )
            priority = self._calculate_priority(component, inputs.get(component_name, {}))
            priority_queue.push(component_name, priority)

        return priority_queue

    @staticmethod
    def _calculate_priority(component: Dict, inputs: Dict) -> ComponentPriority:
        """
        Calculates the execution priority for a component depending on the component's inputs.

        :param component: Component metadata and component instance.
        :param inputs: Inputs to the component.
        :returns: Priority value for the component.
        """
        if not can_component_run(component, inputs):
            return ComponentPriority.BLOCKED
        elif is_any_greedy_socket_ready(component, inputs) and are_all_sockets_ready(component, inputs):
            return ComponentPriority.HIGHEST
        elif all_predecessors_executed(component, inputs):
            return ComponentPriority.READY
        elif are_all_lazy_variadic_sockets_resolved(component, inputs):
            return ComponentPriority.DEFER
        else:
            return ComponentPriority.DEFER_LAST
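
    # Reading of the priority order decided above (a lower value runs first):
    #   HIGHEST    - a greedy variadic socket is ready and all sockets are ready
    #   READY      - all predecessors have executed
    #   DEFER      - all lazy variadic sockets are resolved
    #   DEFER_LAST - runnable, but some lazy variadic input may still arrive
    #   BLOCKED    - the component cannot run with its current inputs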
1✔
1051

1052
    def _get_component_with_graph_metadata_and_visits(self, component_name: str, visits: int) -> Dict[str, Any]:
1✔
1053
        """
1054
        Returns the component instance alongside input/output-socket metadata from the graph and adds current visits.
1055

1056
        We can't store visits in the pipeline graph because this would prevent reentrance / thread-safe execution.
1057

1058
        :param component_name: The name of the component.
1059
        :param visits: Number of visits for the component.
1060
        :returns: Dict including component instance, input/output-sockets and visits.
1061
        """
1062
        comp_dict = self.graph.nodes[component_name]
1✔
1063
        comp_dict = {**comp_dict, "visits": visits}
1✔
1064
        return comp_dict
1✔
1065

    def _get_next_runnable_component(
        self, priority_queue: FIFOPriorityQueue, component_visits: Dict[str, int]
    ) -> Union[Tuple[ComponentPriority, str, Dict[str, Any]], None]:
        """
        Returns the next runnable component alongside its metadata from the priority queue.

        :param priority_queue: Priority queue of component names.
        :param component_visits: Current state of component visits.
        :returns: The priority, the component name, and the component's metadata,
            or None if no component in the queue can run.
        :raises: PipelineMaxComponentRuns if the next runnable component has exceeded the maximum number of runs.
        """
        priority_and_component_name: Union[Tuple[ComponentPriority, str], None] = (
            None if (item := priority_queue.get()) is None else (ComponentPriority(item[0]), str(item[1]))
        )

        if priority_and_component_name is not None and priority_and_component_name[0] != ComponentPriority.BLOCKED:
            priority, component_name = priority_and_component_name
            component = self._get_component_with_graph_metadata_and_visits(
                component_name, component_visits[component_name]
            )
            if component["visits"] > self._max_runs_per_component:
                msg = f"Maximum run count {self._max_runs_per_component} reached for component '{component_name}'"
                raise PipelineMaxComponentRuns(msg)

            return priority, component_name, component

        return None

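    # Illustrative use in a scheduling loop (a sketch, not the engine's actual
    # run loop; `queue` and `visits` are hypothetical locals):
    #
    #     while (candidate := self._get_next_runnable_component(queue, visits)) is not None:
    #         priority, name, comp = candidate
    #         ...  # run the component, distribute its outputs, then rebuild the queue
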
    @staticmethod
    def _add_missing_input_defaults(component_inputs: Dict[str, Any], component_input_sockets: Dict[str, InputSocket]):
        """
        Updates the inputs with the default values for the inputs that are missing.

        :param component_inputs: Inputs for the component.
        :param component_input_sockets: Input sockets of the component.
        :returns: The updated inputs, mutated in place and returned for convenience.
        """
        for name, socket in component_input_sockets.items():
            if not socket.is_mandatory and name not in component_inputs:
                if socket.is_variadic:
                    component_inputs[name] = [socket.default_value]
                else:
                    component_inputs[name] = socket.default_value

        return component_inputs

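    # Illustrative sketch (comments only; the socket is hypothetical): a missing
    # optional input receives its default, and a variadic socket wraps the
    # default in a list so variadic values stay iterable:
    #
    #     sockets = {"top_k": InputSocket(name="top_k", type=int, default_value=10)}
    #     PipelineBase._add_missing_input_defaults({}, sockets)  # -> {"top_k": 10}
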
    def _tiebreak_waiting_components(
        self,
        component_name: str,
        priority: ComponentPriority,
        priority_queue: FIFOPriorityQueue,
        topological_sort: Union[Dict[str, int], None],
    ):
        """
        Decides which component to run when multiple components are waiting for inputs with the same priority.

        :param component_name: The name of the component.
        :param priority: Priority of the component.
        :param priority_queue: Priority queue of component names.
        :param topological_sort: Cached topological sort of all components in the pipeline.
        :returns: The name of the component that should run first and the (possibly newly computed) topological sort.
        """
        components_with_same_priority = [component_name]

        while len(priority_queue) > 0:
            next_priority, next_component_name = priority_queue.peek()
            if next_priority == priority:
                priority_queue.pop()  # actually remove the component from the queue
                components_with_same_priority.append(next_component_name)
            else:
                break

        if len(components_with_same_priority) > 1:
            if topological_sort is None:
                if networkx.is_directed_acyclic_graph(self.graph):
                    topological_sort = networkx.lexicographical_topological_sort(self.graph)
                    topological_sort = {node: idx for idx, node in enumerate(topological_sort)}
                else:
                    # For cyclic graphs, sort the condensation (the DAG of strongly connected
                    # components) and rank each component by its SCC's position.
                    condensed = networkx.condensation(self.graph)
                    condensed_sorted = {node: idx for idx, node in enumerate(networkx.topological_sort(condensed))}
                    topological_sort = {
                        component_name: condensed_sorted[node]
                        for component_name, node in condensed.graph["mapping"].items()
                    }

            components_with_same_priority = sorted(
                components_with_same_priority, key=lambda comp_name: (topological_sort[comp_name], comp_name.lower())
            )

            component_name = components_with_same_priority[0]

        return component_name, topological_sort

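    # Illustrative sketch (comments only; names are hypothetical): if "classifier"
    # and "summarizer" both wait at DEFER priority, the one that comes first in the
    # graph's topological order runs first; lowercased names break any remaining tie:
    #
    #     name, cached_sort = self._tiebreak_waiting_components(
    #         "summarizer", ComponentPriority.DEFER, queue, topological_sort=None
    #     )
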
    @staticmethod
    def _write_component_outputs(
        component_name: str,
        component_outputs: Dict[str, Any],
        inputs: Dict[str, Any],
        receivers: List[Tuple],
        include_outputs_from: Set[str],
    ) -> Dict[str, Any]:
        """
        Distributes the outputs of a component to the input sockets that it is connected to.

        :param component_name: The name of the component.
        :param component_outputs: The outputs of the component.
        :param inputs: The current global input state.
        :param receivers: List of (receiver name, sender socket, receiver socket) tuples describing the connections
            that receive inputs from this component.
        :param include_outputs_from: Set of component names whose outputs should always be returned by the pipeline.
        :returns: The component's outputs, pruned of any values that were consumed by receiver sockets.
        """
        for receiver_name, sender_socket, receiver_socket in receivers:
            # We either get the value that was produced by the sender or we use the _NO_OUTPUT_PRODUCED sentinel
            # to indicate that the sender did not produce an output for this socket.
            # This allows us to track if a predecessor already ran but did not produce an output.
            value = component_outputs.get(sender_socket.name, _NO_OUTPUT_PRODUCED)

            if receiver_name not in inputs:
                inputs[receiver_name] = {}

            if is_socket_lazy_variadic(receiver_socket):
                # If the receiver socket is lazy variadic, we append the new input.
                # Lazy variadic sockets can collect multiple inputs.
                _write_to_lazy_variadic_socket(
                    inputs=inputs,
                    receiver_name=receiver_name,
                    receiver_socket_name=receiver_socket.name,
                    component_name=component_name,
                    value=value,
                )
            else:
                # If the receiver socket is not lazy variadic, it is greedy variadic or non-variadic.
                # We overwrite with the new input if it's not _NO_OUTPUT_PRODUCED or if the current value is None.
                _write_to_standard_socket(
                    inputs=inputs,
                    receiver_name=receiver_name,
                    receiver_socket_name=receiver_socket.name,
                    component_name=component_name,
                    value=value,
                )

        # If we want to include all outputs from this component in the final outputs, we don't need to prune any
        # consumed outputs.
        if component_name in include_outputs_from:
            return component_outputs

        # We prune outputs that were consumed by any receiving sockets.
        # All remaining outputs will be added to the final outputs of the pipeline.
        consumed_outputs = {sender_socket.name for _, sender_socket, __ in receivers}
        pruned_outputs = {key: value for key, value in component_outputs.items() if key not in consumed_outputs}

        return pruned_outputs

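    # Illustrative sketch (comments only; the connection is hypothetical): if
    # "retriever" produced {"documents": docs, "debug": meta} and only "documents"
    # is connected to a receiver, "documents" is written into the global input
    # state and pruned, while "debug" is returned and surfaces in the pipeline's
    # final outputs:
    #
    #     leftover = PipelineBase._write_component_outputs(
    #         component_name="retriever",
    #         component_outputs={"documents": docs, "debug": meta},
    #         inputs=inputs,
    #         receivers=receivers,  # [(receiver_name, sender_socket, receiver_socket), ...]
    #         include_outputs_from=set(),
    #     )
    #     # leftover == {"debug": meta}
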
    @staticmethod
    def _is_queue_stale(priority_queue: FIFOPriorityQueue) -> bool:
        """
        Checks if the priority queue needs to be recomputed because the priorities might have changed.

        :param priority_queue: Priority queue of component names.
        :returns: True if the queue is empty or if its highest-priority component is not ready to run.
        """
        return len(priority_queue) == 0 or priority_queue.peek()[0] > ComponentPriority.READY

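    # Illustrative sketch (comments only): a queue whose best candidate is merely
    # DEFER is stale, because newly written inputs may have promoted a component
    # to READY or HIGHEST since the priorities were computed:
    #
    #     PipelineBase._is_queue_stale(queue)  # True if empty or best priority > READY
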
    @staticmethod
    def validate_pipeline(priority_queue: FIFOPriorityQueue) -> None:
        """
        Validate the pipeline to check if it is blocked or has no valid entry point.

        :param priority_queue: Priority queue of component names.
        :raises PipelineComponentsBlockedError:
            If the pipeline is blocked or has no valid entry point.
        """
        if len(priority_queue) == 0:
            return

        candidate = priority_queue.peek()
        if candidate is not None and candidate[0] == ComponentPriority.BLOCKED:
            raise PipelineComponentsBlockedError()
1241

1242

1243
def _connections_status(
1✔
1244
    sender_node: str, receiver_node: str, sender_sockets: List[OutputSocket], receiver_sockets: List[InputSocket]
1245
) -> str:
1246
    """
1247
    Lists the status of the sockets, for error messages.
1248
    """
1249
    sender_sockets_entries = []
1✔
1250
    for sender_socket in sender_sockets:
1✔
1251
        sender_sockets_entries.append(f" - {sender_socket.name}: {_type_name(sender_socket.type)}")
1✔
1252
    sender_sockets_list = "\n".join(sender_sockets_entries)
1✔
1253

1254
    receiver_sockets_entries = []
1✔
1255
    for receiver_socket in receiver_sockets:
1✔
1256
        if receiver_socket.senders:
1✔
1257
            sender_status = f"sent by {','.join(receiver_socket.senders)}"
1✔
1258
        else:
1259
            sender_status = "available"
1✔
1260
        receiver_sockets_entries.append(
1✔
1261
            f" - {receiver_socket.name}: {_type_name(receiver_socket.type)} ({sender_status})"
1262
        )
1263
    receiver_sockets_list = "\n".join(receiver_sockets_entries)
1✔
1264

1265
    return f"'{sender_node}':\n{sender_sockets_list}\n'{receiver_node}':\n{receiver_sockets_list}"
1✔
1266

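# Example of the listing this helper produces (hypothetical nodes and sockets):
#
#     'fetcher':
#      - content: str
#     'converter':
#      - sources: List[ByteStream] (available)
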

# Utility functions for writing to sockets


def _write_to_lazy_variadic_socket(
    inputs: Dict[str, Any], receiver_name: str, receiver_socket_name: str, component_name: str, value: Any
) -> None:
    """
    Write to a lazy variadic socket.

    Mutates inputs in place.
    """
    if not inputs[receiver_name].get(receiver_socket_name):
        # Initialize the socket's value list on first write (a missing or empty entry is replaced).
        inputs[receiver_name][receiver_socket_name] = []

    inputs[receiver_name][receiver_socket_name].append({"sender": component_name, "value": value})

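# Illustrative sketch (comments only): two senders writing to the same lazy
# variadic socket accumulate instead of overwriting each other:
#
#     inputs = {"joiner": {}}
#     _write_to_lazy_variadic_socket(inputs, "joiner", "values", "first", 1)
#     _write_to_lazy_variadic_socket(inputs, "joiner", "values", "second", 2)
#     # inputs["joiner"]["values"] == [{"sender": "first", "value": 1},
#     #                                {"sender": "second", "value": 2}]
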

def _write_to_standard_socket(
    inputs: Dict[str, Any], receiver_name: str, receiver_socket_name: str, component_name: str, value: Any
) -> None:
    """
    Write to a greedy variadic or non-variadic socket.

    Mutates inputs in place.
    """
    current_value = inputs[receiver_name].get(receiver_socket_name)

    # Only overwrite if there's no existing value, or we have a new value to provide.
    if current_value is None or value is not _NO_OUTPUT_PRODUCED:
        inputs[receiver_name][receiver_socket_name] = [{"sender": component_name, "value": value}]
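
# Illustrative sketch (comments only): a real value replaces whatever is stored,
# while _NO_OUTPUT_PRODUCED only fills a socket that is still empty, so it never
# clobbers an actual input:
#
#     inputs = {"ranker": {}}
#     _write_to_standard_socket(inputs, "ranker", "query", "router", "hello")
#     _write_to_standard_socket(inputs, "ranker", "query", "router", _NO_OUTPUT_PRODUCED)
#     # inputs["ranker"]["query"] == [{"sender": "router", "value": "hello"}]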