deepset-ai / haystack / 13699279219

06 Mar 2025 01:00PM UTC coverage: 89.962% (-0.1%) from 90.074%
Pull #8981 · github · web-flow
Merge 2a28d2f23 into c4fafd9b0
Pull Request #8981: feat: async support for the `HuggingFaceLocalChatGenerator`

9661 of 10739 relevant lines covered (89.96%) · 0.9 hits per line

haystack/core/component/component.py (99.36% covered)

# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

"""
Attributes:

    component: Marks a class as a component. Any class decorated with `@component` can be used by a Pipeline.

All components must follow the contract below. This docstring is the source of truth for the component contract.

<hr>

`@component` decorator

All component classes must be decorated with the `@component` decorator. This allows Haystack to discover them.
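
A minimal standalone sketch of the discovery idea: a class decorator that works both as `@register` and
`@register()`, fails early when `run()` is missing, and records each class in a registry keyed by
`module.ClassName`, similar in spirit to `_Component.__call__()` and `_Component._component()` further down.
It leaves out the metaclass and socket handling entirely and raises a plain `TypeError` where Haystack raises
`ComponentError`; `ComponentRegistry`, `register`, `Greeter` and `Echo` are hypothetical names, not Haystack API.

```python
from typing import Any, Dict, Optional, Type


class ComponentRegistry:
    # Hypothetical, simplified stand-in for the `component` decorator object.

    def __init__(self) -> None:
        self.registry: Dict[str, Type[Any]] = {}

    def _register(self, cls: Type[Any]) -> Type[Any]:
        # Fail as early as possible if the class can't be a component.
        if not hasattr(cls, "run"):
            raise TypeError(f"{cls.__name__} must have a 'run()' method")
        # Key by import path so the class can be looked up again when deserializing.
        self.registry[f"{cls.__module__}.{cls.__name__}"] = cls
        return cls

    def __call__(self, cls: Optional[Type[Any]] = None):
        # `@register` passes the class directly; `@register()` passes nothing
        # and expects a decorator back.
        if cls is not None:
            return self._register(cls)
        return self._register


register = ComponentRegistry()


@register
class Greeter:
    def run(self, name: str) -> Dict[str, str]:
        return {"greeting": f"Hello, {name}!"}


@register()
class Echo:
    def run(self, text: str) -> Dict[str, str]:
        return {"text": text}
```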

<hr>

`__init__(self, **kwargs)`

Optional method.

Components may have an `__init__` method where they define:

- `self.init_parameters = {same parameters that the __init__ method received}`:
    In this dictionary you can store any state the component wishes to persist when it is saved.
    These values will be given to the `__init__` method of a new instance when the pipeline is loaded.
    Note that by default the `@component` decorator saves the arguments automatically.
    However, if a component sets its own `init_parameters` manually in `__init__()`, those will be used instead.
    Note: all of the values contained here **must be JSON serializable**. Serialize them manually if needed.
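
This file doesn't show how `@component` captures the init arguments, so the following is only a hypothetical
sketch of one way automatic capture could work: bind the received arguments to the `__init__` signature, keep any
`init_parameters` the constructor set itself, and fail early if the values aren't JSON serializable.
`capture_init_parameters` and `Splitter` are illustrative names, not part of Haystack.

```python
import functools
import inspect
import json
from typing import Any, Dict


def capture_init_parameters(cls):
    # Hypothetical helper: wrap `__init__` so that the arguments it received are
    # stored in `self.init_parameters`, unless `__init__` already set them itself.
    original_init = cls.__init__
    signature = inspect.signature(original_init)

    @functools.wraps(original_init)
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Map positional and keyword arguments to parameter names, filling in defaults.
        bound = signature.bind(self, *args, **kwargs)
        bound.apply_defaults()
        params: Dict[str, Any] = dict(bound.arguments)
        params.pop("self")
        original_init(self, *args, **kwargs)
        if not hasattr(self, "init_parameters"):
            # json.dumps raises TypeError if any value isn't JSON serializable.
            json.dumps(params)
            self.init_parameters = params

    cls.__init__ = __init__
    return cls


@capture_init_parameters
class Splitter:
    def __init__(self, split_by: str = "word", split_length: int = 200):
        self.split_by = split_by
        self.split_length = split_length


splitter = Splitter("sentence")
# Simulate saving to JSON and loading the values into a new instance.
restored = Splitter(**json.loads(json.dumps(splitter.init_parameters)))
```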

Components should take only "basic" Python types as parameters of their `__init__` function, or iterables and
dictionaries containing only such values. Anything else (objects, functions, etc.) will raise an exception at init
time. If such values are needed, consider serializing them to a string.
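
A sketch of what "basic" could mean in practice, assuming it covers `None`, `bool`, `int`, `float` and `str` plus
lists, tuples, sets and dictionaries built from them (the exact rule Haystack enforces may differ). It also shows
one way to serialize a module-level function to an import-path string and back; `is_basic_value`,
`callable_to_string` and `string_to_callable` are hypothetical helpers.

```python
import importlib
import json
from typing import Any, Callable

# Assumption: "basic" means None, bool, int, float and str, plus containers of them.
BASIC_TYPES = (type(None), bool, int, float, str)


def is_basic_value(value: Any) -> bool:
    # Scalars are basic by definition.
    if isinstance(value, BASIC_TYPES):
        return True
    # Dictionaries: every key and every value must be basic.
    if isinstance(value, dict):
        return all(is_basic_value(k) and is_basic_value(v) for k, v in value.items())
    # Common iterables: every item must be basic.
    if isinstance(value, (list, tuple, set, frozenset)):
        return all(is_basic_value(item) for item in value)
    # Anything else (objects, functions, classes, ...) is rejected.
    return False


def callable_to_string(fn: Callable) -> str:
    # Works for module-level functions; nested functions and lambdas can't be re-imported.
    return f"{fn.__module__}.{fn.__qualname__}"


def string_to_callable(path: str) -> Callable:
    # Split "package.module.name" into the module path and the attribute name.
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


serialized = callable_to_string(json.dumps)
```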

_(TODO explain how to use classes and functions in init. In the meantime see `test/components/test_accumulate.py`)_

The `__init__` must be extremely lightweight, because it's a frequent operation during the construction and
validation of the pipeline. If a component has some heavy state to initialize (models, backends, etc.), refer to
the `warm_up()` method.

<hr>

`warm_up(self)`

Optional method.

This method is called by Pipeline before the graph execution. Make sure to avoid double initialization,
because Pipeline will not keep track of which components it called `warm_up()` on.
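
A minimal sketch of keeping `__init__` cheap and making `warm_up()` idempotent, assuming a hypothetical
`ExpensiveModel` that is slow to load; `Uppercaser` is likewise illustrative and written without Haystack so it
runs standalone. The `None` check is what makes a repeated `warm_up()` call harmless.

```python
from typing import Dict, List, Optional


class ExpensiveModel:
    # Hypothetical stand-in for a model that is slow to load; counts how often it's built.
    loads = 0

    def __init__(self, name: str) -> None:
        ExpensiveModel.loads += 1
        self.name = name

    def predict(self, text: str) -> str:
        return text.upper()


class Uppercaser:
    def __init__(self, model_name: str = "upper-v1") -> None:
        # Lightweight: only store configuration, don't load anything yet.
        self.model_name = model_name
        self._model: Optional[ExpensiveModel] = None

    def warm_up(self) -> None:
        # Guard against double initialization: the Pipeline doesn't track
        # which components were already warmed up.
        if self._model is None:
            self._model = ExpensiveModel(self.model_name)

    def run(self, texts: List[str]) -> Dict[str, List[str]]:
        if self._model is None:
            raise RuntimeError("Call warm_up() before run()")
        return {"texts": [self._model.predict(t) for t in texts]}


uppercaser = Uppercaser()
uppercaser.warm_up()
uppercaser.warm_up()  # second call is a no-op
```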

<hr>

`run(self, **kwargs)`

Mandatory method.

This is the method where the main functionality of the component should be carried out. It's called by
`Pipeline.run()`.

When the component should run, Pipeline calls this method with keyword arguments for its input sockets. The input
sockets are derived from the parameters of `run()`, plus any declared with `component.set_input_type()` or
`component.set_input_types()` when `run()` accepts `**kwargs`. Each argument receives:

- the input value coming from the other component connected to that socket,
- if that value is missing, the default value of the corresponding `run()` parameter, if it exists.

`run()` must return a dictionary whose keys are names of the output sockets declared with the
`@component.output_types` decorator or with `component.set_output_types()`.
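
A standalone sketch of how a component's inputs and outputs are described, without Haystack: input names and
defaults come from the parameters of `run()` (as `ComponentMeta._parse_and_set_input_sockets()` does further
down), and a simplified, hypothetical `output_types` decorator stores the declared outputs on the method, much like
`@component.output_types` stores `OutputSocket` objects in `_output_types_cache`. `WordCounter`, `input_sockets`
and `run_and_check` are illustrative names; the undeclared-key check is an assumption, not necessarily what
`Pipeline` does.

```python
import inspect
from typing import Any, Callable, Dict


def output_types(**types: type) -> Callable:
    # Hypothetical, simplified `@component.output_types`: store the declared
    # outputs on the method itself, to be read later.
    def decorator(run_method: Callable) -> Callable:
        run_method._output_types_cache = dict(types)
        return run_method

    return decorator


class WordCounter:
    @output_types(count=int, words=list)
    def run(self, text: str, lowercase: bool = False) -> Dict[str, Any]:
        words = text.lower().split() if lowercase else text.split()
        return {"count": len(words), "words": words}


def input_sockets(component: Any) -> Dict[str, Any]:
    # One input per named `run()` parameter; a default makes the input optional.
    params = inspect.signature(component.run).parameters
    return {
        name: p.default
        for name, p in params.items()
        if p.kind not in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    }


def run_and_check(component: Any, **inputs: Any) -> Dict[str, Any]:
    # Call `run()` and reject any output key that wasn't declared.
    result = component.run(**inputs)
    undeclared = set(result) - set(component.run._output_types_cache)
    if undeclared:
        raise ValueError(f"run() returned undeclared outputs: {sorted(undeclared)}")
    return result


counter = WordCounter()
```

The check uses a subset test rather than equality, so a `run()` that produces only some of its declared outputs
still passes.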

"""

import inspect
from collections.abc import Callable
from contextlib import contextmanager
from contextvars import ContextVar
from copy import deepcopy
from dataclasses import dataclass
from types import new_class
from typing import Any, Dict, Optional, Protocol, Type, runtime_checkable

from haystack import logging
from haystack.core.errors import ComponentError

from .sockets import Sockets
from .types import InputSocket, OutputSocket, _empty

logger = logging.getLogger(__name__)


@dataclass
class PreInitHookPayload:
    """
    Payload for the hook called before a component instance is initialized.

    :param callback:
        Receives the following inputs: component class and init parameter keyword args.
    :param in_progress:
        Flag to indicate if the hook is currently being executed.
        Used to prevent it from being called recursively (if the component's constructor
        instantiates another component).
    """

    callback: Callable
    in_progress: bool = False


_COMPONENT_PRE_INIT_HOOK: ContextVar[Optional[PreInitHookPayload]] = ContextVar("component_pre_init_hook", default=None)


@contextmanager
def _hook_component_init(callback: Callable):
    """
    Context manager to set a callback that will be invoked before a component's constructor is called.

    The callback receives the component class and the init parameters (as keyword arguments) and can modify the init
    parameters in place.

    :param callback:
        Callback function to invoke.
    """
    token = _COMPONENT_PRE_INIT_HOOK.set(PreInitHookPayload(callback))
    try:
        yield
    finally:
        _COMPONENT_PRE_INIT_HOOK.reset(token)


@runtime_checkable
class Component(Protocol):
    """
    Note that this is only used by type checking tools.

    In order to implement the `Component` protocol, custom components need to
    have a `run` method. The signature of the method and its return value
    won't be checked, i.e. classes with the following methods:

        def run(self, param: str) -> Dict[str, Any]:
            ...

    and

        def run(self, **kwargs):
            ...

    will both be considered as respecting the protocol. This makes the type
    checking much weaker, but we have other places where we ensure code is
    dealing with actual Components.

    The protocol is runtime checkable so it'll be possible to assert:

        isinstance(MyComponent, Component)
    """

    # This is the most reliable way to define the protocol for the `run` method.
    # Defining a method doesn't work as different Components will have different
    # arguments. Even defining here a method with `**kwargs` doesn't work as the
    # expected signature must be identical.
    # This makes most Language Servers and type checkers happy and shows fewer errors.
    run: Callable[..., Dict[str, Any]]


class ComponentMeta(type):
    @staticmethod
    def _positional_to_kwargs(cls_type, args) -> Dict[str, Any]:
        """
        Convert positional arguments to keyword arguments based on the signature of the `__init__` method.
        """
        init_signature = inspect.signature(cls_type.__init__)
        init_params = {name: info for name, info in init_signature.parameters.items() if name != "self"}

        out = {}
        for arg, (name, info) in zip(args, init_params.items()):
            if info.kind == inspect.Parameter.VAR_POSITIONAL:
                raise ComponentError(
                    "Pre-init hooks do not support components with variadic positional args in their init method"
                )

            assert info.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.POSITIONAL_ONLY)
            out[name] = arg
        return out

    @staticmethod
    def _parse_and_set_output_sockets(instance: Any):
        has_async_run = hasattr(instance, "run_async")

        # If `component.set_output_types()` was called in the component constructor,
        # `__haystack_output__` is already populated, no need to do anything.
        if not hasattr(instance, "__haystack_output__"):
            # If that's not the case, we need to populate `__haystack_output__`
            #
            # If either of the run methods were decorated, they'll have a field assigned that
            # stores the output specification. If both run methods were decorated, we ensure that
            # outputs are the same. We deepcopy the content of the cache to transfer ownership from
            # the class method to the actual instance, so that different instances of the same class
            # won't share this data.

            run_output_types = getattr(instance.run, "_output_types_cache", {})
            async_run_output_types = getattr(instance.run_async, "_output_types_cache", {}) if has_async_run else {}

            if has_async_run and run_output_types != async_run_output_types:
                raise ComponentError("Output type specifications of 'run' and 'run_async' methods must be the same")
            output_types_cache = run_output_types

            instance.__haystack_output__ = Sockets(instance, deepcopy(output_types_cache), OutputSocket)

    @staticmethod
    def _parse_and_set_input_sockets(component_cls: Type, instance: Any):
        def inner(method, sockets):
            from inspect import Parameter

            run_signature = inspect.signature(method)

            for param_name, param_info in run_signature.parameters.items():
                if param_name == "self" or param_info.kind in (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD):
                    continue

                socket_kwargs = {"name": param_name, "type": param_info.annotation}
                if param_info.default != Parameter.empty:
                    socket_kwargs["default_value"] = param_info.default

                new_socket = InputSocket(**socket_kwargs)

                # Also ensure that new sockets don't override existing ones.
                existing_socket = sockets.get(param_name)
                if existing_socket is not None and existing_socket != new_socket:
                    raise ComponentError(
                        "set_input_types()/set_input_type() cannot override the parameters of the 'run' method"
                    )

                sockets[param_name] = new_socket

            return run_signature

        # Create the sockets if set_input_types() wasn't called in the constructor.
        if not hasattr(instance, "__haystack_input__"):
            instance.__haystack_input__ = Sockets(instance, {}, InputSocket)

        inner(getattr(component_cls, "run"), instance.__haystack_input__)

        # Ensure that the sockets are the same for the async method, if it exists.
        async_run = getattr(component_cls, "run_async", None)
        if async_run is not None:
            run_sockets = Sockets(instance, {}, InputSocket)
            async_run_sockets = Sockets(instance, {}, InputSocket)

            # Can't use the sockets from above as they might contain
            # values set with set_input_types().
            run_sig = inner(getattr(component_cls, "run"), run_sockets)
            async_run_sig = inner(async_run, async_run_sockets)

            if async_run_sockets != run_sockets or run_sig != async_run_sig:
                raise ComponentError("Parameters of 'run' and 'run_async' methods must be the same")

    def __call__(cls, *args, **kwargs):
        """
        This method is called when clients instantiate a Component and runs before __new__ and __init__.
        """
        # This will call __new__ then __init__, giving us back the Component instance
        pre_init_hook = _COMPONENT_PRE_INIT_HOOK.get()
        if pre_init_hook is None or pre_init_hook.in_progress:
            instance = super().__call__(*args, **kwargs)
        else:
            try:
                pre_init_hook.in_progress = True
                named_positional_args = ComponentMeta._positional_to_kwargs(cls, args)
                assert set(named_positional_args.keys()).intersection(kwargs.keys()) == set(), (
                    "positional and keyword arguments overlap"
                )
                kwargs.update(named_positional_args)
                pre_init_hook.callback(cls, kwargs)
                instance = super().__call__(**kwargs)
            finally:
                pre_init_hook.in_progress = False

        # Before returning, we have the chance to modify the newly created
        # Component instance, so we take the chance and set up the I/O sockets
        has_async_run = hasattr(instance, "run_async")
        if has_async_run and not inspect.iscoroutinefunction(instance.run_async):
            raise ComponentError(f"Method 'run_async' of component '{cls.__name__}' must be a coroutine")
        instance.__haystack_supports_async__ = has_async_run

        ComponentMeta._parse_and_set_input_sockets(cls, instance)
        ComponentMeta._parse_and_set_output_sockets(instance)

        # Since a Component can't be used in multiple Pipelines at the same time
        # we need to know if it's already owned by a Pipeline when adding it to one.
        # We use this flag to check that.
        instance.__haystack_added_to_pipeline__ = None

        return instance


def _component_repr(component: Component) -> str:
    """
    All Components override their __repr__ method with this one.

    It prints the component name and the input/output sockets.
    """
    result = object.__repr__(component)
    if pipeline := getattr(component, "__haystack_added_to_pipeline__", None):
        # This Component has been added to a Pipeline, let's get the name from there.
        result += f"\n{pipeline.get_component_name(component)}"

    # We're explicitly ignoring the type here because we're sure that the component
    # has the __haystack_input__ and __haystack_output__ attributes at this point
    return (
        f"{result}\n{getattr(component, '__haystack_input__', '<invalid_input_sockets>')}"
        f"\n{getattr(component, '__haystack_output__', '<invalid_output_sockets>')}"
    )


def _component_run_has_kwargs(component_cls: Type) -> bool:
    run_method = getattr(component_cls, "run", None)
    if run_method is None:
        return False
    else:
        return any(
            param.kind == inspect.Parameter.VAR_KEYWORD for param in inspect.signature(run_method).parameters.values()
        )


class _Component:
    """
    See module's docstring.

    Args:
        cls: the class that should be used as a component.

    Returns:
        A class that can be recognized as a component.

    Raises:
        ComponentError: if the class provided has no `run()` method or otherwise doesn't respect the component contract.
    """

    def __init__(self):
        self.registry = {}

    def set_input_type(
        self,
        instance,
        name: str,
        type: Any,  # noqa: A002
        default: Any = _empty,
    ):
        """
        Add a single input socket to the component instance.

        Replaces any existing input socket with the same name.

        :param instance: Component instance where the input type will be added.
        :param name: name of the input socket.
        :param type: type of the input socket.
        :param default: default value of the input socket, defaults to _empty
        """
        if not _component_run_has_kwargs(instance.__class__):
            raise ComponentError(
                "Cannot set input types on a component that doesn't have a kwargs parameter in the 'run' method"
            )

        if not hasattr(instance, "__haystack_input__"):
            instance.__haystack_input__ = Sockets(instance, {}, InputSocket)
        instance.__haystack_input__[name] = InputSocket(name=name, type=type, default_value=default)

    def set_input_types(self, instance, **types):
        """
        Method that specifies the input types when 'kwargs' is passed to the run method.

        Use as:

        ```python
        @component
        class MyComponent:

            def __init__(self, value: int):
                component.set_input_types(self, value_1=str, value_2=str)
                ...

            @component.output_types(output_1=str, output_2=str)
            def run(self, **kwargs):
                return {"output_1": kwargs["value_1"], "output_2": ""}
        ```

        Note that if the `run()` method also declares some parameters, they become input sockets as well.
        A `run()` parameter with the same name as a socket declared here must match it exactly (same type and
        default); otherwise a `ComponentError` is raised when the component is instantiated.

        For example:

        ```python
        @component
        class MyComponent:

            def __init__(self, value: int):
                component.set_input_types(self, value_1=str, value_2=str)
                ...

            @component.output_types(output_1=str, output_2=str)
            def run(self, value_0: str, value_1: str, **kwargs):
                return {"output_1": value_1, "output_2": kwargs["value_2"]}
        ```

        would add a mandatory `value_0` parameter, while `value_1` and `value_2` stay mandatory
        as specified in `set_input_types`.

        """
        if not _component_run_has_kwargs(instance.__class__):
            raise ComponentError(
                "Cannot set input types on a component that doesn't have a kwargs parameter in the 'run' method"
            )

        instance.__haystack_input__ = Sockets(
            instance, {name: InputSocket(name=name, type=type_) for name, type_ in types.items()}, InputSocket
        )

    def set_output_types(self, instance, **types):
        """
        Method that specifies the output types when the 'run' method is not decorated with 'component.output_types'.

        Use as:

        ```python
        @component
        class MyComponent:

            def __init__(self, value: int):
                component.set_output_types(self, output_1=int, output_2=str)
                ...

            # no decorators here
            def run(self, value: int):
                return {"output_1": 1, "output_2": "2"}
        ```
        """
        has_decorator = hasattr(instance.run, "_output_types_cache")
        if has_decorator:
            raise ComponentError(
                "Cannot call `set_output_types` on a component that already has "
                "the 'output_types' decorator on its `run` method"
            )

        instance.__haystack_output__ = Sockets(
            instance, {name: OutputSocket(name=name, type=type_) for name, type_ in types.items()}, OutputSocket
        )

    def output_types(self, **types):
        """
        Decorator factory that specifies the output types of a component.

        Use as:

        ```python
        @component
        class MyComponent:
            @component.output_types(output_1=int, output_2=str)
            def run(self, value: int):
                return {"output_1": 1, "output_2": "2"}
        ```
        """

        def output_types_decorator(run_method):
            """
            Decorator that sets the output types of the decorated method.

            This happens at class creation time, and since we don't have the decorated
            class available here, we temporarily store the output types as an attribute of
            the decorated method. The ComponentMeta metaclass will use this data to create
            sockets at instance creation time.
            """
            method_name = run_method.__name__
            if method_name not in ("run", "run_async"):
                raise ComponentError("'output_types' decorator can only be used on 'run' and 'run_async' methods")

            setattr(
                run_method,
                "_output_types_cache",
                {name: OutputSocket(name=name, type=type_) for name, type_ in types.items()},
            )
            return run_method

        return output_types_decorator

    def _component(self, cls: Any):
        """
        Decorator validating the structure of the component and registering it in the components registry.
        """
        logger.debug("Registering {component} as a component", component=cls)

        # Check for required methods and fail as soon as possible
        if not hasattr(cls, "run"):
            raise ComponentError(f"{cls.__name__} must have a 'run()' method. See the docs for more information.")

        def copy_class_namespace(namespace):
            """
            This is the callback that `types.new_class` will use to populate the newly created class.

            Simply copy the whole namespace from the decorated class.
            """
            for key, val in dict(cls.__dict__).items():
                # __dict__ and __weakref__ are class-bound, we should let Python recreate them.
                if key in ("__dict__", "__weakref__"):
                    continue
                namespace[key] = val

        # Recreate the decorated component class so it uses our metaclass.
        # We must explicitly redefine the type of the class to make sure language servers
        # and type checkers understand that the class is of the correct type.
        # mypy doesn't like that we do this though so we explicitly ignore the type check.
        new_cls: cls.__name__ = new_class(
            cls.__name__, cls.__bases__, {"metaclass": ComponentMeta}, copy_class_namespace
        )  # type: ignore[no-redef]

        # Save the component in the class registry (for deserialization)
        class_path = f"{new_cls.__module__}.{new_cls.__name__}"
        if class_path in self.registry:
            # Corner case, but it may occur easily in notebooks when re-running cells.
            logger.debug(
                "Component {component} is already registered. Previously imported from '{module_name}', "
                "new imported from '{new_module_name}'",
                component=class_path,
                module_name=self.registry[class_path],
                new_module_name=new_cls,
            )
        self.registry[class_path] = new_cls
        logger.debug("Registered Component {component}", component=new_cls)

        # Override the __repr__ method with a default one
        new_cls.__repr__ = _component_repr

        return new_cls

    def __call__(self, cls: Optional[type] = None):
        # We must wrap the call to the decorator in a function for it to work
        # correctly with or without parens
        def wrap(cls):
            return self._component(cls)

        if cls:
            # Decorator is called without parens
            return wrap(cls)

        # Decorator is called with parens
        return wrap


component = _Component()