• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / canals / 5902589996

18 Aug 2023 12:26PM UTC coverage: 93.556% (+0.09%) from 93.466%
5902589996

Pull #95

github

web-flow
Merge fbfe6429f into 50c1afd14
Pull Request #95: Remove all mentions of Component.defaults

177 of 182 branches covered (97.25%)

Branch coverage included in aggregate %.

665 of 718 relevant lines covered (92.62%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.0
canals/pipeline/validation.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4
from typing import List, Dict, Any
1✔
5
import logging
1✔
6

7
import networkx
1✔
8

9
from canals.errors import PipelineValidationError
1✔
10
from canals.pipeline.sockets import InputSocket, OutputSocket
1✔
11

12

13
# Module-level logger, named after this module through __name__.
logger = logging.getLogger(__name__)
1✔
14

15

16
def _find_pipeline_inputs(graph: networkx.MultiDiGraph) -> Dict[str, List[InputSocket]]:
    """
    Map every node of the graph to the list of its input sockets that have no sender.

    A socket is listed whenever its `sender` is falsy; no other filter is applied, so optional sockets are
    included as well. Every node appears in the result: nodes with no such socket, or with no "input_sockets"
    entry in their data, are mapped to an empty list.
    """
    disconnected: Dict[str, List[InputSocket]] = {}
    for node_name, node_data in graph.nodes(data=True):
        node_sockets = node_data.get("input_sockets", {})
        disconnected[node_name] = [candidate for candidate in node_sockets.values() if not candidate.sender]
    return disconnected
25

26

27
def _find_pipeline_outputs(graph) -> Dict[str, List[OutputSocket]]:
    """
    Collect the output sockets of the components that have no outgoing edges. They define the pipeline output.

    Only nodes for which `graph.out_edges(node)` is empty are part of the result; each of them is mapped to the
    list of all its output sockets (an empty list if the node data has no "output_sockets" entry).
    """
    terminal_outputs: Dict[str, List[OutputSocket]] = {}
    for node_name, node_data in graph.nodes(data=True):
        # Nodes that send data to another node do not contribute to the pipeline output.
        if graph.out_edges(node_name):
            continue
        terminal_outputs[node_name] = list(node_data.get("output_sockets", {}).values())
    return terminal_outputs
36

37

38
def _validate_pipeline_input(graph: networkx.MultiDiGraph, input_values: Dict[str, Any]) -> Dict[str, Any]:
    """
    Make sure the pipeline is properly built and that the input received makes sense.

    :param graph: the pipeline graph; its nodes are the component names.
    :param input_values: the pipeline input, keyed by component name.
    :returns: the received `input_values`, once all the checks have passed.
    :raises PipelineValidationError: if no node of the graph has a disconnected input socket.
    :raises ValueError: if `input_values` contains keys that are not nodes of the graph, or if one of the
        checks delegated to `_validate_input_sockets_are_connected` and
        `_validate_nodes_receive_only_expected_input` fails.
    """
    # Each value is a list of disconnected sockets: a single non-empty list is enough to have an input.
    if not any(_find_pipeline_inputs(graph).values()):
        raise PipelineValidationError("This pipeline has no inputs.")

    # Make sure the input keys are all nodes of the pipeline.
    # Keys are stringified so that a non-string key is reported through the ValueError below
    # instead of making `str.join` fail with a TypeError.
    unknown_components = [str(key) for key in input_values if key not in graph.nodes]
    if unknown_components:
        raise ValueError(f"Pipeline received data for unknown component(s): {', '.join(unknown_components)}")

    # Make sure all necessary sockets are connected
    _validate_input_sockets_are_connected(graph, input_values)

    # Make sure that the pipeline input is only sent to nodes that won't receive data from other nodes
    _validate_nodes_receive_only_expected_input(graph, input_values)

    return input_values
58

59

60
def _validate_input_sockets_are_connected(graph: networkx.MultiDiGraph, input_values: Dict[str, Any]):
    """
    Make sure all the inputs nodes are receiving all the values they need, either from the Pipeline's input or from
    other nodes.

    Only the sockets returned by `_find_pipeline_inputs` (the ones without a sender) are checked.

    :param graph: the pipeline graph.
    :param input_values: the pipeline input, keyed by component name; each value is expected to be either a
        mapping from socket name to value, or None.
    :raises ValueError: if a non-optional socket without a sender has no value (or a None value) in
        `input_values`.
    """
    for node, sockets in _find_pipeline_inputs(graph).items():
        # The values addressed to a node do not depend on the socket: look them up once per node.
        inputs_for_node = input_values.get(node, {})
        for socket in sockets:
            # The value is missing when the node received None instead of a mapping, or when the socket's entry
            # is either absent or None: `.get()` returns None in both of the latter cases.
            missing_input_value = inputs_for_node is None or inputs_for_node.get(socket.name) is None
            if missing_input_value and not socket.is_optional:
                raise ValueError(f"Missing input: {node}.{socket.name}")
76

77

78
def _validate_nodes_receive_only_expected_input(graph: networkx.MultiDiGraph, input_values: Dict[str, Any]):
    """
    Make sure that every input node is only receiving input values from EITHER the pipeline's input or another node,
    but never from both.

    Entries whose value is None are skipped, and so are nodes whose whole input is None.

    :param graph: the pipeline graph; `graph.nodes[node]["input_sockets"]` maps socket names to sockets.
    :param input_values: the pipeline input, keyed by component name; each value is expected to be either a
        mapping from socket name to value, or None.
    :raises ValueError: if a value is addressed to a socket the node does not have, or to a socket that already
        has a sender.
    """
    for node, input_data in input_values.items():
        if input_data is None:
            # _validate_input_sockets_are_connected accepts None in place of a node's inputs:
            # treat it the same way here (nothing to check) instead of failing with an AttributeError.
            continue

        for socket_name, value in input_data.items():
            if value is None:
                continue

            input_sockets = graph.nodes[node]["input_sockets"]
            if socket_name not in input_sockets:
                raise ValueError(
                    f"Component {node} is not expecting any input value called {socket_name}. "
                    "Are you using the correct Input class?"
                )

            # A socket with a sender is already fed by another node: the pipeline input must not feed it too.
            sender = input_sockets[socket_name].sender
            if sender:
                raise ValueError(f"The input {socket_name} of {node} is already sent by node {sender}")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc