deepset-ai / canals / 6034797155

31 Aug 2023 08:16AM UTC coverage: 93.023%. Remained the same

Pull #104

github

web-flow
Merge 311b52c2c into 8a44be5dd
Pull Request #104: Downgrade log from ERROR to DEBUG

151 of 154 branches covered (0.0%)

Branch coverage included in aggregate %.

569 of 620 relevant lines covered (91.77%)

0.92 hits per line

Source File

94.67
canals/component/component.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
"""
    Attributes:

        component: Marks a class as a component. Any class decorated with `@component` can be used by a Pipeline.

    All components must follow the contract below. This docstring is the source of truth for the component contract.

    <hr>

    `@component` decorator

    All component classes must be decorated with the `@component` decorator. This allows Canals to discover them.

    <hr>

    `__init__(self, **kwargs)`

    Optional method.

    Components may have an `__init__` method where they define:

    - `self.init_parameters = {same parameters that the __init__ method received}`:
        In this dictionary you can store any state the component wishes to persist when it is saved.
        These values will be given to the `__init__` method of a new instance when the pipeline is loaded.
        Note that by default the `@component` decorator saves the arguments automatically.
        However, if a component sets its own `init_parameters` manually in `__init__()`, that will be used instead.
        Note: all of the values contained here **must be JSON serializable**. Serialize them manually if needed.

    Components should take only "basic" Python types as parameters of their `__init__` function, or iterables and
    dictionaries containing only such values. Anything else (objects, functions, etc.) will raise an exception at init
    time. If such values are needed, consider serializing them to a string.
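
    As a rough sketch of the serialization rule above, the check and the string conversion could look like the
    following. The helpers `is_json_serializable` and `callable_to_path` are hypothetical names used only for
    illustration; they are not part of Canals.

    ```python
    import json
    from typing import Any, Callable


    def is_json_serializable(value: Any) -> bool:
        # Hypothetical helper: True if the standard json module can encode the value.
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            return False
        return True


    def callable_to_path(func: Callable) -> str:
        # Hypothetical helper: represent a function as a "module.qualname" string.
        return f"{func.__module__}.{func.__qualname__}"


    # A function is not JSON serializable as-is, but its import path is.
    init_parameters = {"threshold": 0.5, "labels": ["a", "b"], "reducer": callable_to_path(max)}
    ```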

    _(TODO explain how to use classes and functions in init. In the meantime see `test/components/test_accumulate.py`)_

    The `__init__` must be extremely lightweight, because it's a frequent operation during the construction and
    validation of the pipeline. If a component has some heavy state to initialize (models, backends, etc.), refer to
    the `warm_up()` method.

    <hr>

    `warm_up(self)`

    Optional method.

    This method is called by Pipeline before the graph execution. Make sure to avoid double-initializations,
    because Pipeline will not keep track of which components it called `warm_up()` on.
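
    A minimal sketch of guarding against double initialization, assuming a hypothetical component whose heavy
    state is simulated by a plain dictionary. `LazyComponent`, `_load_backend` and `load_count` are illustrative
    names, not part of Canals.

    ```python
    class LazyComponent:
        def __init__(self, model_name: str = "tiny"):
            # Keep __init__ lightweight: store parameters only.
            self.model_name = model_name
            self.backend = None
            self.load_count = 0  # only here to make the guard observable

        def _load_backend(self):
            # Stand-in for an expensive operation such as loading a model.
            self.load_count += 1
            return {"name": self.model_name}

        def warm_up(self):
            # Return early if the heavy state already exists, so repeated calls are harmless.
            if self.backend is not None:
                return
            self.backend = self._load_backend()

        def run(self, text: str):
            if self.backend is None:
                raise RuntimeError("Call warm_up() before run().")
            return {"output": f"{self.backend['name']}: {text}"}


    comp = LazyComponent()
    comp.warm_up()
    comp.warm_up()  # the second call does nothing
    ```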

    <hr>

    `run(self, data)`

    Mandatory method.

    This is the method where the main functionality of the component should be carried out. It's called by
    `Pipeline.run()`.

    When the component should run, Pipeline will call this method with an instance of the dataclass returned by the
    method decorated with `@component.input`. This dataclass contains:

    - all the input values coming from other components connected to it,
    - if any is missing, the corresponding value defined in `self.defaults`, if it exists.

    `run()` must return a single instance of the dataclass declared through the method decorated with
    `@component.output`.

"""

import logging
import inspect
from typing import Protocol, Union, Dict, Any, get_origin, get_args
from functools import wraps

from canals.errors import ComponentError


logger = logging.getLogger(__name__)


class Component(Protocol):
    """
    Abstract interface of a Component.
    This is only used by type checking tools.
    If you want to create a new Component use the @component decorator.
    """

    def run(self, **kwargs) -> Dict[str, Any]:
        """
        Takes the Component input and returns its output.
        Inputs are defined explicitly by the run method's signature or with `component.set_input_types()` if dynamic.
        Outputs are defined by decorating the run method with `@component.output_types()`
        or with `component.set_output_types()` if dynamic.
        """

    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes the component to a dictionary.
        """

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Component":
        """
        Deserializes the component from a dictionary.
        """


class _Component:
    """
    See module's docstring.

    Args:
        class_: the class that Canals should use as a component.
        serializable: whether to check, at init time, if the component can be saved with
        `save_pipelines()`.

    Returns:
        A class that can be recognized as a component.

    Raises:
        ComponentError: if the class provided has no `run()` method or otherwise doesn't respect the component contract.
    """

    def __init__(self):
        self.registry = {}

    def set_input_types(self, instance, **types):
        """
        Method that validates the input kwargs of the run method.

        Use as:

        ```python
        @component
        class MyComponent:

            def __init__(self, value: int):
                # The instance is the first argument, as in the method signature.
                component.set_input_types(self, value_1=str, value_2=str)
                ...

            @component.output_types(output_1=int, output_2=str)
            def run(self, **kwargs):
                return {"output_1": kwargs["value_1"], "output_2": ""}
        ```
        """
        run_method = instance.run

        def wrapper(**kwargs):
            return run_method(**kwargs)

        # Store the input types in the run method
        wrapper.__canals_input__ = {
            name: {"name": name, "type": type_, "is_optional": _is_optional(type_)} for name, type_ in types.items()
        }
        wrapper.__canals_output__ = getattr(run_method, "__canals_output__", {})

        # Assigns the wrapped method to the instance's run()
        instance.run = wrapper

    def set_output_types(self, instance, **types):
        """
        Method that validates the output dictionary of the run method.

        Use as:

        ```python
        @component
        class MyComponent:

            def __init__(self, value: int):
                # The instance is the first argument, as in the method signature.
                component.set_output_types(self, output_1=int, output_2=str)
                ...

            def run(self, value: int):
                return {"output_1": 1, "output_2": "2"}
        ```
        """
        if not types:
            return  # uncovered in this coverage report

        run_method = instance.run

        def wrapper(*args, **kwargs):
            return run_method(*args, **kwargs)

        # Store the output types in the run method
        wrapper.__canals_input__ = getattr(run_method, "__canals_input__", {})
        wrapper.__canals_output__ = {name: {"name": name, "type": type_} for name, type_ in types.items()}

        # Assigns the wrapped method to the instance's run()
        instance.run = wrapper

    def output_types(self, **types):
        """
        Decorator factory that validates the output dictionary of the run method.

        Use as:

        ```python
        @component
        class MyComponent:
            @component.output_types(output_1=int, output_2=str)
            def run(self, value: int):
                return {"output_1": 1, "output_2": "2"}
        ```
        """

        def output_types_decorator(run_method):
            """
            Decorator that validates the output dictionary of the run method.
            """
            # Store the output types in the run method - used by the pipeline to build the sockets.

            @wraps(run_method)
            def wrapper(self, *args, **kwargs):
                return run_method(self, *args, **kwargs)

            wrapper.__canals_input__ = getattr(run_method, "__canals_input__", {})
            wrapper.__canals_output__ = {name: {"name": name, "type": type_} for name, type_ in types.items()}

            return wrapper

        return output_types_decorator

    def _component(self, class_):
        """
        Decorator validating the structure of the component and registering it in the components registry.
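
        As an illustration only, the input sockets this decorator creates can be reproduced outside Canals with
        the standard library. `Greeter` and `build_input_sockets` are hypothetical names, and the `is_optional`
        expression mirrors the `_is_optional` helper defined in this module.

        ```python
        import inspect
        from typing import Optional, Union, get_args, get_origin


        class Greeter:
            def run(self, name: str, greeting: Optional[str] = None):
                return {"text": f"{greeting or 'Hello'}, {name}!"}


        def build_input_sockets(class_):
            signature = inspect.signature(class_.run)
            sockets = {}
            # Skip the first parameter, which is 'self'.
            for param in list(signature.parameters)[1:]:
                annotation = signature.parameters[param].annotation
                is_optional = get_origin(annotation) is Union and type(None) in get_args(annotation)
                sockets[param] = {"name": param, "type": annotation, "is_optional": is_optional}
            return sockets


        sockets = build_input_sockets(Greeter)
        ```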
        """
        logger.debug("Registering %s as a component", class_)

        # Check for required methods
        if not hasattr(class_, "run"):
            raise ComponentError(f"{class_.__name__} must have a 'run()' method. See the docs for more information.")
        run_signature = inspect.signature(class_.run)

        if not hasattr(class_, "to_dict"):
            raise ComponentError(  # uncovered in this coverage report
                f"{class_.__name__} must have a 'to_dict()' method. See the docs for more information."
            )

        if not hasattr(class_, "from_dict"):
            raise ComponentError(  # uncovered in this coverage report
                f"{class_.__name__} must have a 'from_dict()' method. See the docs for more information."
            )

        # Create the input sockets
        class_.run.__canals_input__ = {
            param: {
                "name": param,
                "type": run_signature.parameters[param].annotation,
                "is_optional": _is_optional(run_signature.parameters[param].annotation),
            }
            for param in list(run_signature.parameters)[1:]  # First is 'self' and it doesn't matter.
        }

        # Save the component in the class registry (for deserialization)
        if class_.__name__ in self.registry:
            # It may occur easily in notebooks by re-running cells.
            logger.debug(
                "Component %s is already registered. Previous imported from '%s', new imported from '%s'",
                class_.__name__,
                self.registry[class_.__name__],
                class_,
            )
        self.registry[class_.__name__] = class_
        logger.debug("Registered Component %s", class_)

        setattr(class_, "__canals_component__", True)

        return class_

    def __call__(self, class_=None):
        """Allows us to use this decorator with parentheses and without."""
        if class_:
            return self._component(class_)

        return self._component  # uncovered in this coverage report


component = _Component()


def _is_optional(type_: type) -> bool:
    """
    Utility method that returns whether a type is Optional.
    """
    return get_origin(type_) is Union and type(None) in get_args(type_)
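
The behavior of `_is_optional` can be checked against a few standard `typing` constructs. The sketch below copies the function so that it runs on its own; the sample annotations and the `results` dictionary are illustrative and not part of Canals.

```python
from typing import List, Optional, Union, get_args, get_origin


def _is_optional(type_) -> bool:
    # Same check as in the listing: a Union whose arguments include NoneType.
    return get_origin(type_) is Union and type(None) in get_args(type_)


results = {
    "Optional[int]": _is_optional(Optional[int]),
    "Union[str, None]": _is_optional(Union[str, None]),
    "Union[int, str]": _is_optional(Union[int, str]),
    "int": _is_optional(int),
    # Only the outermost annotation is inspected, so this one is not optional.
    "List[Optional[int]]": _is_optional(List[Optional[int]]),
}
```

Only the first two entries evaluate to `True`. Annotations written with the `int | None` syntax may behave differently: on Python 3.10 to 3.13 their origin is `types.UnionType` rather than `typing.Union`, so this check would not treat them as optional.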