deepset-ai / canals / 5679577863

27 Jul 2023 10:45AM UTC coverage: 93.457% (-0.4%) from 93.813%

push

github

web-flow
Merge pull request #60 from deepset-ai/decorator-based-io

Decorator based I/O

158 of 162 branches covered (97.53%)

Branch coverage included in aggregate %.

599 of 648 relevant lines covered (92.44%)

0.92 hits per line

Source File

92.45
canals/component/component.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import logging
import inspect
from typing import Protocol, Union, Dict, Any, get_origin, get_args
from functools import wraps

from canals.errors import ComponentError
from canals.type_checking import _types_are_compatible


logger = logging.getLogger(__name__)


# We ignore too-few-public-methods Pylint error as this is only meant to be
# the definition of the Component interface.
class Component(Protocol):  # pylint: disable=too-few-public-methods
    """
    Abstract interface of a Component.
    This is only used by type checking tools.
    If you want to create a new Component use the @component decorator.
    """

    def run(self, **kwargs) -> Dict[str, Any]:
        """
        Takes the Component input and returns its output.
        Inputs are defined explicitly by the run method's signature or with `component.set_input_types()` if dynamic.
        Outputs are defined by decorating the run method with `@component.output_types()`
        or with `component.set_output_types()` if dynamic.
        """


def _prepare_init_params_and_sockets(init_func):
    """
    Decorator that saves the init parameters of a component in `self.init_parameters`
    """

    @wraps(init_func)
    def wrapper(self, *args, **kwargs):
        # Call the actual __init__ function with the arguments
        init_func(self, *args, **kwargs)

        # Collect and store all the init parameters, preserving whatever the components might have already added there
        self.init_parameters = {**kwargs, **getattr(self, "init_parameters", {})}

        if not hasattr(self.run, "__canals_io__"):
            raise ComponentError("This component seems to have neither inputs nor outputs.")

    return wrapper
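
The wrapper above merges the keyword arguments passed to `__init__` with any `init_parameters` the component set itself, and the component's own values take precedence. Positional arguments are not recorded. The sketch below reproduces only that merge step so it can run without canals installed. The names `record_init_kwargs`, `Greeter` and `Doubler` are hypothetical and exist only for illustration.

```python
from functools import wraps


def record_init_kwargs(init_func):
    """Hypothetical stand-in for the wrapper above: stores the init kwargs on the instance."""

    @wraps(init_func)
    def wrapper(self, *args, **kwargs):
        init_func(self, *args, **kwargs)
        # Values that __init__ stored in init_parameters override the captured kwargs.
        self.init_parameters = {**kwargs, **getattr(self, "init_parameters", {})}

    return wrapper


class Greeter:
    @record_init_kwargs
    def __init__(self, greeting, punctuation="!"):
        self.greeting = greeting
        self.punctuation = punctuation


class Doubler:
    @record_init_kwargs
    def __init__(self, value):
        # The component stores its own version of the parameter.
        self.init_parameters = {"value": value * 2}


by_keyword = Greeter(greeting="hello", punctuation="?")
by_position = Greeter("hello")
doubler = Doubler(value=1)

print(by_keyword.init_parameters)   # {'greeting': 'hello', 'punctuation': '?'}
print(by_position.init_parameters)  # {}
print(doubler.init_parameters)      # {'value': 2}
```

Under this reading, a component that needs its parameters persisted should be constructed with keyword arguments, or should fill `init_parameters` itself.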


class _Component:
    """
    Marks a class as a component. Any class decorated with `@component` can be used by a Pipeline.

    All components must follow the contract below. This docstring is the source of truth for the component contract.

    ### `@component` decorator

    All component classes must be decorated with the `@component` decorator. This allows Canals to discover them.

    ### `__init__(self, **kwargs)`

    Optional method.

    Components may have an `__init__` method where they define:

    - `self.init_parameters = {same parameters that the __init__ method received}`:
        In this dictionary you can store any state the component wishes to persist when it is saved.
        These values will be given to the `__init__` method of a new instance when the pipeline is loaded.
        Note that by default the `@component` decorator saves the keyword arguments automatically.
        However, if a component sets its own `init_parameters` manually in `__init__()`, those values take precedence.
        Note: all of the values contained here **must be JSON serializable**. Serialize them manually if needed.

    Components should take only "basic" Python types as parameters of their `__init__` function, or iterables and
    dictionaries containing only such values. Anything else (objects, functions, etc.) will raise an exception at init
    time. If such values are needed, consider serializing them to a string.

    _(TODO explain how to use classes and functions in init. In the meantime see `test/components/test_accumulate.py`)_

    The `__init__` must be extremely lightweight, because it's a frequent operation during the construction and
    validation of the pipeline. If a component has some heavy state to initialize (models, backends, etc...) refer to
    the `warm_up()` method.


    ### `warm_up(self)`

    Optional method.

    This method is called by Pipeline before the graph execution. Make sure to avoid double-initializations,
    because Pipeline will not keep track of which components it called `warm_up()` on.


    ### `run(self, **kwargs)`

    Mandatory method.

    This is the method where the main functionality of the component should be carried out. It's called by
    `Pipeline.run()`.

    When the component should run, Pipeline will call this method with the input values coming from the components
    connected to it. Inputs are declared explicitly by the signature of `run()`, or with
    `component.set_input_types()` if they are dynamic.

    `run()` must return a dictionary whose keys are declared by decorating `run()` with
    `@component.output_types()`, or with `component.set_output_types()` if they are dynamic.

    Args:
        class_: the class that Canals should use as a component.
        serializable: whether to check, at init time, if the component can be saved with
        `save_pipelines()`.

    Returns:
        A class that can be recognized as a component.

    Raises:
        ComponentError: if the class provided has no `run()` method or otherwise doesn't respect the component contract.
    """

    def __init__(self):
        self.registry = {}

    def set_input_types(self, instance, **types):
        """
        Method that validates the input kwargs of the run method.

        Use as:

        ```python
        @component
        class MyComponent:

            def __init__(self, value: int):
                component.set_input_types(self, value_1=str, value_2=str)
                ...

            @component.output_types(output_1=int, output_2=str)
            def run(self, **kwargs):
                return {"output_1": len(kwargs["value_1"]), "output_2": ""}
        ```
        """
        run_method = instance.run

        def wrapper(**kwargs):
            """
            Adds a check that validates the input kwargs of the run method.
            """
            # Check input types
            for key, value in kwargs.items():
                if key not in types:
                    raise ComponentError(f"Input value '{key}' not declared in component.set_input_types()")
                if _types_are_compatible(value, types[key]):
                    raise ComponentError(
                        f"Input type {type(value)} for value '{key}' doesn't match the one declared in "
                        f"component.set_input_types() ({types[key]})"
                    )
            return run_method(**kwargs)

        # Store the input types in the run method
        wrapper.__canals_io__ = getattr(instance.run, "__canals_io__", {})
        wrapper.__canals_io__["input_types"] = {
            name: {"name": name, "type": type_, "is_optional": _is_optional(type_)} for name, type_ in types.items()
        }

        # Assigns the wrapped method to the instance's run()
        instance.run = wrapper

    def set_output_types(self, instance, **types):
        """
        Method that validates the output dictionary of the run method.

        Use as:

        ```python
        @component
        class MyComponent:

            def __init__(self, value: int):
                component.set_output_types(self, output_1=int, output_2=str)
                ...

            def run(self, value: int):
                return {"output_1": 1, "output_2": "2"}
        ```
        """
        if not types:
            return

        run_method = instance.run

        def wrapper(*args, **kwargs):
            """
            Adds a check that validates the output dictionary of the run method.
            """
            result = run_method(*args, **kwargs)
            # Check output types
            for key in result:
                if key not in types:
                    raise ComponentError(f"Return value '{key}' not declared in component.set_output_types()")
                if _types_are_compatible(types[key], result[key]):
                    raise ComponentError(
                        f"Return type {type(result[key])} for value '{key}' doesn't match the one declared in "
                        f"component.set_output_types() ({types[key]})"
                    )
            return result

        # Store the output types in the run method
        wrapper.__canals_io__ = getattr(instance.run, "__canals_io__", {})
        wrapper.__canals_io__["output_types"] = {name: {"name": name, "type": type_} for name, type_ in types.items()}

        # Assigns the wrapped method to the instance's run()
        instance.run = wrapper

    def output_types(self, **types):
        """
        Decorator factory that validates the output dictionary of the run method.

        Use as:

        ```python
        @component
        class MyComponent:
            @component.output_types(output_1=int, output_2=str)
            def run(self, value: int):
                return {"output_1": 1, "output_2": "2"}
        ```
        """

        def output_types_decorator(run_method):
            """
            Decorator that validates the output dictionary of the run method.
            """
            # Store the output types in the run method - used by the pipeline to build the sockets.
            if not hasattr(run_method, "__canals_io__"):
                run_method.__canals_io__ = {}
            run_method.__canals_io__["output_types"] = {
                name: {"name": name, "type": type_} for name, type_ in types.items()
            }

            @wraps(run_method)
            def output_types_impl(self, *args, **kwargs):
                """
                Adds a check that validates the output dictionary of the run method.
                """
                result = run_method(self, *args, **kwargs)

                # Check output types
                for key in result:
                    if key not in types:
                        raise ComponentError(f"Return value '{key}' not declared in @output_types decorator")
                    if _types_are_compatible(types[key], result[key]):
                        raise ComponentError(
                            f"Return type {type(result[key])} for value '{key}' doesn't match the one declared in "
                            f"@output_types decorator ({types[key]})"
                        )
                return result

            return output_types_impl

        return output_types_decorator

    def _component(self, class_):
        """
        Decorator validating the structure of the component and registering it in the components registry.
        """
        logger.debug("Registering %s as a component", class_)

        # Check for run()
        if not hasattr(class_, "run"):
            raise ComponentError(f"{class_.__name__} must have a 'run()' method. See the docs for more information.")
        run_signature = inspect.signature(class_.run)

        # Create the input sockets
        if not hasattr(class_.run, "__canals_io__"):
            class_.run.__canals_io__ = {}
        class_.run.__canals_io__["input_types"] = {
            param: {
                "name": param,
                "type": run_signature.parameters[param].annotation,
                "is_optional": _is_optional(run_signature.parameters[param].annotation),
            }
            for param in list(run_signature.parameters)[1:]  # First is 'self' and it doesn't matter.
        }

        # Automatically registers all the init parameters in an instance attribute called `init_parameters`.
        # See `_prepare_init_params_and_sockets()`.
        class_.__init__ = _prepare_init_params_and_sockets(class_.__init__)

        # Save the component in the class registry (for deserialization)
        if class_.__name__ in self.registry:
            logger.error(
                "Component %s is already registered. Previous imported from '%s', new imported from '%s'",
                class_.__name__,
                self.registry[class_.__name__],
                class_,
            )
        self.registry[class_.__name__] = class_
        logger.debug("Registered Component %s", class_)

        return class_

    def __call__(self, class_=None):
        """Allows us to use this decorator with or without parentheses."""
        if class_:
            return self._component(class_)

        return self._component


component = _Component()
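
Because `component` is an instance whose `__call__` accepts an optional class, it can be applied either as `@component` or as `@component()`. The sketch below isolates that pattern with a hypothetical `_Registry` class and hypothetical `First` and `Second` classes. It keeps only the registration step and leaves out the `run()` checks done by `_component()` above.

```python
class _Registry:
    """Hypothetical decorator object that mirrors the __call__ pattern above."""

    def __init__(self):
        self.registry = {}

    def _register(self, class_):
        # Store the class under its name and hand it back unchanged.
        self.registry[class_.__name__] = class_
        return class_

    def __call__(self, class_=None):
        # Bare form (@register): Python passes the class directly.
        if class_:
            return self._register(class_)
        # Called form (@register()): return the real decorator.
        return self._register


register = _Registry()


@register
class First:
    pass


@register()
class Second:
    pass


print(sorted(register.registry))  # ['First', 'Second']
```

Both forms end up in the same registry, keyed by class name, which is also why registering two classes with the same name overwrites the earlier entry.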


def _is_optional(type_: type) -> bool:
    """
    Utility method that returns whether a type is Optional.
    """
    return get_origin(type_) is Union and type(None) in get_args(type_)