
deepset-ai / haystack, build 13432090183
20 Feb 2025 09:30AM UTC
Coverage: 91.155% (+0.007%) from 91.148%
Pull Request #8875: feat: Add Type Validation parameter for Pipeline Connections (merge 013fa6d37 into 8cafcddb0)
9430 of 10345 relevant lines covered (91.16%), 0.91 hits per line

Source file: haystack/core/pipeline/base.py (94.66% covered)
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import itertools
from collections import defaultdict
from copy import deepcopy
from datetime import datetime
from enum import IntEnum
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, TextIO, Tuple, Type, TypeVar, Union

import networkx  # type:ignore

from haystack import logging
from haystack.core.component import Component, InputSocket, OutputSocket, component
from haystack.core.errors import (
    DeserializationError,
    PipelineConnectError,
    PipelineDrawingError,
    PipelineError,
    PipelineMaxComponentRuns,
    PipelineRuntimeError,
    PipelineUnmarshalError,
    PipelineValidationError,
)
from haystack.core.pipeline.component_checks import (
    _NO_OUTPUT_PRODUCED,
    all_predecessors_executed,
    are_all_lazy_variadic_sockets_resolved,
    are_all_sockets_ready,
    can_component_run,
    is_any_greedy_socket_ready,
    is_socket_lazy_variadic,
)
from haystack.core.pipeline.utils import FIFOPriorityQueue, parse_connect_string
from haystack.core.serialization import DeserializationCallbacks, component_from_dict, component_to_dict
from haystack.core.type_utils import _type_name, _types_are_compatible
from haystack.marshal import Marshaller, YamlMarshaller
from haystack.utils import is_in_jupyter, type_serialization

from .descriptions import find_pipeline_inputs, find_pipeline_outputs
from .draw import _to_mermaid_image
from .template import PipelineTemplate, PredefinedPipeline

DEFAULT_MARSHALLER = YamlMarshaller()

# We use a generic type to annotate the return value of class methods,
# so that static analyzers won't be confused when derived classes
# use those methods.
T = TypeVar("T", bound="PipelineBase")

logger = logging.getLogger(__name__)


class ComponentPriority(IntEnum):
    HIGHEST = 1
    READY = 2
    DEFER = 3
    DEFER_LAST = 4
    BLOCKED = 5


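# Ordering note: assuming the min-first semantics of FIFOPriorityQueue (lower values
# are popped first), HIGHEST runs before READY, and BLOCKED components never run.

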
class PipelineBase:
    """
    Components orchestration engine.

    Builds a graph of components and orchestrates their execution according to the execution graph.
    """

    def __init__(
        self,
        metadata: Optional[Dict[str, Any]] = None,
        max_runs_per_component: int = 100,
        connection_type_validation: bool = True,
    ):
        """
        Creates the Pipeline.

        :param metadata:
            Arbitrary dictionary to store metadata about this `Pipeline`. Make sure all the values contained in
            this dictionary can be serialized and deserialized if you wish to save this `Pipeline` to file.
        :param max_runs_per_component:
            How many times the `Pipeline` can run the same Component.
            If this limit is reached, a `PipelineMaxComponentRuns` exception is raised.
            If not set, defaults to 100 runs per Component.
        :param connection_type_validation:
            Whether the pipeline will validate the types of the connections. Defaults to `True`.
        """
        self._telemetry_runs = 0
        self._last_telemetry_sent: Optional[datetime] = None
        self.metadata = metadata or {}
        self.graph = networkx.MultiDiGraph()
        self._max_runs_per_component = max_runs_per_component
        self._connection_type_validation = connection_type_validation

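    # Illustrative construction sketch (`Pipeline` is the concrete subclass exported
    # by `haystack`; the metadata values are made up):
    #
    #   from haystack import Pipeline
    #
    #   pipe = Pipeline(metadata={"author": "me"}, connection_type_validation=False)
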
    def __eq__(self, other) -> bool:
        """
        Pipeline equality is defined by their type and the equality of their serialized form.

        Pipelines of the same type share every metadata entry, node, and edge, but they're not required to use
        the same node instances: this allows a pipeline that was saved and then loaded back to be equal to itself.
        """
        if not isinstance(self, type(other)):
            return False
        return self.to_dict() == other.to_dict()

    def __repr__(self) -> str:
        """
        Returns a text representation of the Pipeline.
        """
        res = f"{object.__repr__(self)}\n"
        if self.metadata:
            res += "🧱 Metadata\n"
            for k, v in self.metadata.items():
                res += f"  - {k}: {v}\n"

        res += "🚅 Components\n"
        for name, instance in self.graph.nodes(data="instance"):  # type: ignore # type wrongly defined in networkx
            res += f"  - {name}: {instance.__class__.__name__}\n"

        res += "🛤️ Connections\n"
        for sender, receiver, edge_data in self.graph.edges(data=True):
            sender_socket = edge_data["from_socket"].name
            receiver_socket = edge_data["to_socket"].name
            res += f"  - {sender}.{sender_socket} -> {receiver}.{receiver_socket} ({edge_data['conn_type']})\n"

        return res

    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes the pipeline to a dictionary.

        This is meant to be an intermediate representation, but it can also be used to save a pipeline to file.

        :returns:
            Dictionary with serialized data.
        """
        components = {}
        for name, instance in self.graph.nodes(data="instance"):  # type:ignore
            components[name] = component_to_dict(instance, name)

        connections = []
        for sender, receiver, edge_data in self.graph.edges.data():
            sender_socket = edge_data["from_socket"].name
            receiver_socket = edge_data["to_socket"].name
            connections.append(
                {
                    "sender": f"{sender}.{sender_socket}",
                    "receiver": f"{receiver}.{receiver_socket}",
                    "connection_type_validation": edge_data["connection_type_validation"],
                }
            )
        return {
            "metadata": self.metadata,
            "max_runs_per_component": self._max_runs_per_component,
            "components": components,
            "connections": connections,
        }

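    # Shape sketch of the serialized dictionary (the component entries and the
    # "fetcher"/"converter" names are illustrative):
    #
    #   {
    #       "metadata": {},
    #       "max_runs_per_component": 100,
    #       "components": {"fetcher": {...}, "converter": {...}},
    #       "connections": [
    #           {"sender": "fetcher.streams", "receiver": "converter.sources",
    #            "connection_type_validation": None},
    #       ],
    #   }
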
    @classmethod
    def from_dict(
        cls: Type[T], data: Dict[str, Any], callbacks: Optional[DeserializationCallbacks] = None, **kwargs
    ) -> T:
        """
        Deserializes the pipeline from a dictionary.

        :param data:
            Dictionary to deserialize from.
        :param callbacks:
            Callbacks to invoke during deserialization.
        :param kwargs:
            `components`: a dictionary of {name: instance} to reuse instances of components instead of creating new
            ones.
        :returns:
            Deserialized pipeline.
        """
        data_copy = deepcopy(data)  # to prevent modification of original data
        metadata = data_copy.get("metadata", {})
        max_runs_per_component = data_copy.get("max_runs_per_component", 100)
        pipe = cls(metadata=metadata, max_runs_per_component=max_runs_per_component)
        components_to_reuse = kwargs.get("components", {})
        for name, component_data in data_copy.get("components", {}).items():
            if name in components_to_reuse:
                # Reuse an instance
                instance = components_to_reuse[name]
            else:
                if "type" not in component_data:
                    raise PipelineError(f"Missing 'type' in component '{name}'")

                if component_data["type"] not in component.registry:
                    try:
                        # Import the module first...
                        module, _ = component_data["type"].rsplit(".", 1)
                        logger.debug("Trying to import module {module_name}", module_name=module)
                        type_serialization.thread_safe_import(module)
                        # ...then try again
                        if component_data["type"] not in component.registry:
                            raise PipelineError(
                                f"Successfully imported module {module} but can't find it in the component registry. "
                                "This is unexpected and most likely a bug."
                            )
                    except (ImportError, PipelineError, ValueError) as e:
                        raise PipelineError(
                            f"Component '{component_data['type']}' (name: '{name}') not imported."
                        ) from e

                # Create a new one
                component_class = component.registry[component_data["type"]]

                try:
                    instance = component_from_dict(component_class, component_data, name, callbacks)
                except Exception as e:
                    msg = (
                        f"Couldn't deserialize component '{name}' of class '{component_class.__name__}' "
                        f"with the following data: {str(component_data)}. Possible reasons include "
                        "malformed serialized data, mismatch between the serialized component and the "
                        "loaded one (due to a breaking change, see "
                        "https://github.com/deepset-ai/haystack/releases), etc."
                    )
                    raise DeserializationError(msg) from e
            pipe.add_component(name=name, instance=instance)

        for connection in data.get("connections", []):
            if "sender" not in connection:
                raise PipelineError(f"Missing sender in connection: {connection}")
            if "receiver" not in connection:
                raise PipelineError(f"Missing receiver in connection: {connection}")
            pipe.connect(
                sender=connection["sender"],
                receiver=connection["receiver"],
                connection_type_validation=connection.get("connection_type_validation"),
            )

        return pipe

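    # Reuse sketch: pass existing component instances via the `components` kwarg
    # (the "retriever" name and `existing_retriever` object are illustrative):
    #
    #   pipe = Pipeline.from_dict(data, components={"retriever": existing_retriever})
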
    def dumps(self, marshaller: Marshaller = DEFAULT_MARSHALLER) -> str:
        """
        Returns the string representation of this pipeline according to the format dictated by the `Marshaller` in use.

        :param marshaller:
            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
        :returns:
            A string representing the pipeline.
        """
        return marshaller.marshal(self.to_dict())

    def dump(self, fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER):
        """
        Writes the string representation of this pipeline to the file-like object passed in the `fp` argument.

        :param fp:
            A file-like object ready to be written to.
        :param marshaller:
            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
        """
        fp.write(marshaller.marshal(self.to_dict()))

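    # Round-trip sketch with the default YAML marshaller (the "pipeline.yaml" path is
    # illustrative):
    #
    #   yaml_str = pipe.dumps()
    #   with open("pipeline.yaml", "w") as f:
    #       pipe.dump(f)
    #   with open("pipeline.yaml") as f:
    #       assert Pipeline.load(f) == pipe
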
    @classmethod
    def loads(
        cls: Type[T],
        data: Union[str, bytes, bytearray],
        marshaller: Marshaller = DEFAULT_MARSHALLER,
        callbacks: Optional[DeserializationCallbacks] = None,
    ) -> T:
        """
        Creates a `Pipeline` object from the string representation passed in the `data` argument.

        :param data:
            The string representation of the pipeline. Can be `str`, `bytes`, or `bytearray`.
        :param marshaller:
            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
        :param callbacks:
            Callbacks to invoke during deserialization.
        :raises DeserializationError:
            If an error occurs during deserialization.
        :returns:
            A `Pipeline` object.
        """
        try:
            deserialized_data = marshaller.unmarshal(data)
        except Exception as e:
            raise DeserializationError(
                "Error while unmarshalling serialized pipeline data. This is usually "
                "caused by malformed or invalid syntax in the serialized representation."
            ) from e

        return cls.from_dict(deserialized_data, callbacks)

    @classmethod
    def load(
        cls: Type[T],
        fp: TextIO,
        marshaller: Marshaller = DEFAULT_MARSHALLER,
        callbacks: Optional[DeserializationCallbacks] = None,
    ) -> T:
        """
        Creates a `Pipeline` object from a string representation.

        The string representation is read from the file-like object passed in the `fp` argument.

        :param fp:
            A file-like object ready to be read from.
        :param marshaller:
            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
        :param callbacks:
            Callbacks to invoke during deserialization.
        :raises DeserializationError:
            If an error occurs during deserialization.
        :returns:
            A `Pipeline` object.
        """
        return cls.loads(fp.read(), marshaller, callbacks)

    def add_component(self, name: str, instance: Component) -> None:
        """
        Add the given component to the pipeline.

        Components are not connected to anything by default: use `Pipeline.connect()` to connect components together.
        Component names must be unique, but component instances can be reused if needed.

        :param name:
            The name of the component to add.
        :param instance:
            The component instance to add.

        :raises ValueError:
            If a component with the same name already exists.
        :raises PipelineValidationError:
            If the given instance is not a component.
        """
        # Component names are unique
        if name in self.graph.nodes:
            raise ValueError(f"A component named '{name}' already exists in this pipeline: choose another name.")

        # Components can't be named `_debug`
        if name == "_debug":
            raise ValueError("'_debug' is a reserved name for debug output. Choose another name.")

        # Component instances must be components
        if not isinstance(instance, Component):
            raise PipelineValidationError(
                f"'{type(instance)}' doesn't seem to be a component. Is this class decorated with @component?"
            )

        if getattr(instance, "__haystack_added_to_pipeline__", None):
            msg = (
                "Component has already been added in another Pipeline. Components can't be shared between Pipelines. "
                "Create a new instance instead."
            )
            raise PipelineError(msg)

        setattr(instance, "__haystack_added_to_pipeline__", self)

        # Add component to the graph, disconnected
        logger.debug("Adding component '{component_name}' ({component})", component_name=name, component=instance)
        # We're completely sure the fields exist so we ignore the type error
        self.graph.add_node(
            name,
            instance=instance,
            input_sockets=instance.__haystack_input__._sockets_dict,  # type: ignore[attr-defined]
            output_sockets=instance.__haystack_output__._sockets_dict,  # type: ignore[attr-defined]
            visits=0,
        )

    def remove_component(self, name: str) -> Component:
        """
        Removes a component from the pipeline and returns it.

        Remove an existing component from the pipeline by providing its name.
        All edges that connect to the component will also be deleted.

        :param name:
            The name of the component to remove.
        :returns:
            The removed Component instance.

        :raises ValueError:
            If there is no component with that name already in the Pipeline.
        """

        # Check that a component with that name is in the Pipeline
        try:
            instance = self.get_component(name)
        except ValueError as exc:
            raise ValueError(
                f"There is no component named '{name}' in the pipeline. The valid component names are: ",
                ", ".join(n for n in self.graph.nodes),
            ) from exc

        # Delete component from the graph, deleting all its connections
        self.graph.remove_node(name)

        # Reset the Component sockets' senders and receivers
        input_sockets = instance.__haystack_input__._sockets_dict  # type: ignore[attr-defined]
        for socket in input_sockets.values():
            socket.senders = []

        output_sockets = instance.__haystack_output__._sockets_dict  # type: ignore[attr-defined]
        for socket in output_sockets.values():
            socket.receivers = []

        # Reset the Component's pipeline reference
        setattr(instance, "__haystack_added_to_pipeline__", None)

        return instance

    def connect(  # noqa: PLR0915 PLR0912
        self, sender: str, receiver: str, connection_type_validation: Optional[bool] = None
    ) -> "PipelineBase":
        """
        Connects two components together.

        All components to connect must exist in the pipeline.
        If connecting to a component that has several output connections, specify the input and output names as
        'component_name.connection_name'.

        :param sender:
            The component that delivers the value. This can be either just a component name or can be
            in the format `component_name.connection_name` if the component has multiple outputs.
        :param receiver:
            The component that receives the value. This can be either just a component name or can be
            in the format `component_name.connection_name` if the component has multiple inputs.
        :param connection_type_validation:
            Whether the pipeline will validate the types of this connection.
            Defaults to the value set in the pipeline.
        :returns:
            The Pipeline instance.

        :raises PipelineConnectError:
            If the two components cannot be connected (for example if one of the components is
            not present in the pipeline, or the connections don't match by type, and so on).
        """
        # Fall back to the pipeline-level setting only when no per-connection value is
        # given, so an explicit `False` here is honored.
        resolved_connection_type_validation = (
            connection_type_validation if connection_type_validation is not None else self._connection_type_validation
        )

        # Edges may be named explicitly by passing 'node_name.edge_name' to connect().
        sender_component_name, sender_socket_name = parse_connect_string(sender)
        receiver_component_name, receiver_socket_name = parse_connect_string(receiver)

        if sender_component_name == receiver_component_name:
            raise PipelineConnectError("Connecting a Component to itself is not supported.")

        # Get the nodes data.
        try:
            sender_sockets = self.graph.nodes[sender_component_name]["output_sockets"]
        except KeyError as exc:
            raise ValueError(f"Component named {sender_component_name} not found in the pipeline.") from exc
        try:
            receiver_sockets = self.graph.nodes[receiver_component_name]["input_sockets"]
        except KeyError as exc:
            raise ValueError(f"Component named {receiver_component_name} not found in the pipeline.") from exc

        # If the name of either socket is given, get the socket
        sender_socket: Optional[OutputSocket] = None
        if sender_socket_name:
            sender_socket = sender_sockets.get(sender_socket_name)
            if not sender_socket:
                raise PipelineConnectError(
                    f"'{sender}' does not exist. "
                    f"Output connections of {sender_component_name} are: "
                    + ", ".join([f"{name} (type {_type_name(socket.type)})" for name, socket in sender_sockets.items()])
                )

        receiver_socket: Optional[InputSocket] = None
        if receiver_socket_name:
            receiver_socket = receiver_sockets.get(receiver_socket_name)
            if not receiver_socket:
                raise PipelineConnectError(
                    f"'{receiver}' does not exist. "
                    f"Input connections of {receiver_component_name} are: "
                    + ", ".join(
                        [f"{name} (type {_type_name(socket.type)})" for name, socket in receiver_sockets.items()]
                    )
                )

        # Look for a matching connection among the possible ones.
        # Note that if there is more than one possible connection but two sockets match by name, they're paired.
        sender_socket_candidates: List[OutputSocket] = (
            [sender_socket] if sender_socket else list(sender_sockets.values())
        )
        receiver_socket_candidates: List[InputSocket] = (
            [receiver_socket] if receiver_socket else list(receiver_sockets.values())
        )

        # Find all possible connections between these two components
        possible_connections = []
        for sender_sock, receiver_sock in itertools.product(sender_socket_candidates, receiver_socket_candidates):
            if _types_are_compatible(sender_sock.type, receiver_sock.type, resolved_connection_type_validation):
                possible_connections.append((sender_sock, receiver_sock))

        # We need this status for error messages; since we might need it in multiple places, we calculate it here
        status = _connections_status(
            sender_node=sender_component_name,
            sender_sockets=sender_socket_candidates,
            receiver_node=receiver_component_name,
            receiver_sockets=receiver_socket_candidates,
        )

        if not possible_connections:
            # There's no possible connection between these two components
            if len(sender_socket_candidates) == len(receiver_socket_candidates) == 1:
                msg = (
                    f"Cannot connect '{sender_component_name}.{sender_socket_candidates[0].name}' with "
                    f"'{receiver_component_name}.{receiver_socket_candidates[0].name}': "
                    f"their declared input and output types do not match.\n{status}"
                )
            else:
                msg = (
                    f"Cannot connect '{sender_component_name}' with '{receiver_component_name}': "
                    f"no matching connections available.\n{status}"
                )
            raise PipelineConnectError(msg)

        if len(possible_connections) == 1:
            # There's only one possible connection, use it
            sender_socket = possible_connections[0][0]
            receiver_socket = possible_connections[0][1]

        if len(possible_connections) > 1:
            # There are multiple possible connections, let's try to match them by name
            name_matches = [
                (out_sock, in_sock) for out_sock, in_sock in possible_connections if in_sock.name == out_sock.name
            ]
            if len(name_matches) != 1:
                # There are either no matches or more than one; we can't pick one reliably
                msg = (
                    f"Cannot connect '{sender_component_name}' with "
                    f"'{receiver_component_name}': more than one connection is possible "
                    "between these components. Please specify the connection name, like: "
                    f"pipeline.connect('{sender_component_name}.{possible_connections[0][0].name}', "
                    f"'{receiver_component_name}.{possible_connections[0][1].name}').\n{status}"
                )
                raise PipelineConnectError(msg)

            # Get the only possible match
            sender_socket = name_matches[0][0]
            receiver_socket = name_matches[0][1]

        # Connection must be valid on both sender/receiver sides
        if not sender_socket or not receiver_socket or not sender_component_name or not receiver_component_name:
            if sender_component_name and sender_socket:
                sender_repr = f"{sender_component_name}.{sender_socket.name} ({_type_name(sender_socket.type)})"
            else:
                sender_repr = "input needed"

            if receiver_component_name and receiver_socket:
                receiver_repr = f"({_type_name(receiver_socket.type)}) {receiver_component_name}.{receiver_socket.name}"
            else:
                receiver_repr = "output"
            msg = f"Connection must have both sender and receiver: {sender_repr} -> {receiver_repr}"
            raise PipelineConnectError(msg)

        logger.debug(
            "Connecting '{sender_component}.{sender_socket_name}' to '{receiver_component}.{receiver_socket_name}'",
            sender_component=sender_component_name,
            sender_socket_name=sender_socket.name,
            receiver_component=receiver_component_name,
            receiver_socket_name=receiver_socket.name,
        )

        if receiver_component_name in sender_socket.receivers and sender_component_name in receiver_socket.senders:
            # This is already connected, nothing to do
            return self

        if receiver_socket.senders and not receiver_socket.is_variadic:
            # Only variadic input sockets can receive from multiple senders
            msg = (
                f"Cannot connect '{sender_component_name}.{sender_socket.name}' with "
                f"'{receiver_component_name}.{receiver_socket.name}': "
                f"{receiver_component_name}.{receiver_socket.name} is already connected to {receiver_socket.senders}.\n"
            )
            raise PipelineConnectError(msg)

        # Update the sockets with the new connection
        sender_socket.receivers.append(receiver_component_name)
        receiver_socket.senders.append(sender_component_name)

        # Create the new connection
        self.graph.add_edge(
            sender_component_name,
            receiver_component_name,
            key=f"{sender_socket.name}/{receiver_socket.name}",
            conn_type=_type_name(sender_socket.type),
            from_socket=sender_socket,
            to_socket=receiver_socket,
            connection_type_validation=connection_type_validation,
            mandatory=receiver_socket.is_mandatory,
        )
        return self

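    # Wiring sketch (the component classes and socket names are illustrative, loosely
    # modeled on Haystack's fetcher/converter components):
    #
    #   pipe = Pipeline()
    #   pipe.add_component("fetcher", LinkContentFetcher())
    #   pipe.add_component("converter", HTMLToDocument())
    #   # Skip type validation for this edge only:
    #   pipe.connect("fetcher.streams", "converter.sources", connection_type_validation=False)
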
    def get_component(self, name: str) -> Component:
        """
        Get the component with the specified name from the pipeline.

        :param name:
            The name of the component.
        :returns:
            The instance of that component.

        :raises ValueError:
            If a component with that name is not present in the pipeline.
        """
        try:
            return self.graph.nodes[name]["instance"]
        except KeyError as exc:
            raise ValueError(f"Component named {name} not found in the pipeline.") from exc

    def get_component_name(self, instance: Component) -> str:
        """
        Returns the name of the Component instance if it has been added to this Pipeline, or an empty string otherwise.

        :param instance:
            The Component instance to look for.
        :returns:
            The name of the Component instance.
        """
        for name, inst in self.graph.nodes(data="instance"):  # type: ignore # type wrongly defined in networkx
            if inst == instance:
                return name
        return ""

    def inputs(self, include_components_with_connected_inputs: bool = False) -> Dict[str, Dict[str, Any]]:
        """
        Returns a dictionary containing the inputs of a pipeline.

        Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
        the input sockets of that component, including their types and whether they are optional.

        :param include_components_with_connected_inputs:
            If `False`, only components that have disconnected input edges are
            included in the output.
        :returns:
            A dictionary where each key is a pipeline component name and each value is a dictionary of
            input sockets of that component.
        """
        inputs: Dict[str, Dict[str, Any]] = {}
        for component_name, data in find_pipeline_inputs(self.graph, include_components_with_connected_inputs).items():
            sockets_description = {}
            for socket in data:
                sockets_description[socket.name] = {"type": socket.type, "is_mandatory": socket.is_mandatory}
                if not socket.is_mandatory:
                    sockets_description[socket.name]["default_value"] = socket.default_value

            if sockets_description:
                inputs[component_name] = sockets_description
        return inputs

    def outputs(self, include_components_with_connected_outputs: bool = False) -> Dict[str, Dict[str, Any]]:
        """
        Returns a dictionary containing the outputs of a pipeline.

        Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
        the output sockets of that component.

        :param include_components_with_connected_outputs:
            If `False`, only components that have disconnected output edges are
            included in the output.
        :returns:
            A dictionary where each key is a pipeline component name and each value is a dictionary of
            output sockets of that component.
        """
        outputs = {
            comp: {socket.name: {"type": socket.type} for socket in data}
            for comp, data in find_pipeline_outputs(self.graph, include_components_with_connected_outputs).items()
            if data
        }
        return outputs

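    # Shape sketch for `inputs()` and `outputs()` (the "converter" name, socket names,
    # and types are illustrative):
    #
    #   pipe.inputs()   # {"converter": {"sources": {"type": List[ByteStream], "is_mandatory": True}}}
    #   pipe.outputs()  # {"converter": {"documents": {"type": List[Document]}}}
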
    def show(self, server_url: str = "https://mermaid.ink", params: Optional[dict] = None) -> None:
1✔
670
        """
671
        Display an image representing this `Pipeline` in a Jupyter notebook.
672

673
        This function generates a diagram of the `Pipeline` using a Mermaid server and displays it directly in
674
        the notebook.
675

676
        :param server_url:
677
            The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
678
            See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
679
            info on how to set up your own Mermaid server.
680

681
        :param params:
682
            Dictionary of customization parameters to modify the output. Refer to Mermaid documentation for more details
683
            Supported keys:
684
                - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
685
                - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
686
                - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
687
                - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
688
                - width: Width of the output image (integer).
689
                - height: Height of the output image (integer).
690
                - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
691
                - fit: Whether to fit the diagram size to the page (PDF only, boolean).
692
                - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
693
                - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
694

695
        :raises PipelineDrawingError:
696
            If the function is called outside of a Jupyter notebook or if there is an issue with rendering.
697
        """
698
        if is_in_jupyter():
1✔
699
            from IPython.display import Image, display  # type: ignore
1✔
700

701
            image_data = _to_mermaid_image(self.graph, server_url=server_url, params=params)
1✔
702
            display(Image(image_data))
1✔
703
        else:
704
            msg = "This method is only supported in Jupyter notebooks. Use Pipeline.draw() to save an image locally."
1✔
705
            raise PipelineDrawingError(msg)
1✔
706

707
    def draw(self, path: Path, server_url: str = "https://mermaid.ink", params: Optional[dict] = None) -> None:
1✔
708
        """
709
        Save an image representing this `Pipeline` to the specified file path.
710

711
        This function generates a diagram of the `Pipeline` using the Mermaid server and saves it to the provided path.
712

713
        :param path:
714
            The file path where the generated image will be saved.
715
        :param server_url:
716
            The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
717
            See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
718
            info on how to set up your own Mermaid server.
719
        :param params:
720
            Dictionary of customization parameters to modify the output. Refer to Mermaid documentation for more details
721
            Supported keys:
722
                - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
723
                - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
724
                - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
725
                - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
726
                - width: Width of the output image (integer).
727
                - height: Height of the output image (integer).
728
                - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
729
                - fit: Whether to fit the diagram size to the page (PDF only, boolean).
730
                - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
731
                - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
732

733
        :raises PipelineDrawingError:
734
            If there is an issue with rendering or saving the image.
735
        """
736
        # Before drawing we edit a bit the graph, to avoid modifying the original that is
737
        # used for running the pipeline we copy it.
738
        image_data = _to_mermaid_image(self.graph, server_url=server_url, params=params)
1✔
739
        Path(path).write_bytes(image_data)
1✔
740

741
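    # Rendering sketch (the output path and params are illustrative; see the supported
    # keys listed above):
    #
    #   pipe.draw(Path("pipeline.png"), params={"type": "png", "theme": "dark"})
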
    def walk(self) -> Iterator[Tuple[str, Component]]:
        """
        Visits each component in the pipeline exactly once and yields its name and instance.

        No guarantees are provided on the visiting order.

        :returns:
            An iterator of tuples of component name and component instance.
        """
        for component_name, instance in self.graph.nodes(data="instance"):  # type: ignore # type is wrong in networkx
            yield component_name, instance

    def warm_up(self):
        """
        Make sure all nodes are warm.

        It's the node's responsibility to make sure this method can be called at every `Pipeline.run()`
        without re-initializing everything.
        """
        for node in self.graph.nodes:
            if hasattr(self.graph.nodes[node]["instance"], "warm_up"):
                logger.info("Warming up component {node}...", node=node)
                self.graph.nodes[node]["instance"].warm_up()

    def _validate_input(self, data: Dict[str, Any]):
        """
        Validates pipeline input data.

        Validates that:
        * Each Component name actually exists in the Pipeline
        * Each Component is not missing any input
        * Each Component has only one input per input socket, if not variadic
        * Each Component doesn't receive inputs that are already sent by another Component

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name.

        :raises ValueError:
            If inputs are invalid according to the above.
        """
        for component_name, component_inputs in data.items():
            if component_name not in self.graph.nodes:
                raise ValueError(f"Component named {component_name} not found in the pipeline.")
            instance = self.graph.nodes[component_name]["instance"]
            for socket_name, socket in instance.__haystack_input__._sockets_dict.items():
                if socket.senders == [] and socket.is_mandatory and socket_name not in component_inputs:
                    raise ValueError(f"Missing input for component {component_name}: {socket_name}")
            for input_name in component_inputs.keys():
                if input_name not in instance.__haystack_input__._sockets_dict:
                    raise ValueError(f"Input {input_name} not found in component {component_name}.")

        for component_name in self.graph.nodes:
            instance = self.graph.nodes[component_name]["instance"]
            for socket_name, socket in instance.__haystack_input__._sockets_dict.items():
                component_inputs = data.get(component_name, {})
                if socket.senders == [] and socket.is_mandatory and socket_name not in component_inputs:
                    raise ValueError(f"Missing input for component {component_name}: {socket_name}")
                if socket.senders and socket_name in component_inputs and not socket.is_variadic:
                    raise ValueError(
                        f"Input {socket_name} for component {component_name} is already sent by {socket.senders}."
                    )

    def _prepare_component_input_data(self, data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
        """
        Prepares input data for pipeline components.

        Organizes input data for pipeline components and identifies any inputs that are not matched to any
        component's input slots. Deep-copies data items to avoid sharing mutables across multiple components.

        This method processes a flat dictionary of input data, where each key-value pair represents an input name
        and its corresponding value. It distributes these inputs to the appropriate pipeline components based on
        their input requirements. Inputs that don't match any component's input slots are classified as unresolved.

        :param data:
            A dictionary potentially having input names as keys and input values as values.

        :returns:
            A dictionary mapping component names to their respective matched inputs.
        """
        # Check whether the data is a nested dictionary of component inputs where each key is a component name
        # and each value is a dictionary of input parameters for that component
        is_nested_component_input = all(isinstance(value, dict) for value in data.values())
        if not is_nested_component_input:
            # Flat input: a dict where keys are input names and values are the corresponding values.
            # We need to convert it to a nested dictionary of component inputs and then run the pipeline
            # just like in the previous case.
            pipeline_input_data: Dict[str, Dict[str, Any]] = defaultdict(dict)
            unresolved_kwargs = {}

            # Retrieve the input slots for each component in the pipeline
            available_inputs: Dict[str, Dict[str, Any]] = self.inputs()

            # Go through all provided inputs to distribute them to the appropriate component inputs
            for input_name, input_value in data.items():
                resolved_at_least_once = False

                # Check each component to see if it has a slot for the current kwarg
                for component_name, component_inputs in available_inputs.items():
                    if input_name in component_inputs:
                        # If a match is found, add the kwarg to the component's input data
                        pipeline_input_data[component_name][input_name] = input_value
                        resolved_at_least_once = True

                if not resolved_at_least_once:
                    unresolved_kwargs[input_name] = input_value

            if unresolved_kwargs:
                logger.warning(
                    "Inputs {input_keys} were not matched to any component inputs, please check your run parameters.",
                    input_keys=list(unresolved_kwargs.keys()),
                )

            data = dict(pipeline_input_data)

        # Deep-copying the inputs prevents the Pipeline run logic from being altered unexpectedly
        # when the same input reference is passed to multiple components.
        for component_name, component_inputs in data.items():
            data[component_name] = {k: deepcopy(v) for k, v in component_inputs.items()}

        return data

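    # Flat and nested inputs resolve to the same structure (the "prompt_builder" name
    # and "question" socket are illustrative):
    #
    #   pipe._prepare_component_input_data({"question": "Who lives in Paris?"})
    #   # -> {"prompt_builder": {"question": "Who lives in Paris?"}}
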
    @classmethod
    def from_template(
        cls, predefined_pipeline: PredefinedPipeline, template_params: Optional[Dict[str, Any]] = None
    ) -> "PipelineBase":
        """
        Create a Pipeline from a predefined template. See `PredefinedPipeline` for available options.

        :param predefined_pipeline:
            The predefined pipeline to use.
        :param template_params:
            An optional dictionary of parameters to use when rendering the pipeline template.
        :returns:
            An instance of `Pipeline`.
        """
        tpl = PipelineTemplate.from_predefined(predefined_pipeline)
        # If tpl.render() fails, we let the original error bubble up
        rendered = tpl.render(template_params)

        # If there was a problem with the rendered version of the
        # template, we add it to the error stack for debugging
        try:
            return cls.loads(rendered)
        except Exception as e:
            msg = f"Error unmarshalling pipeline: {e}\n"
            msg += f"Source:\n{rendered}"
            raise PipelineUnmarshalError(msg)

    def _find_receivers_from(self, component_name: str) -> List[Tuple[str, OutputSocket, InputSocket]]:
        """
        Utility function to find all Components that receive input from `component_name`.

        :param component_name:
            Name of the sender Component.

        :returns:
            List of tuples containing the name of the receiver Component, the sender OutputSocket,
            and the receiver InputSocket instances.
        """
        res = []
        for _, receiver_name, connection in self.graph.edges(nbunch=component_name, data=True):
            sender_socket: OutputSocket = connection["from_socket"]
            receiver_socket: InputSocket = connection["to_socket"]
            res.append((receiver_name, sender_socket, receiver_socket))
        return res

    @staticmethod
    def _convert_to_internal_format(pipeline_inputs: Dict[str, Any]) -> Dict[str, Dict[str, List]]:
        """
        Converts the inputs to the pipeline to the format that is needed for the internal `Pipeline.run` logic.

        Example input:
        {'prompt_builder': {'question': 'Who lives in Paris?'}, 'retriever': {'query': 'Who lives in Paris?'}}
        Example output:
        {'prompt_builder': {'question': [{'sender': None, 'value': 'Who lives in Paris?'}]},
         'retriever': {'query': [{'sender': None, 'value': 'Who lives in Paris?'}]}}

        :param pipeline_inputs: Inputs to the pipeline.
        :returns: Converted inputs that can be used by the internal `Pipeline.run` logic.
        """
        inputs: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
        for component_name, socket_dict in pipeline_inputs.items():
            inputs[component_name] = {}
            for socket_name, value in socket_dict.items():
                inputs[component_name][socket_name] = [{"sender": None, "value": value}]

        return inputs

    @staticmethod
    def _consume_component_inputs(component_name: str, component: Dict, inputs: Dict) -> Dict[str, Any]:
        """
        Extracts the inputs needed to run the component and removes them from the global inputs state.

        :param component_name: The name of a component.
        :param component: Component with component metadata.
        :param inputs: Global inputs state.
        :returns: The inputs for the component.
        """
        component_inputs = inputs.get(component_name, {})
        consumed_inputs = {}
        greedy_inputs_to_remove = set()
        for socket_name, socket in component["input_sockets"].items():
            socket_inputs = component_inputs.get(socket_name, [])
            socket_inputs = [sock["value"] for sock in socket_inputs if sock["value"] is not _NO_OUTPUT_PRODUCED]
            if socket_inputs:
                if not socket.is_variadic:
                    # We only care about the first input provided to the socket.
                    consumed_inputs[socket_name] = socket_inputs[0]
                elif socket.is_greedy:
                    # We need to keep track of greedy inputs because we always remove them, even if they come from
                    # outside the pipeline. Otherwise, a greedy input from the user would trigger a pipeline to run
                    # indefinitely.
                    greedy_inputs_to_remove.add(socket_name)
                    consumed_inputs[socket_name] = [socket_inputs[0]]
                elif is_socket_lazy_variadic(socket):
                    # We use all inputs provided to the socket on a lazy variadic socket.
                    consumed_inputs[socket_name] = socket_inputs

        # We prune all inputs except for those that were provided from outside the pipeline (e.g. user inputs).
        pruned_inputs = {
            socket_name: [
                sock for sock in socket if sock["sender"] is None and socket_name not in greedy_inputs_to_remove
            ]
            for socket_name, socket in component_inputs.items()
        }
        pruned_inputs = {socket_name: socket for socket_name, socket in pruned_inputs.items() if len(socket) > 0}

        inputs[component_name] = pruned_inputs

        return consumed_inputs

    def _fill_queue(
        self, component_names: List[str], inputs: Dict[str, Any], component_visits: Dict[str, int]
    ) -> FIFOPriorityQueue:
        """
        Calculates the execution priority for each component and inserts it into the priority queue.

        :param component_names: Names of the components to put into the queue.
        :param inputs: Inputs to the components.
        :param component_visits: Current state of component visits.
        :returns: A prioritized queue of component names.
        """
        priority_queue = FIFOPriorityQueue()
        for component_name in component_names:
            component = self._get_component_with_graph_metadata_and_visits(
                component_name, component_visits[component_name]
            )
            priority = self._calculate_priority(component, inputs.get(component_name, {}))
            priority_queue.push(component_name, priority)

        return priority_queue

    @staticmethod
    def _calculate_priority(component: Dict, inputs: Dict) -> ComponentPriority:
        """
        Calculates the execution priority for a component depending on the component's inputs.

        :param component: Component metadata and component instance.
        :param inputs: Inputs to the component.
        :returns: Priority value for the component.
        """
        if not can_component_run(component, inputs):
            return ComponentPriority.BLOCKED
        elif is_any_greedy_socket_ready(component, inputs) and are_all_sockets_ready(component, inputs):
            return ComponentPriority.HIGHEST
        elif all_predecessors_executed(component, inputs):
            return ComponentPriority.READY
        elif are_all_lazy_variadic_sockets_resolved(component, inputs):
            return ComponentPriority.DEFER
        else:
            return ComponentPriority.DEFER_LAST

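    # Priority resolution sketch for `_calculate_priority` above: a component with a
    # ready greedy socket and all sockets ready runs first (HIGHEST); one whose
    # predecessors have all executed is READY; lazy variadic components that might
    # still receive inputs are deferred (DEFER / DEFER_LAST); others are BLOCKED.
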
    def _get_component_with_graph_metadata_and_visits(self, component_name: str, visits: int) -> Dict[str, Any]:
        """
        Returns the component instance alongside input/output-socket metadata from the graph and adds current visits.

        We can't store visits in the pipeline graph because this would prevent reentrance / thread-safe execution.

        :param component_name: The name of the component.
        :param visits: Number of visits for the component.
        :returns: Dict including component instance, input/output sockets, and visits.
        """
        comp_dict = self.graph.nodes[component_name]
        comp_dict = {**comp_dict, "visits": visits}
        return comp_dict

    def _get_next_runnable_component(
        self, priority_queue: FIFOPriorityQueue, component_visits: Dict[str, int]
    ) -> Union[Tuple[ComponentPriority, str, Dict[str, Any]], None]:
        """
        Returns the next runnable component alongside its metadata from the priority queue.

        :param priority_queue: Priority queue of component names.
        :param component_visits: Current state of component visits.
        :returns: The priority, the component name, and the component metadata,
            or None if no component in the queue can run.
        :raises: PipelineMaxComponentRuns if the next runnable component has exceeded the maximum number of runs.
        """
        priority_and_component_name: Union[Tuple[ComponentPriority, str], None] = (
            None if (item := priority_queue.get()) is None else (ComponentPriority(item[0]), str(item[1]))
        )

        if priority_and_component_name is not None and priority_and_component_name[0] != ComponentPriority.BLOCKED:
            priority, component_name = priority_and_component_name
            component = self._get_component_with_graph_metadata_and_visits(
                component_name, component_visits[component_name]
            )
            if component["visits"] > self._max_runs_per_component:
                msg = f"Maximum run count {self._max_runs_per_component} reached for component '{component_name}'"
                raise PipelineMaxComponentRuns(msg)

            return priority, component_name, component

        return None

    @staticmethod
    def _add_missing_input_defaults(component_inputs: Dict[str, Any], component_input_sockets: Dict[str, InputSocket]):
        """
        Updates the inputs with the default values for the inputs that are missing.

        :param component_inputs: Inputs for the component.
        :param component_input_sockets: Input sockets of the component.
        """
        for name, socket in component_input_sockets.items():
            if not socket.is_mandatory and name not in component_inputs:
                if socket.is_variadic:
                    component_inputs[name] = [socket.default_value]
                else:
                    component_inputs[name] = socket.default_value

        return component_inputs

    @staticmethod
    def _write_component_outputs(
        component_name, component_outputs, inputs, receivers, include_outputs_from
    ) -> Dict[str, Any]:
        """
        Distributes the outputs of a component to the input sockets that it is connected to.

        :param component_name: The name of the component.
        :param component_outputs: The outputs of the component.
        :param inputs: The current global input state.
        :param receivers: List of (receiver_name, sender_socket, receiver_socket) for connected components.
        :param include_outputs_from: List of component names that should always return an output from the pipeline.
        """
        for receiver_name, sender_socket, receiver_socket in receivers:
            # We either get the value that was produced by the actor or we use the _NO_OUTPUT_PRODUCED class to
            # indicate that the sender did not produce an output for this socket.
            # This allows us to track if a predecessor already ran but did not produce an output.
            value = component_outputs.get(sender_socket.name, _NO_OUTPUT_PRODUCED)
            if receiver_name not in inputs:
                inputs[receiver_name] = {}

            # If we have a non-variadic or a greedy variadic receiver socket, we can just overwrite any inputs
            # that might already exist (to be reconsidered, but mirrors current behavior).
            if not is_socket_lazy_variadic(receiver_socket):
                inputs[receiver_name][receiver_socket.name] = [{"sender": component_name, "value": value}]

            # If the receiver socket is lazy variadic, and it already has an input, we need to append the new input.
            # Lazy variadic sockets can collect multiple inputs.
            else:
                if not inputs[receiver_name].get(receiver_socket.name):
                    inputs[receiver_name][receiver_socket.name] = []

                inputs[receiver_name][receiver_socket.name].append({"sender": component_name, "value": value})

        # If we want to include all outputs from this actor in the final outputs, we don't need to prune any
        # consumed outputs
        if component_name in include_outputs_from:
            return component_outputs

        # We prune outputs that were consumed by any receiving sockets.
        # All remaining outputs will be added to the final outputs of the pipeline.
        consumed_outputs = {sender_socket.name for _, sender_socket, __ in receivers}
        pruned_outputs = {key: value for key, value in component_outputs.items() if key not in consumed_outputs}

        return pruned_outputs

    @staticmethod
    def _is_queue_stale(priority_queue: FIFOPriorityQueue) -> bool:
        """
        Checks if the priority queue needs to be recomputed because the priorities might have changed.

        :param priority_queue: Priority queue of component names.
        """
        return len(priority_queue) == 0 or priority_queue.peek()[0] > ComponentPriority.READY

    @staticmethod
    def validate_pipeline(priority_queue: FIFOPriorityQueue) -> None:
        """
        Validates the pipeline to check if it is blocked or has no valid entry point.

        :param priority_queue: Priority queue of component names.
        """
        if len(priority_queue) == 0:
            return

        candidate = priority_queue.peek()
        if candidate is not None and candidate[0] == ComponentPriority.BLOCKED:
            raise PipelineRuntimeError(
                "Cannot run pipeline - all components are blocked. "
                "This typically happens when:\n"
                "1. There is no valid entry point for the pipeline\n"
                "2. There is a circular dependency preventing the pipeline from running\n"
                "Check the connections between these components and ensure all required inputs are provided."
            )


def _connections_status(
    sender_node: str, receiver_node: str, sender_sockets: List[OutputSocket], receiver_sockets: List[InputSocket]
) -> str:
    """
    Lists the status of the sockets, for error messages.
    """
    sender_sockets_entries = []
    for sender_socket in sender_sockets:
        sender_sockets_entries.append(f" - {sender_socket.name}: {_type_name(sender_socket.type)}")
    sender_sockets_list = "\n".join(sender_sockets_entries)

    receiver_sockets_entries = []
    for receiver_socket in receiver_sockets:
        if receiver_socket.senders:
            sender_status = f"sent by {','.join(receiver_socket.senders)}"
        else:
            sender_status = "available"
        receiver_sockets_entries.append(
            f" - {receiver_socket.name}: {_type_name(receiver_socket.type)} ({sender_status})"
        )
    receiver_sockets_list = "\n".join(receiver_sockets_entries)

    return f"'{sender_node}':\n{sender_sockets_list}\n'{receiver_node}':\n{receiver_sockets_list}"