• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 15632259890

13 Jun 2025 10:20AM UTC coverage: 90.281% (+0.004%) from 90.277%
15632259890

Pull #9513

github

web-flow
Merge 9056ed967 into a28b2851d
Pull Request #9513: chore: improve `select_streaming_callback` type hints

11556 of 12800 relevant lines covered (90.28%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.34
haystack/components/generators/hugging_face_api.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
from dataclasses import asdict
1✔
6
from datetime import datetime
1✔
7
from typing import Any, Dict, Iterable, List, Optional, Union, cast
1✔
8

9
from haystack import component, default_from_dict, default_to_dict
1✔
10
from haystack.dataclasses import (
1✔
11
    ComponentInfo,
12
    StreamingCallbackT,
13
    StreamingChunk,
14
    SyncStreamingCallbackT,
15
    select_streaming_callback,
16
)
17
from haystack.lazy_imports import LazyImport
1✔
18
from haystack.utils import Secret, deserialize_callable, deserialize_secrets_inplace, serialize_callable
1✔
19
from haystack.utils.hf import HFGenerationAPIType, HFModelType, check_valid_model
1✔
20
from haystack.utils.url_validation import is_valid_http_url
1✔
21

22
with LazyImport(message="Run 'pip install \"huggingface_hub>=0.27.0\"'") as huggingface_hub_import:
1✔
23
    from huggingface_hub import (
1✔
24
        InferenceClient,
25
        TextGenerationOutput,
26
        TextGenerationStreamOutput,
27
        TextGenerationStreamOutputToken,
28
    )
29

30

31
@component
1✔
32
class HuggingFaceAPIGenerator:
1✔
33
    """
34
    Generates text using Hugging Face APIs.
35

36
    Use it with the following Hugging Face APIs:
37
    - [Free Serverless Inference API](https://huggingface.co/inference-api)
38
    - [Paid Inference Endpoints](https://huggingface.co/inference-endpoints)
39
    - [Self-hosted Text Generation Inference](https://github.com/huggingface/text-generation-inference)
40

41
    ### Usage examples
42

43
    #### With the free serverless inference API
44

45
    ```python
46
    from haystack.components.generators import HuggingFaceAPIGenerator
47
    from haystack.utils import Secret
48

49
    generator = HuggingFaceAPIGenerator(api_type="serverless_inference_api",
50
                                        api_params={"model": "HuggingFaceH4/zephyr-7b-beta"},
51
                                        token=Secret.from_token("<your-api-key>"))
52

53
    result = generator.run(prompt="What's Natural Language Processing?")
54
    print(result)
55
    ```
56

57
    #### With paid inference endpoints
58

59
    ```python
60
    from haystack.components.generators import HuggingFaceAPIGenerator
61
    from haystack.utils import Secret
62

63
    generator = HuggingFaceAPIGenerator(api_type="inference_endpoints",
64
                                        api_params={"url": "<your-inference-endpoint-url>"},
65
                                        token=Secret.from_token("<your-api-key>"))
66

67
    result = generator.run(prompt="What's Natural Language Processing?")
68
    print(result)
69
    ```
70
    #### With self-hosted text generation inference
71
    ```python
72
    from haystack.components.generators import HuggingFaceAPIGenerator
73

74
    generator = HuggingFaceAPIGenerator(api_type="text_generation_inference",
75
                                        api_params={"url": "http://localhost:8080"})
76

77
    result = generator.run(prompt="What's Natural Language Processing?")
78
    print(result)
79
    ```
80
    """
81

82
    def __init__(  # pylint: disable=too-many-positional-arguments
        self,
        api_type: Union[HFGenerationAPIType, str],
        api_params: Dict[str, str],
        token: Optional[Secret] = Secret.from_env_var(["HF_API_TOKEN", "HF_TOKEN"], strict=False),
        generation_kwargs: Optional[Dict[str, Any]] = None,
        stop_words: Optional[List[str]] = None,
        streaming_callback: Optional[StreamingCallbackT] = None,
    ):
        """
        Initialize the HuggingFaceAPIGenerator instance.

        :param api_type:
            The type of Hugging Face API to use. Available types:
            - `text_generation_inference`: See [TGI](https://github.com/huggingface/text-generation-inference).
            - `inference_endpoints`: See [Inference Endpoints](https://huggingface.co/inference-endpoints).
            - `serverless_inference_api`: See [Serverless Inference API](https://huggingface.co/inference-api).
            A string value is converted with `HFGenerationAPIType.from_str`.
        :param api_params:
            A dictionary with the following keys:
            - `model`: Hugging Face model ID. Required when `api_type` is `SERVERLESS_INFERENCE_API`.
            - `url`: URL of the inference endpoint. Required when `api_type` is `INFERENCE_ENDPOINTS` or
            `TEXT_GENERATION_INFERENCE`.
            - Other parameters specific to the chosen API type, such as `timeout`, `headers`, `provider` etc.
            Every key other than `model` and `url` is forwarded to `InferenceClient` as a keyword argument.
        :param token: The Hugging Face token to use as HTTP bearer authorization.
            Check your HF token in your [account settings](https://huggingface.co/settings/tokens).
        :param generation_kwargs:
            A dictionary with keyword arguments to customize text generation. Some examples: `max_new_tokens`,
            `temperature`, `top_k`, `top_p`. `max_new_tokens` defaults to 512 when not given.
            For details, see [Hugging Face documentation](https://huggingface.co/docs/huggingface_hub/en/package_reference/inference_client#huggingface_hub.InferenceClient.text_generation).
        :param stop_words: An optional list of strings representing the stop words. They are appended to
            `generation_kwargs["stop_sequences"]`.
        :param streaming_callback: An optional callable for handling streaming responses.
        :raises ValueError:
            If the key required by `api_type` (`model` or `url`) is missing from `api_params`, if `url` is rejected
            by `is_valid_http_url`, or if `api_type` is not one of the supported types.
        """

        huggingface_hub_import.check()

        if isinstance(api_type, str):
            api_type = HFGenerationAPIType.from_str(api_type)

        if api_type == HFGenerationAPIType.SERVERLESS_INFERENCE_API:
            model = api_params.get("model")
            if model is None:
                raise ValueError(
                    "To use the Serverless Inference API, you need to specify the `model` parameter in `api_params`."
                )
            check_valid_model(model, HFModelType.GENERATION, token)
            model_or_url = model
        elif api_type in [HFGenerationAPIType.INFERENCE_ENDPOINTS, HFGenerationAPIType.TEXT_GENERATION_INFERENCE]:
            url = api_params.get("url")
            if url is None:
                msg = (
                    "To use Text Generation Inference or Inference Endpoints, you need to specify the `url` "
                    "parameter in `api_params`."
                )
                raise ValueError(msg)
            if not is_valid_http_url(url):
                raise ValueError(f"Invalid URL: {url}")
            model_or_url = url
        else:
            msg = f"Unknown api_type {api_type}"
            raise ValueError(msg)

        # handle generation kwargs setup
        # `.copy()` is shallow, so the `stop_sequences` list would still be shared with the caller.
        # Build a brand-new list instead of extending in place, so the caller's list is never mutated.
        generation_kwargs = generation_kwargs.copy() if generation_kwargs else {}
        generation_kwargs["stop_sequences"] = [
            *(generation_kwargs.get("stop_sequences") or []),
            *(stop_words or []),
        ]
        generation_kwargs.setdefault("max_new_tokens", 512)

        self.api_type = api_type
        self.api_params = api_params
        self.token = token
        self.generation_kwargs = generation_kwargs
        self.streaming_callback = streaming_callback

        # `model` / `url` are passed positionally as the client target; everything else is a client option.
        resolved_api_params: Dict[str, Any] = {k: v for k, v in api_params.items() if k not in ("model", "url")}
        self._client = InferenceClient(
            model_or_url, token=token.resolve_value() if token else None, **resolved_api_params
        )
160

161
    def to_dict(self) -> Dict[str, Any]:
        """
        Serialize this component to a dictionary.

        The streaming callback is stored through `serialize_callable` and the token through its own
        `to_dict()`; either one is stored as `None` when it is not set.

        :returns:
            A dictionary containing the serialized component.
        """
        serialized_callback = None
        if self.streaming_callback:
            serialized_callback = serialize_callable(self.streaming_callback)

        serialized_token = None
        if self.token:
            serialized_token = self.token.to_dict()

        return default_to_dict(
            self,
            api_type=str(self.api_type),
            api_params=self.api_params,
            token=serialized_token,
            generation_kwargs=self.generation_kwargs,
            streaming_callback=serialized_callback,
        )
177

178
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "HuggingFaceAPIGenerator":
        """
        Deserialize this component from a dictionary.

        The `token` secret and, when present, the `streaming_callback` inside `data["init_parameters"]` are
        deserialized in place before the component is rebuilt.

        :param data:
            The dictionary representation of this component.
        :returns:
            The component built by `default_from_dict` from `data`.
        """
        init_params = data["init_parameters"]
        deserialize_secrets_inplace(init_params, keys=["token"])

        callback_path = init_params.get("streaming_callback")
        if callback_path:
            init_params["streaming_callback"] = deserialize_callable(callback_path)

        return default_from_dict(cls, data)
1✔
189

190
    @component.output_types(replies=List[str], meta=List[Dict[str, Any]])
    def run(
        self,
        prompt: str,
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[Dict[str, Any]] = None,
    ):
        """
        Invoke the text generation inference for the given prompt and generation parameters.

        :param prompt:
            A string representing the prompt.
        :param streaming_callback:
            A callback function that is called when a new token is received from the stream.
        :param generation_kwargs:
            Additional keyword arguments for text generation. They are merged over the ones set at
            initialization, so a key given here wins.
        :returns:
            A dictionary with the generated replies and metadata:
            - replies: A list of strings representing the generated replies.
            - meta: A list of dictionaries with the metadata of each reply.
        """
        # Start from the init-time kwargs and let the per-call ones override them.
        merged_kwargs = dict(self.generation_kwargs)
        merged_kwargs.update(generation_kwargs or {})

        # Resolve which callback (init-time or runtime) applies; this entry point is synchronous.
        callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=False
        )

        # Streaming is requested from the client only when a callback is available.
        hf_output = self._client.text_generation(prompt, details=True, stream=callback is not None, **merged_kwargs)

        if callback is None:
            # mypy doesn't know that hf_output is a TextGenerationOutput, so we cast it
            return self._build_non_streaming_response(cast(TextGenerationOutput, hf_output))

        return self._stream_and_build_response(hf_output=hf_output, streaming_callback=callback)
