deepset-ai / canals / build 5834429870
11 Aug 2023 03:38PM UTC coverage: 95.179% (+1.7%) from 93.466%

Pull Request #55: experiment: FSM implementation (github / web-flow)
Merge 063ea707d into 8e973ea23

176 of 179 branches covered (98.32%). Branch coverage included in aggregate %.
673 of 713 relevant lines covered (94.39%)
0.94 hits per line

Source file: canals/pipeline/pipeline.py (94.7% covered)
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
from typing import Optional, Any, Dict, List, Union, Tuple

import datetime
import logging
from pathlib import Path
from copy import deepcopy

import networkx

from canals.component import component, Component
from canals.errors import (
    PipelineError,
    PipelineConnectError,
    PipelineMaxLoops,
    PipelineRuntimeError,
    PipelineValidationError,
)
from canals.pipeline.draw import _draw, _convert_for_debug, RenderingEngines
from canals.pipeline.sockets import InputSocket, OutputSocket
from canals.pipeline.validation import _validate_pipeline_input
from canals.pipeline.connections import _parse_connection_name, _find_unambiguous_connection
from canals.utils import _type_name

logger = logging.getLogger(__name__)


class Pipeline:
    """
    Component orchestration engine.

    Builds a graph of components and orchestrates their execution according to the execution graph.
    """

    def __init__(
        self,
        metadata: Optional[Dict[str, Any]] = None,
        max_loops_allowed: int = 100,
        debug_path: Union[Path, str] = Path(".canals_debug/"),
    ):
        """
        Creates the Pipeline.

        Args:
            metadata: arbitrary dictionary to store metadata about this pipeline. Make sure all the values contained in
                this dictionary can be serialized and deserialized if you wish to save this pipeline to file with
                `save_pipelines()/load_pipelines()`.
            max_loops_allowed: how many times the pipeline can run the same node before throwing an exception.
            debug_path: when debug is enabled in `run()`, where to save the debug data.
        """
        self.metadata = metadata or {}
        self.max_loops_allowed = max_loops_allowed
        self.graph = networkx.MultiDiGraph()
        self.valid_states: Dict[str, List[List[Tuple[str, str]]]] = {}
        self.debug: Dict[int, Dict[str, Any]] = {}
        self.debug_path = Path(debug_path)

    def __eq__(self, other) -> bool:
        """
        Equal pipelines share all metadata, nodes and edges, but they're not required to use
        the same node instances: this allows pipelines saved and then loaded back to be equal to themselves.
        """
        if (
            not isinstance(other, type(self))
            or not getattr(self, "metadata") == getattr(other, "metadata")
            or not getattr(self, "max_loops_allowed") == getattr(other, "max_loops_allowed")
            or not hasattr(self, "graph")
            or not hasattr(other, "graph")
        ):
            return False

        return (
            self.graph.adj == other.graph.adj
            and self._comparable_nodes_list(self.graph) == self._comparable_nodes_list(other.graph)
            and self.graph.graph == other.graph.graph
        )

    def to_dict(self) -> Dict[str, Any]:
        """
        Returns this Pipeline instance as a dictionary.
        This is meant to be an intermediate representation, but it can also be used to save a pipeline to file.
        """
        components = {name: instance.to_dict() for name, instance in self.graph.nodes(data="instance")}
        connections = []
        for sender, receiver, sockets in self.graph.edges:
            (sender_socket, receiver_socket) = sockets.split("/")
            connections.append(
                {
                    "sender": f"{sender}.{sender_socket}",
                    "receiver": f"{receiver}.{receiver_socket}",
                }
            )
        return {
            "metadata": self.metadata,
            "max_loops_allowed": self.max_loops_allowed,
            "components": components,
            "connections": connections,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any], **kwargs) -> "Pipeline":
        """
        Creates a Pipeline instance from a dictionary.
        A sample `data` dictionary could be formatted like so:
        ```
        {
            "metadata": {"test": "test"},
            "max_loops_allowed": 100,
            "components": {
                "add_two": {
                    "type": "AddFixedValue",
                    "hash": "123",
                    "init_parameters": {"add": 2},
                },
                "add_default": {
                    "type": "AddFixedValue",
                    "hash": "456",
                    "init_parameters": {"add": 1},
                },
                "double": {
                    "type": "Double",
                    "hash": "789"
                },
            },
            "connections": [
                {"sender": "add_two.result", "receiver": "double.value"},
                {"sender": "double.value", "receiver": "add_default.value"},
            ],
        }
        ```

        Supported kwargs:
        `components`: a dictionary of {name: instance} to reuse instances of components instead of creating new ones.
        """
        metadata = data.get("metadata", {})
        max_loops_allowed = data.get("max_loops_allowed", 100)
        debug_path = Path(data.get("debug_path", ".canals_debug/"))
        pipe = cls(
            metadata=metadata,
            max_loops_allowed=max_loops_allowed,
            debug_path=debug_path,
        )
        components_to_reuse = kwargs.get("components", {})
        for name, component_data in data.get("components", {}).items():
            if name in components_to_reuse:
                # Reuse an instance
                instance = components_to_reuse[name]
            else:
                if "type" not in component_data:
                    raise PipelineError(f"Missing 'type' in component '{name}'")
                if component_data["type"] not in component.registry:
                    raise PipelineError(f"Component '{component_data['type']}' not imported.")
                # Create a new one
                instance = component.registry[component_data["type"]].from_dict(component_data)
            pipe.add_component(name=name, instance=instance)

        for connection in data.get("connections", []):
            if "sender" not in connection:
                raise PipelineError(f"Missing sender in connection: {connection}")
            if "receiver" not in connection:
                raise PipelineError(f"Missing receiver in connection: {connection}")
            pipe.connect(connect_from=connection["sender"], connect_to=connection["receiver"])

        return pipe

    def _comparable_nodes_list(self, graph: networkx.MultiDiGraph) -> List[Dict[str, Any]]:
        """
        Replaces instances of nodes with their class name and defaults list in order to make sure they're comparable.
        """
        nodes = []
        for node in graph.nodes:
            comparable_node = graph.nodes[node]
            if hasattr(comparable_node, "defaults"):
                comparable_node["defaults"] = comparable_node["instance"].defaults
            comparable_node["instance"] = comparable_node["instance"].__class__
            nodes.append(comparable_node)
        nodes.sort()
        return nodes

    def add_component(self, name: str, instance: Component) -> None:
        """
        Adds a component to the pipeline under the given name. Components are not connected to anything by default:
        use `Pipeline.connect()` to connect components together.

        Component names must be unique, but component instances can be reused if needed.

        Args:
            name: the name of the component.
            instance: the component instance.

        Returns:
            None

        Raises:
            ValueError: if a component with the same name already exists
            PipelineValidationError: if the given instance is not a Canals component
        """
        # Component names are unique
        if name in self.graph.nodes:
            raise ValueError(f"A component named '{name}' already exists in this pipeline: choose another name.")

        # Components can't be named `_debug`
        if name == "_debug":
            raise ValueError("'_debug' is a reserved name for debug output. Choose another name.")

        # Component instances must be components
        if not hasattr(instance, "__canals_component__"):
            raise PipelineValidationError(
                f"'{type(instance)}' doesn't seem to be a component. Is this class decorated with @component?"
            )

        # Create the component's input and output sockets
        inputs = getattr(instance.run, "__canals_input__", {})
        outputs = getattr(instance.run, "__canals_output__", {})
        input_sockets = {name: InputSocket(**data) for name, data in inputs.items()}
        output_sockets = {name: OutputSocket(**data) for name, data in outputs.items()}

        # Add component to the graph, disconnected
        logger.debug("Adding component '%s' (%s)", name, instance)
        self.graph.add_node(
            name,
            instance=instance,
            input_sockets=input_sockets,
            output_sockets=output_sockets,
            visits=0,
        )

    def connect(self, connect_from: str, connect_to: str) -> None:
        """
        Connects two components together. All components to connect must exist in the pipeline.
        If connecting to a component that has several output connections, specify the input and output names as
        'component_name.connection_name'.

        Args:
            connect_from: the component that delivers the value. This can be either just a component name or can be
                in the format `component_name.connection_name` if the component has multiple outputs.
            connect_to: the component that receives the value. This can be either just a component name or can be
                in the format `component_name.connection_name` if the component has multiple inputs.

        Returns:
            None

        Raises:
            PipelineConnectError: if the two components cannot be connected (for example if one of the components is
                not present in the pipeline, or the connections don't match by type, and so on).
        """
        # Edges may be named explicitly by passing 'node_name.edge_name' to connect().
        from_node, from_socket_name = _parse_connection_name(connect_from)
        to_node, to_socket_name = _parse_connection_name(connect_to)

        # Get the nodes data.
        try:
            from_sockets = self.graph.nodes[from_node]["output_sockets"]
        except KeyError as exc:
            raise ValueError(f"Component named {from_node} not found in the pipeline.") from exc

        try:
            to_sockets = self.graph.nodes[to_node]["input_sockets"]
        except KeyError as exc:
            raise ValueError(f"Component named {to_node} not found in the pipeline.") from exc

        # If the name of either socket is given, get the socket
        if from_socket_name:
            from_socket = from_sockets.get(from_socket_name, None)
            if not from_socket:
                raise PipelineConnectError(
                    f"'{from_node}.{from_socket_name} does not exist. "
                    f"Output connections of {from_node} are: "
                    + ", ".join([f"{name} (type {_type_name(socket.type)})" for name, socket in from_sockets.items()])
                )
        if to_socket_name:
            to_socket = to_sockets.get(to_socket_name, None)
            if not to_socket:
                raise PipelineConnectError(
                    f"'{to_node}.{to_socket_name} does not exist. "
                    f"Input connections of {to_node} are: "
                    + ", ".join([f"{name} (type {_type_name(socket.type)})" for name, socket in to_sockets.items()])
                )

        # Look for an unambiguous connection among the possible ones.
        # Note that if there is more than one possible connection but two sockets match by name, they're paired.
        from_sockets = [from_socket] if from_socket_name else list(from_sockets.values())
        to_sockets = [to_socket] if to_socket_name else list(to_sockets.values())
        from_socket, to_socket = _find_unambiguous_connection(
            sender_node=from_node, sender_sockets=from_sockets, receiver_node=to_node, receiver_sockets=to_sockets
        )

        # Connect the components on these sockets
        self._direct_connect(from_node=from_node, from_socket=from_socket, to_node=to_node, to_socket=to_socket)

    def _direct_connect(self, from_node: str, from_socket: OutputSocket, to_node: str, to_socket: InputSocket) -> None:
        """
        Directly connect socket to socket. This method does not type-check the connections: use 'Pipeline.connect()'
        instead (which uses 'find_unambiguous_connection()' to validate types).
        """
        # Make sure the receiving socket isn't already connected - sending sockets can be connected as many times as needed,
        # so they don't need this check
        if to_socket.sender:
            raise PipelineConnectError(
                f"Cannot connect '{from_node}.{from_socket.name}' with '{to_node}.{to_socket.name}': "
                f"{to_node}.{to_socket.name} is already connected to {to_socket.sender}.\n"
            )

        # Create the connection
        logger.debug("Connecting '%s.%s' to '%s.%s'", from_node, from_socket.name, to_node, to_socket.name)
        edge_key = f"{from_socket.name}/{to_socket.name}"
        self.graph.add_edge(
            from_node,
            to_node,
            key=edge_key,
            conn_type=_type_name(from_socket.type),
            from_socket=from_socket,
            to_socket=to_socket,
        )

        # Stores the name of the node that will send its output to this socket
        to_socket.sender = from_node

    def get_component(self, name: str) -> Component:
        """
        Returns an instance of a component.

        Args:
            name: the name of the component

        Returns:
            The instance of that component.

        Raises:
            ValueError: if a component with that name is not present in the pipeline.
        """
        try:
            return self.graph.nodes[name]["instance"]
        except KeyError as exc:
            raise ValueError(f"Component named {name} not found in the pipeline.") from exc

    def draw(self, path: Path, engine: RenderingEngines = "mermaid-img") -> None:
        """
        Draws the pipeline. Requires either `graphviz` as a system dependency, or an internet connection for Mermaid.
        Run `pip install canals[graphviz]` or `pip install canals[mermaid]` to install missing dependencies.

        Args:
            path: where to save the diagram.
            engine: which format to save the graph as. Accepts 'graphviz', 'mermaid-text', 'mermaid-img'.
                Default is 'mermaid-img'.

        Returns:
            None

        Raises:
            ImportError: if `engine='graphviz'` and `pygraphviz` is not installed.
            HTTPConnectionError: (and similar) if the internet connection is down or other connection issues.
        """
        sockets = {
            comp: "\n".join([f"{name}: {socket}" for name, socket in data.get("input_sockets", {}).items()])
            for comp, data in self.graph.nodes(data=True)
        }
        print(sockets)
        _draw(graph=deepcopy(self.graph), path=path, engine=engine)

    def warm_up(self):
        """
        Make sure all nodes are warm.

        It's the node's responsibility to make sure this method can be called at every `Pipeline.run()`
        without re-initializing everything.
        """
        for node in self.graph.nodes:
            if hasattr(self.graph.nodes[node]["instance"], "warm_up"):
                logger.info("Warming up component %s...", node)
                self.graph.nodes[node]["instance"].warm_up()

    def _record_pipeline_step(self, step, inputs_buffer, pipeline_output):
        """
        Stores a snapshot of this step into the self.debug dictionary of the pipeline.
        """
        mermaid_graph = _convert_for_debug(deepcopy(self.graph))
        self.debug[step] = {
            "time": datetime.datetime.now(),
            "inputs_buffer": list(inputs_buffer.items()),
            "pipeline_output": pipeline_output,
            "diagram": mermaid_graph,
        }

    def _clear_visits_count(self):
        """
        Make sure every node's visits count is zero.
        """
        for node in self.graph.nodes:
            self.graph.nodes[node]["visits"] = 0

    def _check_max_loops(self, component_name: str):
        """
        Verify whether this component ran too many times.
        """
        if self.graph.nodes[component_name]["visits"] > self.max_loops_allowed:
            raise PipelineMaxLoops(
                f"Maximum loops count ({self.max_loops_allowed}) exceeded for component '{component_name}'."
            )

    def _compute_valid_states(self):
        """
        Computes all the valid minimal states that would lead a specific component to run and stores them
        in `self.valid_states`. These tuples are used by `_state_transition_function()` with `issubset()`
        to compute the next transition.
        """
        self.valid_states = {}
        for component_name in self.graph.nodes:
            input_from_loop, input_outside_loop = self._identify_looping_inputs(component_name)
            if input_from_loop and input_outside_loop:
                # It's a loop merger, so it has two valid states: one for the loop and one for the external input
                self.valid_states[component_name] = [
                    [(component_name, socket) for socket in input_from_loop],
                    [(component_name, socket) for socket in input_outside_loop],
                ]
                continue

            # It's a regular component, so it has one minimum valid state only
            valid_state = []
            for socket_name, socket in self.graph.nodes[component_name]["input_sockets"].items():
                if not socket.has_default:
                    valid_state.append((component_name, socket_name))

            self.valid_states[component_name] = [valid_state]

    def _identify_looping_inputs(self, component_name: str):
        """
        Identify which of the input sockets of this component are coming from a loop and which are not.
        """
        input_from_loop = []
        input_outside_loop = []

        for socket in self.graph.nodes[component_name]["input_sockets"]:
            sender = self.graph.nodes[component_name]["input_sockets"][socket].sender
            if sender and networkx.has_path(self.graph, component_name, sender):
                input_from_loop.append(socket)
            else:
                input_outside_loop.append(socket)
        return input_from_loop, input_outside_loop

    def _identify_looping_outputs(self, component_name: str):
        """
        Identify which of the output sockets of this component are going into a loop and which are not.
        """
        output_to_loop = []
        output_outside_loop = []

        for socket in self.graph.nodes[component_name]["output_sockets"]:
            for _, to_node, _ in self.graph.out_edges(component_name, keys=True):
                if to_node and networkx.has_path(self.graph, to_node, component_name):
                    output_to_loop.append(socket)
                else:
                    output_outside_loop.append(socket)

        return output_to_loop, output_outside_loop

    def _state_transition_function(self, state: Tuple[Tuple[str, str], ...]) -> List[str]:
        """
        Given the current state as a list of tuples of (component, socket), returns the transition to perform
        as a list of components that should run.

        Args:
            state (Tuple[Tuple[str, str], ...]): the current state as a list of tuples of (component, socket)
        """
        transition = []
        for component_name in self.graph.nodes:
            for valid_state in self.valid_states[component_name]:
                if set(valid_state).issubset(set(state)):
                    transition.append(component_name)

        return transition
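
    # Illustrative example of how the valid states drive transitions, reusing the
    # component names from the `from_dict()` docstring: assuming the "double"
    # component has a single input socket "value" with no default,
    # `_compute_valid_states()` stores
    #     self.valid_states["double"] == [[("double", "value")]]
    # and whenever the execution state contains the pair ("double", "value"), that
    # valid state is a subset of the current state, so `_state_transition_function()`
    # schedules "double" to run in the next transition.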

    def run(self, data: Dict[str, Any], debug: bool = False) -> Dict[str, Any]:  # pylint: disable=too-many-locals
        """
        Runs the pipeline.

        Args:
            data: the inputs to give to the input components of the Pipeline.
            debug: whether to collect and return debug information.

        Returns:
            A dictionary with the outputs of the output components of the Pipeline.

        Raises:
            PipelineRuntimeError: if any of the components fail or return unexpected output.
        """
        if debug:
            logger.warning("Debug mode is still WIP")

        data = _validate_pipeline_input(self.graph, input_values=data)
        self._clear_visits_count()
        self.warm_up()
        pipeline_output: Dict[str, Any] = {}

        logger.info("Pipeline execution started.")

        # List all the input/output socket pairs - for quicker access
        connections = [
            (from_node, sockets.split("/")[0], to_node, sockets.split("/")[1])
            for from_node, to_node, sockets in self.graph.edges
        ]
        self._compute_valid_states()

        # Initial state
        state: Dict[Tuple[str, str], Any] = {}
        for component_name, input_data in data.items():
            for socket, value in input_data.items():
                if value is not None:
                    state[(component_name, socket)] = value

        # Execution loop
        step = 0
        while True:
            step += 1

            # Get the transition to perform
            transition = self._state_transition_function(state=tuple(state.keys()))
            logger.debug("##### %s^ transition #####", step)
            logger.debug("State: %s | Transition: %s", tuple(state.keys()), transition)

            # Termination condition: stopping states return an empty transition
            if not transition:
                logger.debug("   --X This is a stopping state.")
                break

            # Apply the transition to get to the next state
            output: Dict[str, Any]
            state, output = self._apply_transition(state, transition, connections)
            pipeline_output = {**pipeline_output, **output}

        logger.info("Pipeline executed successfully.")

        # Clean up output dictionary from None values
        clean_output = {}
        for component_name, outputs in pipeline_output.items():
            if not all(value is None for value in outputs.values()):
                clean_output[component_name] = {key: value for key, value in outputs.items() if value is not None}

        return clean_output

    def _apply_transition(
        self, state: Dict[Tuple[str, str], Any], transition: List[str], connections: List[Tuple[str, str, str, str]]
    ) -> Tuple[Dict[Tuple[str, str], Any], Dict[str, Any]]:
        """
        Given the current state of the pipeline, compute the next state. Returns a Tuple with (state, output)
        """
        output: Dict[str, Any] = {}
        next_state = state

        # Process all the components in this transition independently ("parallel branch execution")
        for component_name in transition:
            # Extract from the general state only the inputs for this component
            component_inputs = {state[1]: value for state, value in state.items() if state[0] == component_name}

            # Once an input is being used, remove it from the machine's state.
            # Leftover state will be carried over ("waiting for components")
            for used_input in component_inputs.keys():
                del next_state[(component_name, used_input)]

            # Run the component
            output_values = self._run_component(
                name=component_name,
                inputs=component_inputs,
            )

            # Translate output sockets into input sockets - builds the next state
            for socket, value in output_values.items():
                target_states = [
                    (to_node, to_socket)
                    for from_node, from_socket, to_node, to_socket in connections
                    if from_node == component_name and from_socket == socket
                ]

                # If the sockets are dangling, this value is an output
                if not target_states:
                    if component_name not in output:
                        output[component_name] = {}
                    output[component_name][socket] = value
                    logger.debug(" - '%s.%s' goes to the output with value '%s'", component_name, socket, value)

                # Otherwise, assign the values to the next state
                for target in target_states:
                    if all(self._identify_looping_outputs(component_name)) and value is None:
                        logger.debug("   --X Loop decision nodes do not propagate None values.")
                    else:
                        next_state[target] = value
                        logger.debug("   --> '%s.%s' received '%s'", target[0], target[1], value)

        return next_state, output

    def _run_component(self, name: str, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """
        Once we're confident this component is ready to run, run it and collect the output.
        """
        self._check_max_loops(name)
        self.graph.nodes[name]["visits"] += 1
        instance = self.graph.nodes[name]["instance"]
        try:
            logger.info("* Running %s (visits: %s)", name, self.graph.nodes[name]["visits"])
            logger.debug("   '%s' inputs: %s", name, inputs)

            # Check if any None was received on an input that is not Optional:
            # if so, return an empty output dataclass ("skipping the component")
            if all(value is None for value in inputs.values()) or any(
                value is None and not self.graph.nodes[name]["input_sockets"][socket_name].is_optional
                for socket_name, value in inputs.items()
            ):
                logger.debug("   --X '%s' received None on a mandatory input: skipping.", name)
                output_dict: Dict[str, Any] = {key: None for key in self.graph.nodes[name]["output_sockets"].keys()}
                logger.debug("   '%s' outputs: %s", name, output_dict)
                return output_dict

            output_dict = instance.run(**inputs)

            # Unwrap the output
            logger.debug("   '%s' outputs: %s", name, output_dict)

        except Exception as e:
            raise PipelineRuntimeError(
                f"{name} raised '{e.__class__.__name__}: {e}' \nInputs: {inputs}\n\n"
                "See the stacktrace above for more information."
            ) from e

        return output_dict
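
For reference, a minimal usage sketch of the API defined above. The `AddFixedValue` and `Double` component classes, their socket names, and the `from canals import Pipeline` import path are assumptions borrowed from the `from_dict()` docstring example; they are not defined in this file.

```
from canals import Pipeline  # assumed import path

# Hypothetical @component-decorated classes matching the from_dict() docstring example.
from sample_components import AddFixedValue, Double

pipe = Pipeline(metadata={"about": "toy example"}, max_loops_allowed=10)

# Component names are unique; instances can be reused.
pipe.add_component("add_two", AddFixedValue(add=2))
pipe.add_component("double", Double())

# 'name.socket' disambiguates when a component has several inputs or outputs.
pipe.connect("add_two.result", "double.value")

# Inputs are namespaced by component name, then by input socket.
result = pipe.run(data={"add_two": {"value": 1}})
print(result)  # expected shape: {"double": {"value": ...}}, since double's output socket is dangling

# Serialization round trip via the intermediate dictionary representation:
# equal pipelines don't need to share component instances (see __eq__).
restored = Pipeline.from_dict(pipe.to_dict())
assert restored == pipe
```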