deepset-ai / canals / 5739097098

02 Aug 2023 12:56PM UTC coverage: 94.11% (+0.5%) from 93.577%
push

github

web-flow
Merge pull request #64 from deepset-ai/remove-run-time-checks

Remove Component's I/O type checks at run time

144 of 147 branches covered (97.96%)

Branch coverage included in aggregate %.

575 of 617 relevant lines covered (93.19%)

0.93 hits per line
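
The three percentages above can be cross-checked with a little arithmetic. The sketch below assumes the aggregate is computed as (covered lines + covered branches) / (relevant lines + total branches); the report does not state this formula, but it reproduces the 94.11% figure. The helper name `pct` is hypothetical.

```python
def pct(covered: int, total: int) -> float:
    """Return a coverage percentage rounded to two decimals."""
    return round(100 * covered / total, 2)


# Counts taken from the report summary.
lines_covered, lines_total = 575, 617
branches_covered, branches_total = 144, 147

line_pct = pct(lines_covered, lines_total)
branch_pct = pct(branches_covered, branches_total)

# Assumption: the aggregate pools lines and branches into one ratio.
aggregate_pct = pct(
    lines_covered + branches_covered,
    lines_total + branches_total,
)

print(line_pct, branch_pct, aggregate_pct)
```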

Source File

97.37
canals/component/component.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import logging
1✔
6
import inspect
1✔
7
from typing import Protocol, Union, Dict, Any, get_origin, get_args
1✔
8
from functools import wraps
1✔
9

10
from canals.errors import ComponentError
1✔
11

12

13
logger = logging.getLogger(__name__)
1✔
14

15

16
# We ignore too-few-public-methods Pylint error as this is only meant to be
17
# the definition of the Component interface.
18
class Component(Protocol):  # pylint: disable=too-few-public-methods
1✔
19
    """
20
    Abstract interface of a Component.
21
    This is only used by type checking tools.
22
    If you want to create a new Component use the @component decorator.
23
    """
24

25
    def run(self, **kwargs) -> Dict[str, Any]:
1✔
26
        """
27
        Takes the Component input and returns its output.
28
        Inputs are defined explicitly by the run method's signature or with `component.set_input_types()` if dynamic.
29
        Outputs are defined by decorating the run method with `@component.output_types()`
30
        or with `component.set_output_types()` if dynamic.
31
        """
32

33

34
def _prepare_init_params_and_sockets(init_func):
1✔
35
    """
36
    Decorator that saves the init parameters of a component in `self.init_parameters`
37
    """
38

39
    @wraps(init_func)
1✔
40
    def wrapper(self, *args, **kwargs):
1✔
41
        # Call the actual __init__ function with the arguments
42
        init_func(self, *args, **kwargs)
1✔
43

44
        # Collect and store all the init parameters, preserving whatever the components might have already added there
45
        self.init_parameters = {**kwargs, **getattr(self, "init_parameters", {})}
1✔
46

47
    return wrapper
1✔
48

49

50
class _Component:
1✔
51
    """
52
    Marks a class as a component. Any class decorated with `@component` can be used by a Pipeline.
53

54
    All components must follow the contract below. This docstring is the source of truth for the component contract.
55

56
    ### `@component` decorator
57

58
    All component classes must be decorated with the `@component` decorator. This allows Canals to discover them.
59

60
    ### `__init__(self, **kwargs)`
61

62
    Optional method.
63

64
    Components may have an `__init__` method where they define:
65

66
    - `self.init_parameters = {same parameters that the __init__ method received}`:
67
        In this dictionary you can store any state the components wish to be persisted when they are saved.
68
        These values will be given to the `__init__` method of a new instance when the pipeline is loaded.
69
        Note that by default the `@component` decorator saves the arguments automatically.
70
        However, if a component sets its own `init_parameters` manually in `__init__()`, those values take precedence.
71
        Note: all of the values contained here **must be JSON serializable**. Serialize them manually if needed.
72

73
    Components should take only "basic" Python types as parameters of their `__init__` function, or iterables and
74
    dictionaries containing only such values. Anything else (objects, functions, etc) will raise an exception at init
75
    time. If there's the need for such values, consider serializing them to a string.
76

77
    _(TODO explain how to use classes and functions in init. In the meantime see `test/components/test_accumulate.py`)_
78

79
    The `__init__` must be extremely lightweight, because it's a frequent operation during the construction and
80
    validation of the pipeline. If a component has some heavy state to initialize (models, backends, etc...) refer to
81
    the `warm_up()` method.
82

83

84
    ### `warm_up(self)`
85

86
    Optional method.
87

88
    This method is called by Pipeline before the graph execution. Make sure to avoid double-initializations,
89
    because Pipeline will not keep track of which components it called `warm_up()` on.
90

91

92
    ### `run(self, data)`
93

94
    Mandatory method.
95

96
    This is the method where the main functionality of the component should be carried out. It's called by
97
    `Pipeline.run()`.
98

99
    When the component should run, Pipeline will call this method with an instance of the dataclass returned by the
100
    method decorated with `@component.input`. This dataclass contains:
101

102
    - all the input values coming from other components connected to it,
103
    - if any is missing, the corresponding value defined in `self.defaults`, if it exists.
104

105
    `run()` must return a single instance of the dataclass declared through the method decorated with
106
    `@component.output`.
107

108
    Args:
109
        class_: the class that Canals should use as a component.
110
        serializable: whether to check, at init time, if the component can be saved with
111
        `save_pipelines()`.
112

113
    Returns:
114
        A class that can be recognized as a component.
115

116
    Raises:
117
        ComponentError: if the class provided has no `run()` method or otherwise doesn't respect the component contract.
118
    """
119

120
    def __init__(self):
1✔
121
        self.registry = {}
1✔
122

123
    def set_input_types(self, instance, **types):
1✔
124
        """
125
        Method that declares the input types of the run method when they are set dynamically.
126

127
        Use as:
128

129
        ```python
130
        @component
131
        class MyComponent:
132

133
            def __init__(self, value: int):
134
                component.set_input_types(self, value_1=str, value_2=str)
135
                ...
136

137
            @component.output_types(output_1=int, output_2=str)
138
            def run(self, **kwargs):
139
                return {"output_1": kwargs["value_1"], "output_2": ""}
140
        ```
141
        """
142
        run_method = instance.run
1✔
143

144
        def wrapper(**kwargs):
1✔
145
            return run_method(**kwargs)
1✔
146

147
        # Store the input types in the run method
148
        wrapper.__canals_io__ = getattr(instance.run, "__canals_io__", {})
1✔
149
        wrapper.__canals_io__["input_types"] = {
1✔
150
            name: {"name": name, "type": type_, "is_optional": _is_optional(type_)} for name, type_ in types.items()
151
        }
152

153
        # Assigns the wrapped method to the instance's run()
154
        instance.run = wrapper
1✔
155

156
    def set_output_types(self, instance, **types):
1✔
157
        """
158
        Method that declares the output types of the run method when they are set dynamically.
159

160
        Use as:
161

162
        ```python
163
        @component
164
        class MyComponent:
165

166
            def __init__(self, value: int):
167
                component.set_output_types(self, output_1=int, output_2=str)
168
                ...
169

170
            def run(self, value: int):
171
                return {"output_1": 1, "output_2": "2"}
172
        ```
173
        """
174
        if not types:
1✔
175
            return
×
176

177
        run_method = instance.run
1✔
178

179
        def wrapper(*args, **kwargs):
1✔
180
            return run_method(*args, **kwargs)
1✔
181

182
        # Store the output types in the run method
183
        wrapper.__canals_io__ = getattr(instance.run, "__canals_io__", {})
1✔
184
        wrapper.__canals_io__["output_types"] = {name: {"name": name, "type": type_} for name, type_ in types.items()}
1✔
185

186
        # Assigns the wrapped method to the instance's run()
187
        instance.run = wrapper
1✔
188

189
    def output_types(self, **types):
1✔
190
        """
191
        Decorator factory that declares the output types of the run method.
192

193
        Use as:
194

195
        ```python
196
        @component
197
        class MyComponent:
198
            @component.output_types(output_1=int, output_2=str)
199
            def run(self, value: int):
200
                return {"output_1": 1, "output_2": "2"}
201
        ```
202
        """
203

204
        def output_types_decorator(run_method):
1✔
205
            """
206
            Decorator that stores the declared output types on the run method.
207
            """
208
            # Store the output types in the run method - used by the pipeline to build the sockets.
209
            if not hasattr(run_method, "__canals_io__"):
1✔
210
                run_method.__canals_io__ = {}
1✔
211
            run_method.__canals_io__["output_types"] = {
1✔
212
                name: {"name": name, "type": type_} for name, type_ in types.items()
213
            }
214

215
            @wraps(run_method)
1✔
216
            def output_types_impl(self, *args, **kwargs):
1✔
217
                return run_method(self, *args, **kwargs)
1✔
218

219
            return output_types_impl
1✔
220

221
        return output_types_decorator
1✔
222

223
    def _component(self, class_):
1✔
224
        """
225
        Decorator validating the structure of the component and registering it in the components registry.
226
        """
227
        logger.debug("Registering %s as a component", class_)
1✔
228

229
        # Check for run()
230
        if not hasattr(class_, "run"):
1✔
231
            raise ComponentError(f"{class_.__name__} must have a 'run()' method. See the docs for more information.")
1✔
232
        run_signature = inspect.signature(class_.run)
1✔
233

234
        # Create the input sockets
235
        if not hasattr(class_.run, "__canals_io__"):
1✔
236
            class_.run.__canals_io__ = {}
1✔
237
        class_.run.__canals_io__["input_types"] = {
1✔
238
            param: {
239
                "name": param,
240
                "type": run_signature.parameters[param].annotation,
241
                "is_optional": _is_optional(run_signature.parameters[param].annotation),
242
            }
243
            for param in list(run_signature.parameters)[1:]  # First is 'self' and it doesn't matter.
244
        }
245

246
        # Automatically registers all the init parameters in an instance attribute called `init_parameters`.
247
        # See `_prepare_init_params_and_sockets()`.
248
        class_.__init__ = _prepare_init_params_and_sockets(class_.__init__)
1✔
249

250
        # Save the component in the class registry (for deserialization)
251
        if class_.__name__ in self.registry:
1✔
252
            logger.error(
1✔
253
                "Component %s is already registered. Previous imported from '%s', new imported from '%s'",
254
                class_.__name__,
255
                self.registry[class_.__name__],
256
                class_,
257
            )
258
        self.registry[class_.__name__] = class_
1✔
259
        logger.debug("Registered Component %s", class_)
1✔
260

261
        return class_
1✔
262

263
    def __call__(self, class_=None):
1✔
264
        """Allows us to use this decorator with parentheses and without."""
265
        if class_:
1✔
266
            return self._component(class_)
1✔
267

268
        return self._component
×
269

270

271
component = _Component()
1✔
272

273

274
def _is_optional(type_: type) -> bool:
1✔
275
    """
276
    Utility method that returns whether a type is Optional.
277
    """
278
    return get_origin(type_) is Union and type(None) in get_args(type_)
1✔
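
The `_is_optional` check at the end of the listing relies on `typing.get_origin` and `typing.get_args`. The sketch below restates the same expression under the name `is_optional` (a local copy for illustration, not an import from Canals) and shows which annotations it treats as optional. Only `typing.Optional` and `typing.Union` spellings are exercised here; unions written with the `X | None` syntax are outside this sketch, because their origin may differ between Python versions.

```python
from typing import List, Optional, Union, get_args, get_origin


def is_optional(type_) -> bool:
    """Local copy of the listing's check: a Union that includes NoneType."""
    return get_origin(type_) is Union and type(None) in get_args(type_)


# Optional[X] is shorthand for Union[X, None], so both forms match.
print(is_optional(Optional[int]))
print(is_optional(Union[str, None]))

# Plain types, unions without None, and other generics do not match.
print(is_optional(int))
print(is_optional(Union[int, str]))
print(is_optional(List[int]))
```

In the listing, this result is stored as the `is_optional` entry of each input socket description.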