• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / canals / 5739693931

02 Aug 2023 01:51PM UTC coverage: 94.094% (-0.02%) from 94.11%
5739693931

push

github

web-flow
Merge pull request #65 from deepset-ai/split-io-magic-fields

Split __canals_io__ field in __canals_input__ and __canals_output__

142 of 145 branches covered (97.93%)

Branch coverage included in aggregate %.

575 of 617 relevant lines covered (93.19%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.22
canals/component/component.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import logging
1✔
6
import inspect
1✔
7
from typing import Protocol, Union, Dict, Any, get_origin, get_args
1✔
8
from functools import wraps
1✔
9

10
from canals.errors import ComponentError
1✔
11

12

13
logger = logging.getLogger(__name__)
1✔
14

15

16
# We ignore too-few-public-methods Pylint error as this is only meant to be
17
# the definition of the Component interface.
18
class Component(Protocol):  # pylint: disable=too-few-public-methods
1✔
19
    """
20
    Abstract interface of a Component.
21
    This is only used by type checking tools.
22
    If you want to create a new Component use the @component decorator.
23
    """
24

25
    def run(self, **kwargs) -> Dict[str, Any]:
1✔
26
        """
27
        Takes the Component input and returns its output.
28
        Inputs are defined explicitly by the run method's signature or with `component.set_input_types()` if dynamic.
29
        Outputs are defined by decorating the run method with `@component.output_types()`
30
        or with `component.set_output_types()` if dynamic.
31
        """
32

33

34
def _prepare_init_params_and_sockets(init_func):
1✔
35
    """
36
    Decorator that saves the init parameters of a component in `self.init_parameters`
37
    """
38

39
    @wraps(init_func)
1✔
40
    def wrapper(self, *args, **kwargs):
1✔
41
        # Call the actual __init__ function with the arguments
42
        init_func(self, *args, **kwargs)
1✔
43

44
        # Collect and store all the init parameters, preserving whatever the components might have already added there
45
        self.init_parameters = {**kwargs, **getattr(self, "init_parameters", {})}
1✔
46

47
    return wrapper
1✔
48

49

50
class _Component:
1✔
51
    """
52
    Marks a class as a component. Any class decorated with `@component` can be used by a Pipeline.
53

54
    All components must follow the contract below. This docstring is the source of truth for components contract.
55

56
    ### `@component` decorator
57

58
    All component classes must be decorated with the `@component` decorator. This allows Canals to discover them.
59

60
    ### `__init__(self, **kwargs)`
61

62
    Optional method.
63

64
    Components may have an `__init__` method where they define:
65

66
    - `self.init_parameters = {same parameters that the __init__ method received}`:
67
        In this dictionary you can store any state the components wish to be persisted when they are saved.
68
        These values will be given to the `__init__` method of a new instance when the pipeline is loaded.
69
        Note that by default the `@component` decorator saves the arguments automatically.
70
        However, if a component sets their own `init_parameters` manually in `__init__()`, that will be used instead.
71
        Note: all of the values contained here **must be JSON serializable**. Serialize them manually if needed.
72

73
    Components should take only "basic" Python types as parameters of their `__init__` function, or iterables and
74
    dictionaries containing only such values. Anything else (objects, functions, etc) will raise an exception at init
75
    time. If there's the need for such values, consider serializing them to a string.
76

77
    _(TODO explain how to use classes and functions in init. In the meantime see `test/components/test_accumulate.py`)_
78

79
    The `__init__` must be extrememly lightweight, because it's a frequent operation during the construction and
80
    validation of the pipeline. If a component has some heavy state to initialize (models, backends, etc...) refer to
81
    the `warm_up()` method.
82

83

84
    ### `warm_up(self)`
85

86
    Optional method.
87

88
    This method is called by Pipeline before the graph execution. Make sure to avoid double-initializations,
89
    because Pipeline will not keep track of which components it called `warm_up()` on.
90

91

92
    ### `run(self, data)`
93

94
    Mandatory method.
95

96
    This is the method where the main functionality of the component should be carried out. It's called by
97
    `Pipeline.run()`.
98

99
    When the component should run, Pipeline will call this method with an instance of the dataclass returned by the
100
    method decorated with `@component.input`. This dataclass contains:
101

102
    - all the input values coming from other components connected to it,
103
    - if any is missing, the corresponding value defined in `self.defaults`, if it exists.
104

105
    `run()` must return a single instance of the dataclass declared through the method decorated with
106
    `@component.output`.
107

108
    Args:
109
        class_: the class that Canals should use as a component.
110
        serializable: whether to check, at init time, if the component can be saved with
111
        `save_pipelines()`.
112

113
    Returns:
114
        A class that can be recognized as a component.
115

116
    Raises:
117
        ComponentError: if the class provided has no `run()` method or otherwise doesn't respect the component contract.
118
    """
119

120
    def __init__(self):
1✔
121
        self.registry = {}
1✔
122

123
    def set_input_types(self, instance, **types):
1✔
124
        """
125
        Method that validates the input kwargs of the run method.
126

127
        Use as:
128

129
        ```python
130
        @component
131
        class MyComponent:
132

133
            def __init__(self, value: int):
134
                component.set_input_types(value_1=str, value_2=str)
135
                ...
136

137
            @component.output_types(output_1=int, output_2=str)
138
            def run(self, **kwargs):
139
                return {"output_1": kwargs["value_1"], "output_2": ""}
140
        ```
141
        """
142
        run_method = instance.run
1✔
143

144
        def wrapper(**kwargs):
1✔
145
            return run_method(**kwargs)
1✔
146

147
        # Store the input types in the run method
148
        wrapper.__canals_input__ = {
1✔
149
            name: {"name": name, "type": type_, "is_optional": _is_optional(type_)} for name, type_ in types.items()
150
        }
151
        wrapper.__canals_output__ = getattr(run_method, "__canals_output__", {})
1✔
152

153
        # Assigns the wrapped method to the instance's run()
154
        instance.run = wrapper
1✔
155

156
    def set_output_types(self, instance, **types):
1✔
157
        """
158
        Method that validates the output dictionary of the run method.
159

160
        Use as:
161

162
        ```python
163
        @component
164
        class MyComponent:
165

166
            def __init__(self, value: int):
167
                component.set_output_types(output_1=int, output_2=str)
168
                ...
169

170
            def run(self, value: int):
171
                return {"output_1": 1, "output_2": "2"}
172
        ```
173
        """
174
        if not types:
1✔
175
            return
×
176

177
        run_method = instance.run
1✔
178

179
        def wrapper(*args, **kwargs):
1✔
180
            return run_method(*args, **kwargs)
1✔
181

182
        # Store the output types in the run method
183
        wrapper.__canals_input__ = getattr(run_method, "__canals_input__", {})
1✔
184
        wrapper.__canals_output__ = {name: {"name": name, "type": type_} for name, type_ in types.items()}
1✔
185

186
        # Assigns the wrapped method to the instance's run()
187
        instance.run = wrapper
1✔
188

189
    def output_types(self, **types):
1✔
190
        """
191
        Decorator factory that validates the output dictionary of the run method.
192

193
        Use as:
194

195
        ```python
196
        @component
197
        class MyComponent:
198
            @component.output_types(output_1=int, output_2=str)
199
            def run(self, value: int):
200
                return {"output_1": 1, "output_2": "2"}
201
        ```
202
        """
203

204
        def output_types_decorator(run_method):
1✔
205
            """
206
            Decorator that validates the output dictionary of the run method.
207
            """
208
            # Store the output types in the run method - used by the pipeline to build the sockets.
209

210
            @wraps(run_method)
1✔
211
            def wrapper(self, *args, **kwargs):
1✔
212
                return run_method(self, *args, **kwargs)
1✔
213

214
            wrapper.__canals_input__ = getattr(run_method, "__canals_input__", {})
1✔
215
            wrapper.__canals_output__ = {name: {"name": name, "type": type_} for name, type_ in types.items()}
1✔
216

217
            return wrapper
1✔
218

219
        return output_types_decorator
1✔
220

221
    def _component(self, class_):
1✔
222
        """
223
        Decorator validating the structure of the component and registering it in the components registry.
224
        """
225
        logger.debug("Registering %s as a component", class_)
1✔
226

227
        # Check for run()
228
        if not hasattr(class_, "run"):
1✔
229
            raise ComponentError(f"{class_.__name__} must have a 'run()' method. See the docs for more information.")
1✔
230
        run_signature = inspect.signature(class_.run)
1✔
231

232
        # Create the input sockets
233
        class_.run.__canals_input__ = {
1✔
234
            param: {
235
                "name": param,
236
                "type": run_signature.parameters[param].annotation,
237
                "is_optional": _is_optional(run_signature.parameters[param].annotation),
238
            }
239
            for param in list(run_signature.parameters)[1:]  # First is 'self' and it doesn't matter.
240
        }
241

242
        # Automatically registers all the init parameters in an instance attribute called `_init_parameters`.
243
        # See `save_init_parameters()`.
244
        class_.__init__ = _prepare_init_params_and_sockets(class_.__init__)
1✔
245

246
        # Save the component in the class registry (for deserialization)
247
        if class_.__name__ in self.registry:
1✔
248
            logger.error(
1✔
249
                "Component %s is already registered. Previous imported from '%s', new imported from '%s'",
250
                class_.__name__,
251
                self.registry[class_.__name__],
252
                class_,
253
            )
254
        self.registry[class_.__name__] = class_
1✔
255
        logger.debug("Registered Component %s", class_)
1✔
256

257
        setattr(class_, "__canals_component__", True)
1✔
258

259
        return class_
1✔
260

261
    def __call__(self, class_=None):
1✔
262
        """Allows us to use this decorator with parenthesis and without."""
263
        if class_:
1✔
264
            return self._component(class_)
1✔
265

266
        return self._component
×
267

268

269
component = _Component()
1✔
270

271

272
def _is_optional(type_: type) -> bool:
1✔
273
    """
274
    Utility method that returns whether a type is Optional.
275
    """
276
    return get_origin(type_) is Union and type(None) in get_args(type_)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc