deepset-ai / canals / 6072170279

04 Sep 2023 10:21AM UTC coverage: 92.812% (-0.2%) from 93.023%
6072170279

push

github

web-flow
Make to/from_dict optional (#107)

* remove from/to dict from Protocol

* use a default marshaller

* example component with no serializers

* fix linting

* make it smarter

* fix linting

* thank you mypy protector of the dumb programmers

151 of 155 branches covered (0.0%)

Branch coverage included in aggregate %.

585 of 638 relevant lines covered (91.69%)

0.92 hits per line

Source File

canals/component/component.py (96.92% coverage)
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
"""
    Attributes:

        component: Marks a class as a component. Any class decorated with `@component` can be used by a Pipeline.

    All components must follow the contract below. This docstring is the source of truth for the component contract.

    <hr>

    `@component` decorator

    All component classes must be decorated with the `@component` decorator. This allows Canals to discover them.

    <hr>

    `__init__(self, **kwargs)`

    Optional method.

    Components may have an `__init__` method where they define:

    - `self.init_parameters = {same parameters that the __init__ method received}`:
        In this dictionary you can store any state the component wishes to be persisted when it is saved.
        These values will be given to the `__init__` method of a new instance when the pipeline is loaded.
        Note that by default the `@component` decorator saves the arguments automatically.
        However, if a component sets its own `init_parameters` manually in `__init__()`, that will be used instead.
        Note: all of the values contained here **must be JSON serializable**. Serialize them manually if needed.

    Components should take only "basic" Python types as parameters of their `__init__` function, or iterables and
    dictionaries containing only such values. Anything else (objects, functions, etc.) will raise an exception at init
    time. If such values are needed, consider serializing them to a string.

    _(TODO explain how to use classes and functions in init. In the meantime see `test/components/test_accumulate.py`)_

    The `__init__` must be extremely lightweight, because it's a frequent operation during the construction and
    validation of the pipeline. If a component has some heavy state to initialize (models, backends, etc.) refer to
    the `warm_up()` method.

    <hr>

    `warm_up(self)`

    Optional method.

    This method is called by Pipeline before the graph execution. Make sure to avoid double-initializations,
    because Pipeline will not keep track of which components it called `warm_up()` on.

    <hr>

    `run(self, data)`

    Mandatory method.

    This is the method where the main functionality of the component should be carried out. It's called by
    `Pipeline.run()`.

    When the component should run, Pipeline will call this method with an instance of the dataclass returned by the
    method decorated with `@component.input`. This dataclass contains:

    - all the input values coming from other components connected to it,
    - if any is missing, the corresponding value defined in `self.defaults`, if it exists.

    `run()` must return a single instance of the dataclass declared through the method decorated with
    `@component.output`.

"""

import logging
import inspect
from typing import Protocol, Union, Dict, Any, get_origin, get_args
from functools import wraps

from canals.errors import ComponentError


logger = logging.getLogger(__name__)


class Component(Protocol):
    """
    Abstract interface of a Component.
    This is only used by type checking tools.
    If you want to create a new Component use the @component decorator.
    """

    def run(self, **kwargs) -> Dict[str, Any]:
        """
        Takes the Component input and returns its output.
        Inputs are defined explicitly by the run method's signature or with `component.set_input_types()` if dynamic.
        Outputs are defined by decorating the run method with `@component.output_types()`
        or with `component.set_output_types()` if dynamic.
        """


class _Component:
    """
    See module's docstring.

    Args:
        class_: the class that Canals should use as a component.
        serializable: whether to check, at init time, if the component can be saved with
        `save_pipelines()`.

    Returns:
        A class that can be recognized as a component.

    Raises:
        ComponentError: if the class provided has no `run()` method or otherwise doesn't respect the component contract.
    """

    def __init__(self):
        self.registry = {}

    def set_input_types(self, instance, **types):
        """
        Method that declares the types of the input kwargs of the run method.

        Use as:

        ```python
        @component
        class MyComponent:

            def __init__(self, value: int):
                component.set_input_types(self, value_1=str, value_2=str)
                ...

            @component.output_types(output_1=int, output_2=str)
            def run(self, **kwargs):
                return {"output_1": kwargs["value_1"], "output_2": ""}
        ```
        """
        run_method = instance.run

        def wrapper(**kwargs):
            return run_method(**kwargs)

        # Store the input types in the run method
        wrapper.__canals_input__ = {
            name: {"name": name, "type": type_, "is_optional": _is_optional(type_)} for name, type_ in types.items()
        }
        wrapper.__canals_output__ = getattr(run_method, "__canals_output__", {})

        # Assigns the wrapped method to the instance's run()
        instance.run = wrapper

    def set_output_types(self, instance, **types):
        """
        Method that declares the types of the output dictionary of the run method.

        Use as:

        ```python
        @component
        class MyComponent:

            def __init__(self, value: int):
                component.set_output_types(self, output_1=int, output_2=str)
                ...

            def run(self, value: int):
                return {"output_1": 1, "output_2": "2"}
        ```
        """
        if not types:
            return  # not covered by tests in this build

        run_method = instance.run

        def wrapper(*args, **kwargs):
            return run_method(*args, **kwargs)

        # Store the output types in the run method
        wrapper.__canals_input__ = getattr(run_method, "__canals_input__", {})
        wrapper.__canals_output__ = {name: {"name": name, "type": type_} for name, type_ in types.items()}

        # Assigns the wrapped method to the instance's run()
        instance.run = wrapper

    def output_types(self, **types):
        """
        Decorator factory that declares the types of the output dictionary of the run method.

        Use as:

        ```python
        @component
        class MyComponent:
            @component.output_types(output_1=int, output_2=str)
            def run(self, value: int):
                return {"output_1": 1, "output_2": "2"}
        ```
        """

        def output_types_decorator(run_method):
            """
            Decorator that attaches the declared output types to the run method.
            """
            # Store the output types in the run method - used by the pipeline to build the sockets.

            @wraps(run_method)
            def wrapper(self, *args, **kwargs):
                return run_method(self, *args, **kwargs)

            wrapper.__canals_input__ = getattr(run_method, "__canals_input__", {})
            wrapper.__canals_output__ = {name: {"name": name, "type": type_} for name, type_ in types.items()}

            return wrapper

        return output_types_decorator

    def _component(self, class_):
        """
        Decorator validating the structure of the component and registering it in the components registry.
        """
        logger.debug("Registering %s as a component", class_)

        # Check for required methods
        if not hasattr(class_, "run"):
            raise ComponentError(f"{class_.__name__} must have a 'run()' method. See the docs for more information.")
        run_signature = inspect.signature(class_.run)

        # Create the input sockets
        class_.run.__canals_input__ = {
            param: {
                "name": param,
                "type": run_signature.parameters[param].annotation,
                "is_optional": _is_optional(run_signature.parameters[param].annotation),
            }
            for param in list(run_signature.parameters)[1:]  # First is 'self' and it doesn't matter.
        }

        # Save the component in the class registry (for deserialization)
        if class_.__name__ in self.registry:
            # It may occur easily in notebooks by re-running cells.
            logger.debug(
                "Component %s is already registered. Previous imported from '%s', new imported from '%s'",
                class_.__name__,
                self.registry[class_.__name__],
                class_,
            )
        self.registry[class_.__name__] = class_
        logger.debug("Registered Component %s", class_)

        setattr(class_, "__canals_component__", True)

        return class_

    def __call__(self, class_=None):
        """Allows us to use this decorator with or without parentheses."""
        if class_:
            return self._component(class_)

        return self._component  # not covered by tests in this build


component = _Component()


def _is_optional(type_: type) -> bool:
    """
    Utility method that returns whether a type is Optional.
    """
    return get_origin(type_) is Union and type(None) in get_args(type_)
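
The input-socket construction in `_component()` and the `_is_optional()` helper can be tried in isolation. The block below is a minimal sketch that does not import Canals: `is_optional`, `input_sockets` and the `Greeter` class are hypothetical names introduced only for illustration, and the logic mirrors the listing above rather than reproducing the library's API. It covers only `Optional[X]` / `Union[X, None]` annotations; unions written as `X | None` may report a different origin depending on the Python version and are left out.

```python
import inspect
from typing import Any, Dict, Optional, Union, get_args, get_origin


def is_optional(type_: Any) -> bool:
    """Same check as `_is_optional` above: True for Optional[X] / Union[X, None]."""
    return get_origin(type_) is Union and type(None) in get_args(type_)


def input_sockets(cls: type) -> Dict[str, Dict[str, Any]]:
    """Describe every parameter of `cls.run` except the leading `self`."""
    signature = inspect.signature(cls.run)
    return {
        name: {
            "name": name,
            "type": param.annotation,
            "is_optional": is_optional(param.annotation),
        }
        # Skip the first parameter, which is `self` on an unbound method.
        for name, param in list(signature.parameters.items())[1:]
    }


class Greeter:  # hypothetical component-like class, not part of Canals
    def run(self, name: str, greeting: Optional[str] = None) -> Dict[str, Any]:
        return {"text": f"{greeting or 'Hello'}, {name}!"}


sockets = input_sockets(Greeter)
for socket in sockets.values():
    print(socket["name"], socket["is_optional"])
```

Note that optionality here is derived from the type annotation alone, as in the listing: a parameter with a default value but a non-`Optional` annotation is still reported as not optional.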