1✔
227

228
    def _stream_and_build_response(
        self, hf_output: Iterable["TextGenerationStreamOutput"], streaming_callback: SyncStreamingCallbackT
    ):
        """
        Consume a streamed generation, forward each non-special token to the callback and build the result.

        :param hf_output:
            Iterable of stream outputs; each item exposes `token` and an optional `details`.
        :param streaming_callback:
            Callable invoked with every `StreamingChunk` created from a non-special token.
        :returns:
            A dictionary with:
            - replies: A single-item list holding the concatenated text of all emitted chunks (an empty string
              when no chunk was emitted).
            - meta: A single-item list holding a dictionary with `finish_reason`, `model`, `usage` and
              `completion_start_time`.
        """
        chunks: List[StreamingChunk] = []
        first_chunk_time = None
        # Most recent generation details seen on the stream, kept even if the chunk carrying them is skipped.
        # NOTE(review): the chunk that carries `details` may itself be a special token (e.g. an end-of-sequence
        # token); confirm against the backend's streaming behaviour.
        final_details: Dict[str, Any] = {}

        component_info = ComponentInfo.from_component(self)
        for chunk in hf_output:
            token: TextGenerationStreamOutputToken = chunk.token
            details = asdict(chunk.details) if chunk.details else {}
            if details:
                final_details = details
            if token.special:
                # Special tokens are not part of the reply text and are not forwarded to the callback.
                continue

            chunk_metadata = {**asdict(token), **details}
            if first_chunk_time is None:
                first_chunk_time = datetime.now().isoformat()

            stream_chunk = StreamingChunk(
                content=token.text, meta=chunk_metadata, component_info=component_info, index=0, start=not chunks
            )
            chunks.append(stream_chunk)
            streaming_callback(stream_chunk)

        # The stream may yield nothing, or only special tokens; indexing `chunks[-1]` would then raise IndexError.
        last_meta: Dict[str, Any] = chunks[-1].meta if chunks else {}
        metadata = {
            # Prefer what the last emitted chunk reported; fall back to details seen on a skipped chunk.
            "finish_reason": last_meta.get("finish_reason", final_details.get("finish_reason")),
            "model": self._client.model,
            "usage": {
                "completion_tokens": last_meta.get("generated_tokens", final_details.get("generated_tokens", 0))
            },
            "completion_start_time": first_chunk_time,
        }
        return {"replies": ["".join([chunk.content for chunk in chunks])], "meta": [metadata]}
1✔
257

258
    def _build_non_streaming_response(self, hf_output: "TextGenerationOutput"):
1✔
259
        meta = [
1✔
260
            {
261
                "model": self._client.model,
262
                "finish_reason": hf_output.details.finish_reason if hf_output.details else None,
263
                "usage": {"completion_tokens": len(hf_output.details.tokens) if hf_output.details else 0},
264
            }
265
        ]
266
        return {"replies": [hf_output.generated_text], "meta": meta}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc