deepset-ai / haystack / 13674921125

05 Mar 2025 11:26AM UTC coverage: 90.081% (+0.007%) from 90.074%
13674921125

Pull #8978

github

web-flow
Merge 4500e0dbc into b77f2bad7
Pull Request #8978: build: drop Python 3.8 support

9627 of 10687 relevant lines covered (90.08%)

0.9 hits per line

Source File

99.37
haystack/core/component/component.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
"""
6
Attributes:
7

8
    component: Marks a class as a component. Any class decorated with `@component` can be used by a Pipeline.
9

10
All components must follow the contract below. This docstring is the source of truth for the component contract.
11

12
<hr>
13

14
`@component` decorator
15

16
All component classes must be decorated with the `@component` decorator. This allows Haystack to discover them.
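
A minimal, standard-library-only sketch of how a registering class decorator of this kind can work, loosely modelled on the registry and `__call__` logic later in this module. `Registry`, `register`, `Greeter` and `Shouter` are hypothetical names used for illustration; this is not the Haystack implementation.

```python
from typing import Callable, Dict, Optional, Union


class Registry:
    # Hypothetical stand-in for a component registry.
    def __init__(self) -> None:
        self.classes: Dict[str, type] = {}

    def __call__(self, cls: Optional[type] = None) -> Union[type, Callable[[type], type]]:
        def wrap(target: type) -> type:
            # Fail early when the class has no `run` method.
            if not hasattr(target, "run"):
                raise TypeError(f"{target.__name__} must have a 'run()' method")
            self.classes[f"{target.__module__}.{target.__name__}"] = target
            return target

        # `@register` passes the class directly; `@register()` passes nothing.
        return wrap(cls) if cls is not None else wrap


register = Registry()


@register
class Greeter:
    def run(self, name: str) -> Dict[str, str]:
        return {"greeting": f"Hello, {name}"}


@register()
class Shouter:
    def run(self, text: str) -> Dict[str, str]:
        return {"text": text.upper()}
```

Both spellings of the decorator end up in the same registry, keyed by module path and class name.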
17

18
<hr>
19

20
`__init__(self, **kwargs)`
21

22
Optional method.
23

24
Components may have an `__init__` method where they define:
25

26
- `self.init_parameters = {same parameters that the __init__ method received}`:
27
    In this dictionary you can store any state the component wants to persist when it is saved.
28
    These values will be given to the `__init__` method of a new instance when the pipeline is loaded.
29
    Note that by default the `@component` decorator saves the arguments automatically.
30
    However, if a component sets its own `init_parameters` manually in `__init__()`, those will be used instead.
31
    Note: all of the values contained here **must be JSON serializable**. Serialize them manually if needed.
32

33
Components should take only "basic" Python types as parameters of their `__init__` function, or iterables and
34
dictionaries containing only such values. Anything else (objects, functions, etc) will raise an exception at init
35
time. If you need such values, consider serializing them to a string.
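
One way to follow this advice for a function parameter is to store its dotted import path and resolve it again at load time. The sketch below uses only the standard library; `serialize_callable` and `deserialize_callable` are hypothetical helpers written for this illustration, and the approach is an assumption rather than the contract's required mechanism.

```python
import importlib
import json
from typing import Callable


def serialize_callable(func: Callable) -> str:
    # Store "module.qualname" so the value is a plain, JSON-friendly string.
    return f"{func.__module__}.{func.__qualname__}"


def deserialize_callable(path: str) -> Callable:
    # Works for module-level callables; nested or local functions are out of scope here.
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


init_parameters = {"separator": ",", "loader": serialize_callable(json.loads)}

# The whole dictionary now survives a JSON round trip.
restored = json.loads(json.dumps(init_parameters))
loader = deserialize_callable(restored["loader"])
```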
36

37
_(TODO explain how to use classes and functions in init. In the meantime see `test/components/test_accumulate.py`)_
38

39
The `__init__` must be extremely lightweight, because it's a frequent operation during the construction and
40
validation of the pipeline. If a component has heavy state to initialize (models, backends, etc.), refer to
41
the `warm_up()` method.
42

43
<hr>
44

45
`warm_up(self)`
46

47
Optional method.
48

49
This method is called by Pipeline before the graph execution. Make sure to avoid double-initializations,
50
because Pipeline will not keep track of which components it called `warm_up()` on.
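
A minimal sketch of a guard against double initialization, assuming the heavy state can be represented by a single attribute. `Embedder`, `load_count` and the dictionary standing in for a model are hypothetical and only serve the illustration.

```python
class Embedder:
    # Hypothetical component-like class; the pattern needs no Haystack imports.
    def __init__(self, model_name: str) -> None:
        # Keep the constructor cheap: only remember what to load.
        self.model_name = model_name
        self._model = None
        self.load_count = 0

    def warm_up(self) -> None:
        # Return early on repeated calls so the heavy work runs only once.
        if self._model is not None:
            return
        self.load_count += 1
        self._model = {"name": self.model_name}  # stand-in for an expensive load

    def run(self, text: str) -> dict:
        if self._model is None:
            raise RuntimeError("Call warm_up() before run()")
        return {"embedding": [float(len(text))]}


embedder = Embedder("tiny-model")
embedder.warm_up()
embedder.warm_up()  # second call is a no-op
```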
51

52
<hr>
53

54
`run(self, data)`
55

56
Mandatory method.
57

58
This is the method where the main functionality of the component should be carried out. It's called by
59
`Pipeline.run()`.
60

61
When the component should run, Pipeline will call this method with an instance of the dataclass returned by the
62
method decorated with `@component.input`. This dataclass contains:
63

64
- all the input values coming from other components connected to it,
65
- if any is missing, the corresponding value defined in `self.defaults`, if it exists.
66

67
`run()` must return a single instance of the dataclass declared through the method decorated with
68
`@component.output`.
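
Elsewhere in this module, `ComponentMeta._parse_and_set_input_sockets` builds the input sockets from the parameters of `run()`, and the docstring examples return plain dictionaries. The standard-library sketch below illustrates that signature-driven idea only; `Adder`, `describe_inputs` and `call_run` are hypothetical names, and the real behaviour is defined by the code in this file.

```python
import inspect
from typing import Any, Dict


class Adder:
    def run(self, value: int, add: int = 1) -> Dict[str, int]:
        return {"result": value + add}


def describe_inputs(component_cls: type) -> Dict[str, Dict[str, Any]]:
    # Collect the annotation and, if present, the default of each `run` parameter.
    skipped = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    inputs: Dict[str, Dict[str, Any]] = {}
    for name, param in inspect.signature(component_cls.run).parameters.items():
        if name == "self" or param.kind in skipped:
            continue
        spec: Dict[str, Any] = {"type": param.annotation}
        if param.default is not inspect.Parameter.empty:
            spec["default"] = param.default
        inputs[name] = spec
    return inputs


def call_run(instance: Any, received: Dict[str, Any]) -> Dict[str, Any]:
    # Inputs that were not received fall back to the defaults in the signature.
    specs = describe_inputs(type(instance))
    kwargs = {name: spec["default"] for name, spec in specs.items() if "default" in spec}
    kwargs.update(received)
    return instance.run(**kwargs)
```

Calling `call_run(Adder(), {"value": 2})` uses the default for `add`, while passing `add` explicitly overrides it.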
69

70
"""
71

72
import inspect
1✔
73
import sys
1✔
74
from collections.abc import Callable
1✔
75
from contextlib import contextmanager
1✔
76
from contextvars import ContextVar
1✔
77
from copy import deepcopy
1✔
78
from dataclasses import dataclass
1✔
79
from types import new_class
1✔
80
from typing import Any, Dict, Optional, Protocol, Type, runtime_checkable
1✔
81

82
from haystack import logging
1✔
83
from haystack.core.errors import ComponentError
1✔
84

85
from .sockets import Sockets
1✔
86
from .types import InputSocket, OutputSocket, _empty
1✔
87

88
logger = logging.getLogger(__name__)
1✔
89

90

91
@dataclass
1✔
92
class PreInitHookPayload:
1✔
93
    """
94
    Payload for the hook called before a component instance is initialized.
95

96
    :param callback:
97
        Receives the following inputs: component class and init parameter keyword args.
98
    :param in_progress:
99
        Flag to indicate if the hook is currently being executed.
100
        Used to prevent it from being called recursively (if the component's constructor
101
        instantiates another component).
102
    """
103

104
    callback: Callable
1✔
105
    in_progress: bool = False
1✔
106

107

108
_COMPONENT_PRE_INIT_HOOK: ContextVar[Optional[PreInitHookPayload]] = ContextVar("component_pre_init_hook", default=None)
1✔
109

110

111
@contextmanager
1✔
112
def _hook_component_init(callback: Callable):
1✔
113
    """
114
    Context manager to set a callback that will be invoked before a component's constructor is called.
115

116
    The callback receives the component class and the init parameters (as keyword arguments) and can modify the init
117
    parameters in place.
118

119
    :param callback:
120
        Callback function to invoke.
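
    The pattern can be sketched with the standard library alone. In the
    simplified example below, `_HOOK`, `hook_init`, `HookedMeta`, `Reader` and
    `force_top_k` are hypothetical names; unlike the real metaclass, the sketch
    accepts keyword arguments only and has no re-entrancy flag.

    ```python
    from contextlib import contextmanager
    from contextvars import ContextVar

    _HOOK = ContextVar("hook", default=None)


    @contextmanager
    def hook_init(callback):
        # Install the callback for the duration of the `with` block only.
        token = _HOOK.set(callback)
        try:
            yield
        finally:
            _HOOK.reset(token)


    class HookedMeta(type):
        def __call__(cls, **kwargs):
            callback = _HOOK.get()
            if callback is not None:
                callback(cls, kwargs)  # may modify kwargs in place
            return super().__call__(**kwargs)


    class Reader(metaclass=HookedMeta):
        def __init__(self, top_k: int = 10):
            self.top_k = top_k


    def force_top_k(component_cls, init_params):
        init_params["top_k"] = 3


    with hook_init(force_top_k):
        hooked = Reader(top_k=10)
    plain = Reader(top_k=10)
    ```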
121
    """
122
    token = _COMPONENT_PRE_INIT_HOOK.set(PreInitHookPayload(callback))
1✔
123
    try:
1✔
124
        yield
1✔
125
    finally:
126
        _COMPONENT_PRE_INIT_HOOK.reset(token)
1✔
127

128

129
@runtime_checkable
1✔
130
class Component(Protocol):
1✔
131
    """
132
    Note this is only used by type checking tools.
133

134
    In order to implement the `Component` protocol, custom components need to
135
    have a `run` method. The signature of the method and its return value
136
    won't be checked, i.e. classes with the following methods:
137

138
        def run(self, param: str) -> Dict[str, Any]:
139
            ...
140

141
    and
142

143
        def run(self, **kwargs):
144
            ...
145

146
    will both be considered as respecting the protocol. This makes the type
147
    checking much weaker, but we have other places where we ensure code is
148
    dealing with actual Components.
149

150
    The protocol is runtime checkable so it'll be possible to assert:
151

152
        isinstance(MyComponent(), Component)
153
    """
154

155
    # This is the most reliable way to define the protocol for the `run` method.
156
    # Defining a method doesn't work as different Components will have different
157
    # arguments. Even defining here a method with `**kwargs` doesn't work as the
158
    # expected signature must be identical.
159
    # This makes most language servers and type checkers happy and produces fewer errors.
160
    run: Callable[..., Dict[str, Any]]
1✔
161

162

163
class ComponentMeta(type):
1✔
164
    @staticmethod
1✔
165
    def _positional_to_kwargs(cls_type, args) -> Dict[str, Any]:
1✔
166
        """
167
        Convert positional arguments to keyword arguments based on the signature of the `__init__` method.
168
        """
169
        init_signature = inspect.signature(cls_type.__init__)
1✔
170
        init_params = {name: info for name, info in init_signature.parameters.items() if name != "self"}
1✔
171

172
        out = {}
1✔
173
        for arg, (name, info) in zip(args, init_params.items()):
1✔
174
            if info.kind == inspect.Parameter.VAR_POSITIONAL:
1✔
175
                raise ComponentError(
1✔
176
                    "Pre-init hooks do not support components with variadic positional args in their init method"
177
                )
178

179
            assert info.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.POSITIONAL_ONLY)
1✔
180
            out[name] = arg
1✔
181
        return out
1✔
182

183
    @staticmethod
1✔
184
    def _parse_and_set_output_sockets(instance: Any):
1✔
185
        has_async_run = hasattr(instance, "run_async")
1✔
186

187
        # If `component.set_output_types()` was called in the component constructor,
188
        # `__haystack_output__` is already populated, no need to do anything.
189
        if not hasattr(instance, "__haystack_output__"):
1✔
190
            # If that's not the case, we need to populate `__haystack_output__`
191
            #
192
            # If either of the run methods were decorated, they'll have a field assigned that
193
            # stores the output specification. If both run methods were decorated, we ensure that
194
            # outputs are the same. We deepcopy the content of the cache to transfer ownership from
195
            # the class method to the actual instance, so that different instances of the same class
196
            # won't share this data.
197

198
            run_output_types = getattr(instance.run, "_output_types_cache", {})
1✔
199
            async_run_output_types = getattr(instance.run_async, "_output_types_cache", {}) if has_async_run else {}
1✔
200

201
            if has_async_run and run_output_types != async_run_output_types:
1✔
202
                raise ComponentError("Output type specifications of 'run' and 'run_async' methods must be the same")
1✔
203
            output_types_cache = run_output_types
1✔
204

205
            instance.__haystack_output__ = Sockets(instance, deepcopy(output_types_cache), OutputSocket)
1✔
206

207
    @staticmethod
1✔
208
    def _parse_and_set_input_sockets(component_cls: Type, instance: Any):
1✔
209
        def inner(method, sockets):
1✔
210
            from inspect import Parameter
1✔
211

212
            run_signature = inspect.signature(method)
1✔
213

214
            for param_name, param_info in run_signature.parameters.items():
1✔
215
                if param_name == "self" or param_info.kind in (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD):
1✔
216
                    continue
1✔
217

218
                socket_kwargs = {"name": param_name, "type": param_info.annotation}
1✔
219
                if param_info.default != Parameter.empty:
1✔
220
                    socket_kwargs["default_value"] = param_info.default
1✔
221

222
                new_socket = InputSocket(**socket_kwargs)
1✔
223

224
                # Also ensure that new sockets don't override existing ones.
225
                existing_socket = sockets.get(param_name)
1✔
226
                if existing_socket is not None and existing_socket != new_socket:
1✔
227
                    raise ComponentError(
1✔
228
                        "set_input_types()/set_input_type() cannot override the parameters of the 'run' method"
229
                    )
230

231
                sockets[param_name] = new_socket
1✔
232

233
            return run_signature
1✔
234

235
        # Create the sockets if set_input_types() wasn't called in the constructor.
236
        if not hasattr(instance, "__haystack_input__"):
1✔
237
            instance.__haystack_input__ = Sockets(instance, {}, InputSocket)
1✔
238

239
        inner(getattr(component_cls, "run"), instance.__haystack_input__)
1✔
240

241
        # Ensure that the sockets are the same for the async method, if it exists.
242
        async_run = getattr(component_cls, "run_async", None)
1✔
243
        if async_run is not None:
1✔
244
            run_sockets = Sockets(instance, {}, InputSocket)
1✔
245
            async_run_sockets = Sockets(instance, {}, InputSocket)
1✔
246

247
            # Can't use the sockets from above as they might contain
248
            # values set with set_input_types().
249
            run_sig = inner(getattr(component_cls, "run"), run_sockets)
1✔
250
            async_run_sig = inner(async_run, async_run_sockets)
1✔
251

252
            if async_run_sockets != run_sockets or run_sig != async_run_sig:
1✔
253
                raise ComponentError("Parameters of 'run' and 'run_async' methods must be the same")
1✔
254

255
    def __call__(cls, *args, **kwargs):
1✔
256
        """
257
        This method is called when clients instantiate a Component and runs before __new__ and __init__.
258
        """
259
        # This will call __new__ then __init__, giving us back the Component instance
260
        pre_init_hook = _COMPONENT_PRE_INIT_HOOK.get()
1✔
261
        if pre_init_hook is None or pre_init_hook.in_progress:
1✔
262
            instance = super().__call__(*args, **kwargs)
1✔
263
        else:
264
            try:
1✔
265
                pre_init_hook.in_progress = True
1✔
266
                named_positional_args = ComponentMeta._positional_to_kwargs(cls, args)
1✔
267
                assert set(named_positional_args.keys()).intersection(kwargs.keys()) == set(), (
1✔
268
                    "positional and keyword arguments overlap"
269
                )
270
                kwargs.update(named_positional_args)
1✔
271
                pre_init_hook.callback(cls, kwargs)
1✔
272
                instance = super().__call__(**kwargs)
1✔
273
            finally:
274
                pre_init_hook.in_progress = False
1✔
275

276
        # Before returning, we have the chance to modify the newly created
277
        # Component instance, so we take the chance and set up the I/O sockets
278
        has_async_run = hasattr(instance, "run_async")
1✔
279
        if has_async_run and not inspect.iscoroutinefunction(instance.run_async):
1✔
280
            raise ComponentError(f"Method 'run_async' of component '{cls.__name__}' must be a coroutine")
1✔
281
        instance.__haystack_supports_async__ = has_async_run
1✔
282

283
        ComponentMeta._parse_and_set_input_sockets(cls, instance)
1✔
284
        ComponentMeta._parse_and_set_output_sockets(instance)
1✔
285

286
        # Since a Component can't be used in multiple Pipelines at the same time
287
        # we need to know if it's already owned by a Pipeline when adding it to one.
288
        # We use this flag to check that.
289
        instance.__haystack_added_to_pipeline__ = None
1✔
290

291
        return instance
1✔
292

293

294
def _component_repr(component: Component) -> str:
1✔
295
    """
296
    All Components override their __repr__ method with this one.
297

298
    It prints the component name and the input/output sockets.
299
    """
300
    result = object.__repr__(component)
1✔
301
    if pipeline := getattr(component, "__haystack_added_to_pipeline__", None):
1✔
302
        # This Component has been added in a Pipeline, let's get the name from there.
303
        result += f"\n{pipeline.get_component_name(component)}"
1✔
304

305
    # We're explicitly ignoring the type here because we're sure that the component
306
    # has the __haystack_input__ and __haystack_output__ attributes at this point
307
    return (
1✔
308
        f"{result}\n{getattr(component, '__haystack_input__', '<invalid_input_sockets>')}"
309
        f"\n{getattr(component, '__haystack_output__', '<invalid_output_sockets>')}"
310
    )
311

312

313
def _component_run_has_kwargs(component_cls: Type) -> bool:
1✔
314
    run_method = getattr(component_cls, "run", None)
1✔
315
    if run_method is None:
1✔
316
        return False
×
317
    else:
318
        return any(
1✔
319
            param.kind == inspect.Parameter.VAR_KEYWORD for param in inspect.signature(run_method).parameters.values()
320
        )
321

322

323
class _Component:
1✔
324
    """
325
    See module's docstring.
326

327
    Args:
328
        cls: the class that should be used as a component.
329

330
    Returns:
331
        A class that can be recognized as a component.
332

333
    Raises:
334
        ComponentError: if the class provided has no `run()` method or otherwise doesn't respect the component contract.
335
    """
336

337
    def __init__(self):
1✔
338
        self.registry = {}
1✔
339

340
    def set_input_type(
1✔
341
        self,
342
        instance,
343
        name: str,
344
        type: Any,  # noqa: A002
345
        default: Any = _empty,
346
    ):
347
        """
348
        Add a single input socket to the component instance.
349

350
        Replaces any existing input socket with the same name.
351

352
        :param instance: Component instance where the input type will be added.
353
        :param name: name of the input socket.
354
        :param type: type of the input socket.
355
        :param default: default value of the input socket, defaults to _empty
356
        """
357
        if not _component_run_has_kwargs(instance.__class__):
1✔
358
            raise ComponentError(
1✔
359
                "Cannot set input types on a component that doesn't have a kwargs parameter in the 'run' method"
360
            )
361

362
        if not hasattr(instance, "__haystack_input__"):
1✔
363
            instance.__haystack_input__ = Sockets(instance, {}, InputSocket)
1✔
364
        instance.__haystack_input__[name] = InputSocket(name=name, type=type, default_value=default)
1✔
365

366
    def set_input_types(self, instance, **types):
1✔
367
        """
368
        Method that specifies the input types when the `run` method accepts `**kwargs`.
369

370
        Use as:
371

372
        ```python
373
        @component
374
        class MyComponent:
375

376
            def __init__(self, value: int):
377
                component.set_input_types(self, value_1=str, value_2=str)
378
                ...
379

380
            @component.output_types(output_1=int, output_2=str)
381
            def run(self, **kwargs):
382
                return {"output_1": kwargs["value_1"], "output_2": ""}
383
        ```
384

385
        Note that if the `run()` method also specifies some parameters, those will take precedence.
386

387
        For example:
388

389
        ```python
390
        @component
391
        class MyComponent:
392

393
            def __init__(self, value: int):
394
                component.set_input_types(self, value_1=str, value_2=str)
395
                ...
396

397
            @component.output_types(output_1=int, output_2=str)
398
            def run(self, value_0: str, value_1: Optional[str] = None, **kwargs):
399
                return {"output_1": value_1, "output_2": ""}
400
        ```
401

402
        would add a mandatory `value_0` parameter, make the `value_1`
403
        parameter optional with a default None, and keep the `value_2`
404
        parameter mandatory as specified in `set_input_types`.
405

406
        """
407
        if not _component_run_has_kwargs(instance.__class__):
1✔
408
            raise ComponentError(
1✔
409
                "Cannot set input types on a component that doesn't have a kwargs parameter in the 'run' method"
410
            )
411

412
        instance.__haystack_input__ = Sockets(
1✔
413
            instance, {name: InputSocket(name=name, type=type_) for name, type_ in types.items()}, InputSocket
414
        )
415

416
    def set_output_types(self, instance, **types):
1✔
417
        """
418
        Method that specifies the output types when the 'run' method is not decorated with 'component.output_types'.
419

420
        Use as:
421

422
        ```python
423
        @component
424
        class MyComponent:
425

426
            def __init__(self, value: int):
427
                component.set_output_types(self, output_1=int, output_2=str)
428
                ...
429

430
            # no decorators here
431
            def run(self, value: int):
432
                return {"output_1": 1, "output_2": "2"}
433
        ```
434
        """
435
        has_decorator = hasattr(instance.run, "_output_types_cache")
1✔
436
        if has_decorator:
1✔
437
            raise ComponentError(
1✔
438
                "Cannot call `set_output_types` on a component that already has "
439
                "the 'output_types' decorator on its `run` method"
440
            )
441

442
        instance.__haystack_output__ = Sockets(
1✔
443
            instance, {name: OutputSocket(name=name, type=type_) for name, type_ in types.items()}, OutputSocket
444
        )
445

446
    def output_types(self, **types):
1✔
447
        """
448
        Decorator factory that specifies the output types of a component.
449

450
        Use as:
451

452
        ```python
453
        @component
454
        class MyComponent:
455
            @component.output_types(output_1=int, output_2=str)
456
            def run(self, value: int):
457
                return {"output_1": 1, "output_2": "2"}
458
        ```
459
        """
460

461
        def output_types_decorator(run_method):
1✔
462
            """
463
            Decorator that sets the output types of the decorated method.
464

465
            This happens at class creation time, and since we don't have the decorated
466
            class available here, we temporarily store the output types as an attribute of
467
            the decorated method. The ComponentMeta metaclass will use this data to create
468
            sockets at instance creation time.
469
            """
470
            method_name = run_method.__name__
1✔
471
            if method_name not in ("run", "run_async"):
1✔
472
                raise ComponentError("'output_types' decorator can only be used on 'run' and 'run_async' methods")
1✔
473

474
            setattr(
1✔
475
                run_method,
476
                "_output_types_cache",
477
                {name: OutputSocket(name=name, type=type_) for name, type_ in types.items()},
478
            )
479
            return run_method
1✔
480

481
        return output_types_decorator
1✔
482

483
    def _component(self, cls: Any):
1✔
484
        """
485
        Decorator validating the structure of the component and registering it in the components registry.
486
        """
487
        logger.debug("Registering {component} as a component", component=cls)
1✔
488

489
        # Check for required methods and fail as soon as possible
490
        if not hasattr(cls, "run"):
1✔
491
            raise ComponentError(f"{cls.__name__} must have a 'run()' method. See the docs for more information.")
1✔
492

493
        def copy_class_namespace(namespace):
1✔
494
            """
495
            This is the callback that `types.new_class` will use to populate the newly created class.
496

497
            Simply copy the whole namespace from the decorated class.
498
            """
499
            for key, val in dict(cls.__dict__).items():
1✔
500
                # __dict__ and __weakref__ are class-bound, we should let Python recreate them.
501
                if key in ("__dict__", "__weakref__"):
1✔
502
                    continue
1✔
503
                namespace[key] = val
1✔
504

505
        # Recreate the decorated component class so it uses our metaclass.
506
        # We must explicitly redefine the type of the class to make sure language servers
507
        # and type checkers understand that the class is of the correct type.
508
        # mypy doesn't like that we do this though so we explicitly ignore the type check.
509
        new_cls: cls.__name__ = new_class(
1✔
510
            cls.__name__, cls.__bases__, {"metaclass": ComponentMeta}, copy_class_namespace
511
        )  # type: ignore[no-redef]
512

513
        # Save the component in the class registry (for deserialization)
514
        class_path = f"{new_cls.__module__}.{new_cls.__name__}"
1✔
515
        if class_path in self.registry:
1✔
516
            # Corner case, but it may occur easily in notebooks when re-running cells.
517
            logger.debug(
1✔
518
                "Component {component} is already registered. Previous imported from '{module_name}', "
519
                "new imported from '{new_module_name}'",
520
                component=class_path,
521
                module_name=self.registry[class_path],
522
                new_module_name=new_cls,
523
            )
524
        self.registry[class_path] = new_cls
1✔
525
        logger.debug("Registered Component {component}", component=new_cls)
1✔
526

527
        # Override the __repr__ method with a default one
528
        new_cls.__repr__ = _component_repr
1✔
529

530
        return new_cls
1✔
531

532
    def __call__(self, cls: Optional[type] = None):
1✔
533
        # We must wrap the call to the decorator in a function for it to work
534
        # correctly with or without parens
535
        def wrap(cls):
1✔
536
            return self._component(cls)
1✔
537

538
        if cls:
1✔
539
            # Decorator is called without parens
540
            return wrap(cls)
1✔
541

542
        # Decorator is called with parens
543
        return wrap
1✔
544

545

546
component = _Component()
1✔