deepset-ai / canals / 5331939869

21 Jun 2023 08:39AM UTC coverage: 93.779% (-0.2%) from 93.998%
Pull #23

github

web-flow
Merge ca1995464 into 4e413c7ba
Pull Request #23: Rework how component I/O is defined

171 of 176 branches covered (97.16%)

Branch coverage included in aggregate %.

643 of 692 relevant lines covered (92.92%)

0.93 hits per line

Source File
98.28
canals/component/component.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4
import logging
1✔
5
import inspect
1✔
6

7
from canals.errors import ComponentError
1✔
8
from canals.component.decorators import save_init_params, init_defaults
1✔
9
from canals.component.input_output import Connection, _input, _output
1✔
10

11

12
logger = logging.getLogger(__name__)
1✔
13

14

15
def component(class_):
1✔
16
    """
17
    Marks a class as a component. Any class decorated with `@component` can be used by a Pipeline.
18

19
    All components must follow the contract below. This docstring is the source of truth for the component contract.
20

21
    ### `@component` decorator
22

23
    All component classes must be decorated with the `@component` decorator. This allows Canals to discover them.
24

25
    ### `Input`
26

27
    ```python
28
    @dataclass
29
    class Input(ComponentInput / VariadicComponentInput):
30
        <expected input fields, typed, with no defaults>
31
    ```
32
    Semi-mandatory class (either this or `self.input_type()`).
33

34
    This inner class defines what the input of this component looks like. For example, if the node is expecting
35
    a list of Documents, the fields of the class should be `documents: List[Document]`.
36

37
    Defaults are allowed, however `Optional`, `Union` and similar "generic" types are not. This is necessary to allow
38
    proper validation of the connections, which rely on the type of these fields.
39

40
    If your node expects variadic input, use `VariadicComponentInput`. In all other scenarios, use `ComponentInput`
41
    as your base class.
42

43
    Some components may need more dynamic input. For these scenarios, refer to `self.input_type()`.
44

45
    Every component should define **either** `Input` or `self.input_type()`.
46

47

48
    ### `input_type()`
49

50
    ```python
51
    @property
52
    def input_type(self) -> ComponentInput / VariadicComponentInput:
53
    ```
54
    Semi-mandatory method (either this or `class Input`).
55

56
    This method defines what the input of this component looks like. For example, if the node is expecting
57
    a list of Documents, this method should return a dataclass, subclass of either `ComponentInput` or
58
    `VariadicComponentInput`, with such fields. For example, it could build the dataclass as
59
    `make_dataclass("Input", fields=[("documents", List[Document], None)], bases=(ComponentInput, ))` and return it.
60

61
    Defaults are allowed, however `Optional`, `Union` and similar "generic" types are not. This is necessary to allow
62
    proper validation of the connections, which rely on the type of these fields.
63

64
    Normally the `Input` dataclass is preferred, as it provides autocompletion for the users and is much easier to use.
65

66
    Every component should define **either** `Input` or `self.input_type()`.
67

68

69
    ### `Output`
70

71
    ```python
72
    @dataclass
73
    class Output(ComponentOutput):
74
        <expected output fields, typed>
75
    ```
76
    Semi-mandatory class (either this or `self.output_type()`).
77

78
    This inner class defines what the output of this component looks like. For example, if the node is producing
79
    a list of Documents, the fields of the class should be `documents: List[Document]`.
80

81
    Defaults are allowed, however `Optional`, `Union` and similar "generic" types are not. This is necessary to allow
82
    proper validation of the connections, which rely on the type of these fields.
83

84
    Some components may need more dynamic output: for example, your component accepts a list of file extensions at
85
    init time and wants to have one output field for each of those. For these scenarios, refer to `self.output_type()`.
86

87
    Every component should define **either** `Output` or `self.output_type()`.
88

89

90
    ### `output_type()`
91

92
    ```python
93
    @property
94
    def output_type(self) -> ComponentOutput:
95
    ```
96
    Semi-mandatory method (either this or `class Output`).
97

98
    This method defines what the output of this component looks like. For example, if the node is producing
99
    a list of Documents, this method should return a dataclass with such fields, for example:
100
    `return make_dataclass("Output", fields=[("documents", List[Document], None)], bases=(ComponentOutput, ))`
101

102
    Defaults are allowed, however `Optional`, `Union` and similar "generic" types are not. This is necessary to allow
103
    proper validation of the connections, which rely on the type of these fields.
104

105
    If the output is static, normally the `Output` dataclass is preferred, as it provides autocompletion for the users.
106

107
    Every component should define **either** `Output` or `self.output_type()`.
108

109

110
    ### `__init__()`
111

112
    ```python
113
    def __init__(self, [... components init parameters ...]):
114
    ```
115
    Optional method.
116

117
    Components may have an `__init__` method where they define:
118

119
    - `self.defaults = {parameter_name: parameter_default_value, ...}`:
120
        All values defined here will be sent to the `run()` method when the Pipeline calls it.
121
        If any of these parameters is also receiving input from other components, those have precedence.
122
        This collection of values is supposed to replace the need for default values in `run()` and make them
123
        dynamically configurable. Keep in mind that only these defaults will count at runtime: defaults given to
124
        the `Input` dataclass (see above) will be ignored.
125

126
    - `self.init_parameters = {same parameters that the __init__ method received}`:
127
        In this dictionary you can store any state the component needs to persist when it is saved.
128
        These values will be given to the `__init__` method of a new instance when the pipeline is loaded.
129
        Note that by default the `@component` decorator saves the arguments automatically.
130
        However, if a component sets its own `init_parameters` manually in `__init__()`, that will be used instead.
131
        Note: all of the values contained here **must be JSON serializable**. Serialize them manually if needed.
132

133
    Components should take only "basic" Python types as parameters of their `__init__` function, or iterables and
134
    dictionaries containing only such values. Anything else (objects, functions, etc.) will raise an exception at init
135
    time. If such values are needed, consider serializing them to a string.
136

137
    _(TODO explain how to use classes and functions in init. In the meantime see `test/components/test_accumulate.py`)_
138

139
    The `__init__` must be extremely lightweight, because it's a frequent operation during the construction and
140
    validation of the pipeline. If a component has some heavy state to initialize (models, backends, etc.) refer to
141
    the `warm_up()` method.
142

143

144
    ### `warm_up()`
145

146
    ```python
147
    def warm_up(self):
148
    ```
149
    Optional method.
150

151
    This method is called by Pipeline before the graph execution. Make sure to avoid double-initializations,
152
    because Pipeline will not keep track of which components it called `warm_up()` on.
153

154

155
    ### `run()`
156

157
    ```python
158
    def run(self, data: <Input if defined, otherwise untyped>) -> <Output if defined, otherwise untyped>:
159
    ```
160
    Mandatory method.
161

162
    This is the method where the main functionality of the component should be carried out. It's called by
163
    `Pipeline.run()`.
164

165
    When the component should run, Pipeline will call this method with:
166

167
    - all the input values coming from other components connected to it,
168
    - if any is missing, the corresponding value defined in `self.defaults`, if it exists.
169

170
    `run()` must return a single instance of the dataclass declared through either `Output` or `self.output_type()`.
171

172
    Args:
173
        class_: the class that Canals should use as a component.
174
        serializable: whether to check, at init time, if the component can be saved with
175
        `save_pipelines()`.
176

177
    Returns:
178
        A class that can be recognized as a component.
179

180
    Raises:
181
        ComponentError: if the class provided has no `run()` method or otherwise doesn't respect the component contract.
182
    """
183
    logger.debug("Registering %s as a component", class_)
1✔
184

185
    # '__canals_component__' is used to distinguish components from regular classes.
186
    # Its value is set to the desired component name: normally it is the class name, but it can technically be customized.
187
    class_.__canals_component__ = class_.__name__
1✔
188

189
    # Find input and output properties
190
    (input_, output) = _find_input_output(class_)
1✔
191

192
    # Save the input and output properties so it's easier to find them when running the Component since we won't
193
    # need to search the exact property name each time
194
    class_.__canals_input__ = input_
1✔
195
    class_.__canals_output__ = output
1✔
196

197
    # Check that the run method respects all constraints
198
    _check_run_signature(class_)
1✔
199

200
    # Automatically registers all the init parameters in an instance attribute called `init_parameters`.
201
    # See `save_init_params()`.
202
    class_.__init__ = save_init_params(class_.__init__)
1✔
203

204
    # Makes sure the self.defaults dictionary is always present
205
    class_.__init__ = init_defaults(class_.__init__)
1✔
206

207
    return class_
1✔
208

209

210
# We do this to have some namespacing and also to make it clear that the methods decorated with
211
# @component.input and @component.output must have their class decorated as @component.
212
setattr(component, "input", _input)
1✔
213
setattr(component, "output", _output)
1✔
214

215

216
def _find_input_output(class_):
1✔
217
    """
218
    Finds the input and the output definitions for class_ and returns them.
219

220
    There must be exactly one input definition and one output definition for class_. If none or
221
    more than one of either is found, ComponentError is raised.
222
    """
223
    inputs_found = []
1✔
224
    outputs_found = []
1✔
225

226
    # Get all properties of class_
227
    properties = inspect.getmembers(class_, predicate=lambda m: isinstance(m, property))
1✔
228
    for _, prop in properties:
1✔
229
        if not hasattr(prop, "fget") or not hasattr(prop.fget, "__canals_connection__"):
1✔
230
            continue
×
231

232
        # Field __canals_connection__ is set by _input and _output decorators
233
        if prop.fget.__canals_connection__ in [Connection.INPUT, Connection.INPUT_VARIADIC]:
1✔
234
            inputs_found.append(prop)
1✔
235
        elif prop.fget.__canals_connection__ == Connection.OUTPUT:
1✔
236
            outputs_found.append(prop)
1✔
237

238
    if (in_len := len(inputs_found)) != 1:
1✔
239
        # Raise unless exactly one input definition is found
240
        if in_len == 0:
1✔
241
            raise ComponentError(
1✔
242
                f"No input definition found in Component {class_.__name__}. "
243
                "Create a method that returns a dataclass defining the input and "
244
                "decorate it with @component.input() to fix the error."
245
            )
246
        raise ComponentError(f"Multiple input definitions found for Component {class_.__name__}.")
1✔
247

248
    if (out_len := len(outputs_found)) != 1:
1✔
249
        # Raise unless exactly one output definition is found
250
        if out_len == 0:
1✔
251
            raise ComponentError(
1✔
252
                f"No output definition found in Component {class_.__name__}. "
253
                "Create a method that returns a dataclass defining the output and "
254
                "decorate it with @component.output() to fix the error."
255
            )
256
        raise ComponentError(f"Multiple output definitions found for Component {class_.__name__}.")
1✔
257

258
    return (inputs_found[0], outputs_found[0])
1✔
259

260

261
def _check_run_signature(class_):
1✔
262
    """
263
    Check that the component's run() method exists and respects all constraints
264
    """
265
    # Check for run()
266
    if not hasattr(class_, "run"):
1✔
267
        raise ComponentError(f"{class_.__name__} must have a 'run()' method. See the docs for more information.")
1✔
268
    run_signature = inspect.signature(class_.run)
1✔
269

270
    # run() must take a single input param
271
    if len(run_signature.parameters) != 2:
1✔
272
        raise ComponentError("run() must accept only a single parameter called 'data'.")
1✔
273

274
    # The input param must be called data
275
    if "data" not in run_signature.parameters:
1✔
276
        raise ComponentError("run() must accept a parameter called 'data'.")
1✔
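
The two checks in the listing above (finding exactly one marked input and output property, then validating the `run()` signature) can be tried in isolation. The following is a minimal sketch, not the Canals implementation: `ContractError`, `mark_input`, `mark_output`, `find_marked`, `check_run_signature`, the `__marker__` attribute and the `Doubler` class are all hypothetical names invented for illustration, standing in for `ComponentError`, `@component.input`, `@component.output`, `_find_input_output`, `_check_run_signature` and `__canals_connection__`.

```python
import inspect
from dataclasses import dataclass
from typing import List


class ContractError(Exception):
    """Hypothetical stand-in for canals.errors.ComponentError."""


def mark_input(func):
    # Hypothetical stand-in for @component.input: tag the getter, then wrap it as a property.
    func.__marker__ = "input"
    return property(func)


def mark_output(func):
    # Hypothetical stand-in for @component.output.
    func.__marker__ = "output"
    return property(func)


def find_marked(class_, marker):
    """Return the single property of class_ whose getter carries the given marker."""
    properties = inspect.getmembers(class_, lambda m: isinstance(m, property))
    # getattr() with a default skips properties whose getter was never marked.
    found = [prop for _, prop in properties if getattr(prop.fget, "__marker__", None) == marker]
    if len(found) != 1:
        raise ContractError(
            f"Expected exactly one {marker} definition in {class_.__name__}, found {len(found)}."
        )
    return found[0]


def check_run_signature(class_):
    """Check that run() exists and takes exactly `self` and `data`."""
    if not hasattr(class_, "run"):
        raise ContractError(f"{class_.__name__} must have a 'run()' method.")
    parameters = inspect.signature(class_.run).parameters
    if len(parameters) != 2 or "data" not in parameters:
        raise ContractError("run() must accept only a single parameter called 'data'.")


class Doubler:
    @property
    def name(self):
        # An ordinary, unmarked property: the lookup must ignore it.
        return "doubler"

    @mark_input
    def input(self):
        @dataclass
        class Input:
            values: List[int]

        return Input

    @mark_output
    def output(self):
        @dataclass
        class Output:
            values: List[int]

        return Output

    def run(self, data):
        # self.output evaluates to the Output dataclass, which is then instantiated.
        return self.output(values=[value * 2 for value in data.values])
```

Under these assumptions, `find_marked(Doubler, "input")` returns the `input` property object, `check_run_signature(Doubler)` passes silently, and a class with an unmarked property such as `name` is handled without an `AttributeError`, which is the case the guard in `_find_input_output` is meant to cover.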