deepset-ai / canals / 6537710819

16 Oct 2023 06:23PM UTC coverage: 92.72% (-0.02%) from 92.739%
6537710819

Pull #128

github

web-flow
Merge 046f66e8b into 4abcbfb33
Pull Request #128: make `__canals_output__` and `__canals_input__` management consistent

151 of 155 branches covered (0.0%)

Branch coverage included in aggregate %.

575 of 628 relevant lines covered (91.56%)

0.92 hits per line

Source File

canals/component/component.py (96.77% covered)
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
"""
    Attributes:

        component: Marks a class as a component. Any class decorated with `@component` can be used by a Pipeline.

    All components must follow the contract below. This docstring is the source of truth for the component contract.

    <hr>

    `@component` decorator

    All component classes must be decorated with the `@component` decorator. This allows Canals to discover them.

    <hr>

    `__init__(self, **kwargs)`

    Optional method.

    Components may have an `__init__` method where they define:

    - `self.init_parameters = {same parameters that the __init__ method received}`:
        In this dictionary you can store any state the component needs to persist when it is saved.
        These values will be given to the `__init__` method of a new instance when the pipeline is loaded.
        Note that by default the `@component` decorator saves the arguments automatically.
        However, if a component sets its own `init_parameters` manually in `__init__()`, that will be used instead.
        Note: all of the values contained here **must be JSON serializable**. Serialize them manually if needed.

    Components should take only "basic" Python types as parameters of their `__init__` function, or iterables and
    dictionaries containing only such values. Anything else (objects, functions, etc.) will raise an exception at init
    time. If such values are needed, consider serializing them to a string.

    _(TODO explain how to use classes and functions in init. In the meantime see `test/components/test_accumulate.py`)_

    The `__init__` must be extremely lightweight, because it's a frequent operation during the construction and
    validation of the pipeline. If a component has some heavy state to initialize (models, backends, etc.) refer to
    the `warm_up()` method.

    <hr>

    `warm_up(self)`

    Optional method.

    This method is called by Pipeline before the graph execution. Make sure to avoid double-initializations,
    because Pipeline will not keep track of which components it called `warm_up()` on.

    <hr>

    `run(self, data)`

    Mandatory method.

    This is the method where the main functionality of the component should be carried out. It's called by
    `Pipeline.run()`.

    When the component should run, Pipeline will call this method with an instance of the dataclass returned by the
    method decorated with `@component.input`. This dataclass contains:

    - all the input values coming from other components connected to it,
    - if any is missing, the corresponding value defined in `self.defaults`, if it exists.

    `run()` must return a single instance of the dataclass declared through the method decorated with
    `@component.output`.

"""

import logging
import inspect
from typing import Protocol, runtime_checkable, Any
from types import new_class


from canals.errors import ComponentError
from canals.type_utils import _is_optional

logger = logging.getLogger(__name__)


@runtime_checkable
class Component(Protocol):
    """
    Note this is only used by type checking tools.

    In order to implement the `Component` protocol, custom components need to
    have a `run` method. The signature of the method and its return value
    won't be checked, i.e. classes with the following methods:

        def run(self, param: str) -> Dict[str, Any]:
            ...

    and

        def run(self, **kwargs):
            ...

    will both be considered as respecting the protocol. This makes the type
    checking much weaker, but we have other places where we ensure code is
    dealing with actual Components.

    The protocol is runtime checkable so it'll be possible to assert:

        isinstance(MyComponent, Component)
    """

    def run(self, *args: Any, **kwargs: Any):  # pylint: disable=missing-function-docstring
        ...
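To make the weak check described in the `Component` docstring concrete, here is a minimal sketch. It redefines a local stand-in for the protocol so that it runs without Canals installed; `WithRun` and `WithoutRun` are hypothetical classes invented for the illustration.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class Component(Protocol):
    """Local stand-in mirroring the protocol above; not imported from canals."""

    def run(self, *args: Any, **kwargs: Any):
        ...


class WithRun:
    # Hypothetical class: it has a `run` method, so it matches structurally.
    def run(self, **kwargs):
        return {"echo": kwargs}


class WithoutRun:
    # Hypothetical class: no `run` method at all.
    def process(self, value: int) -> int:
        return value + 1


# The runtime check only looks for a `run` attribute; the signature is ignored.
has_run = isinstance(WithRun(), Component)
lacks_run = isinstance(WithoutRun(), Component)
```

Because the check is purely structural, any object exposing a `run` attribute passes, which is why the docstring notes that stricter validation happens elsewhere.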


class ComponentMeta(type):
    def __call__(cls, *args, **kwargs):
        """
        This method is called when clients instantiate a Component and
        runs before __new__ and __init__.
        """
        # This will call __new__ then __init__, giving us back the Component instance
        instance = super().__call__(*args, **kwargs)

        # Before returning, we have the chance to modify the newly created
        # Component instance, so we take the chance and set up the I/O sockets

        # If the __init__ called component.set_output_types(), __canals_output__ is already populated
        if not hasattr(instance, "__canals_output__"):
            # if the run method was decorated, it has a _output_types_cache field assigned
            instance.__canals_output__ = getattr(instance.run, "_output_types_cache", {})

        # If the __init__ called component.set_input_types(), __canals_input__ is already populated
        if not hasattr(instance, "__canals_input__"):
            run_signature = inspect.signature(getattr(cls, "run"))
            instance.__canals_input__ = {
                # Create the input sockets
                param: {
                    "name": param,
                    "type": run_signature.parameters[param].annotation,
                    "is_optional": _is_optional(run_signature.parameters[param].annotation),
                }
                for param in list(run_signature.parameters)[1:]  # First is 'self' and it doesn't matter.
            }

        return instance
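The input-socket derivation in `ComponentMeta.__call__` can be tried in isolation with the sketch below. It is a simplified, self-contained approximation: `SocketMeta`, `Greeter`, and the local `is_optional` helper are hypothetical names, and `is_optional` is only an assumption about what `canals.type_utils._is_optional` does, since that function is not shown here.

```python
import inspect
from typing import Optional, Union, get_args, get_origin


def is_optional(type_) -> bool:
    # Assumption: an annotation counts as optional when it is Optional[X],
    # i.e. a Union that includes NoneType. The real helper may differ.
    return get_origin(type_) is Union and type(None) in get_args(type_)


class SocketMeta(type):
    def __call__(cls, *args, **kwargs):
        # type.__call__ runs __new__ and then __init__ and returns the instance.
        instance = super().__call__(*args, **kwargs)
        # Only derive sockets when __init__ did not set them explicitly.
        if not hasattr(instance, "__canals_input__"):
            signature = inspect.signature(cls.run)
            instance.__canals_input__ = {
                name: {
                    "name": name,
                    "type": parameter.annotation,
                    "is_optional": is_optional(parameter.annotation),
                }
                # Skip the first parameter, which is 'self'.
                for name, parameter in list(signature.parameters.items())[1:]
            }
        return instance


class Greeter(metaclass=SocketMeta):
    # Hypothetical component used only for this illustration.
    def run(self, name: str, title: Optional[str] = None):
        return {"greeting": f"Hello, {name}"}


sockets = Greeter().__canals_input__
```

Under these assumptions, `sockets` holds one entry per `run` parameter, keyed by parameter name, with `title` flagged as optional.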


class _Component:
    """
    See module's docstring.

    Args:
        class_: the class that Canals should use as a component.
        serializable: whether to check, at init time, if the component can be saved with
        `save_pipelines()`.

    Returns:
        A class that can be recognized as a component.

    Raises:
        ComponentError: if the class provided has no `run()` method or otherwise doesn't respect the component contract.
    """

    def __init__(self):
        self.registry = {}

    def set_input_types(self, instance, **types):
        """
        Method that specifies the input types when the run method accepts '**kwargs'.

        Use as:

        ```python
        @component
        class MyComponent:

            def __init__(self, value: int):
                component.set_input_types(self, value_1=str, value_2=str)
                ...

            @component.output_types(output_1=int, output_2=str)
            def run(self, **kwargs):
                return {"output_1": kwargs["value_1"], "output_2": ""}
        ```
        """
        instance.__canals_input__ = {
            name: {"name": name, "type": type_, "is_optional": _is_optional(type_)} for name, type_ in types.items()
        }

    def set_output_types(self, instance, **types):
        """
        Method that specifies the output types when the 'run' method is not decorated
        with 'component.output_types'.

        Use as:

        ```python
        @component
        class MyComponent:

            def __init__(self, value: int):
                component.set_output_types(self, output_1=int, output_2=str)
                ...

            # no decorators here
            def run(self, value: int):
                return {"output_1": 1, "output_2": "2"}
        ```
        """
        if not types:
            return

        instance.__canals_output__ = {name: {"name": name, "type": type_} for name, type_ in types.items()}

    def output_types(self, **types):
        """
        Decorator factory that specifies the output types of a component.

        Use as:

        ```python
        @component
        class MyComponent:
            @component.output_types(output_1=int, output_2=str)
            def run(self, value: int):
                return {"output_1": 1, "output_2": "2"}
        ```
        """

        def output_types_decorator(run_method):
            """
            This happens at class creation time, and since we don't have the decorated
            class available here, we temporarily store the output types as an attribute of
            the decorated method. The ComponentMeta metaclass will use this data to create
            sockets at instance creation time.
            """
            setattr(
                run_method,
                "_output_types_cache",
                {name: {"name": name, "type": type_} for name, type_ in types.items()},
            )
            return run_method

        return output_types_decorator

    def _component(self, class_):
        """
        Decorator validating the structure of the component and registering it in the components registry.
        """
        logger.debug("Registering %s as a component", class_)

        # Check for required methods and fail as soon as possible
        if not hasattr(class_, "run"):
            raise ComponentError(f"{class_.__name__} must have a 'run()' method. See the docs for more information.")

        def copy_class_namespace(namespace):
            """
            This is the callback that `types.new_class` will use
            to populate the newly created class. We just copy
            the whole namespace from the decorated class.
            """
            for key, val in dict(class_.__dict__).items():
                namespace[key] = val

        # Recreate the decorated component class so it uses our metaclass
        class_ = new_class(class_.__name__, class_.__bases__, {"metaclass": ComponentMeta}, copy_class_namespace)

        # Save the component in the class registry (for deserialization)
        if class_.__name__ in self.registry:
            # Corner case, but it may occur easily in notebooks when re-running cells.
            logger.debug(
                "Component %s is already registered. Previous imported from '%s', new imported from '%s'",
                class_.__name__,
                self.registry[class_.__name__],
                class_,
            )
        self.registry[class_.__name__] = class_
        logger.debug("Registered Component %s", class_)

        return class_

    def __call__(self, class_):
        return self._component(class_)


component = _Component()
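The way the `output_types` decorator, the `types.new_class` rebuild, and the metaclass fit together can be sketched end to end without Canals installed. This is a reduced approximation, not the library's implementation: `OutputMeta`, `as_component`, and `Doubler` are hypothetical names, and unlike `copy_class_namespace` above, the sketch deliberately skips the `__dict__` and `__weakref__` descriptors when copying the namespace.

```python
from types import new_class


class OutputMeta(type):
    def __call__(cls, *args, **kwargs):
        instance = super().__call__(*args, **kwargs)
        if not hasattr(instance, "__canals_output__"):
            # Fall back to whatever the decorator cached on the run method.
            instance.__canals_output__ = getattr(instance.run, "_output_types_cache", {})
        return instance


def output_types(**types):
    def decorator(run_method):
        # The class does not exist yet, so park the metadata on the function.
        run_method._output_types_cache = {
            name: {"name": name, "type": type_} for name, type_ in types.items()
        }
        return run_method

    return decorator


def as_component(class_):
    def copy_namespace(namespace):
        for key, value in class_.__dict__.items():
            # Choice made for this sketch only: leave out the per-class
            # descriptors that the new class gets on its own.
            if key not in ("__dict__", "__weakref__"):
                namespace[key] = value

    # Rebuild the class with the same name and bases, but under OutputMeta.
    return new_class(class_.__name__, class_.__bases__, {"metaclass": OutputMeta}, copy_namespace)


@as_component
class Doubler:
    # Hypothetical component used only for this illustration.
    @output_types(doubled=int)
    def run(self, value: int):
        return {"doubled": value * 2}


doubler = Doubler()
```

Reading the cache through `instance.run` works because bound methods forward unknown attribute lookups to the underlying function, so the metadata attached at class-creation time is still reachable at instance-creation time.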