• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 14487605870

16 Apr 2025 07:54AM UTC coverage: 90.353% (+0.01%) from 90.342%
14487605870

push

github

web-flow
 feat: add `run_async` for `HuggingFaceAPIDocumentEmbedder` (#9226)

* added async support for HuggingFaceAPIDocumentEmbedder

* added type anotations, removed unused import

* Trigger mark test complited

* Apply suggestions from code review

* utility function

---------

Co-authored-by: Stefano Fiorucci <stefanofiorucci@gmail.com>

10687 of 11828 relevant lines covered (90.35%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.43
haystack/components/embedders/hugging_face_api_document_embedder.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
from typing import Any, Dict, List, Optional, Tuple, Union
1✔
6

7
from tqdm import tqdm
1✔
8
from tqdm.asyncio import tqdm as async_tqdm
1✔
9

10
from haystack import component, default_from_dict, default_to_dict, logging
1✔
11
from haystack.dataclasses import Document
1✔
12
from haystack.lazy_imports import LazyImport
1✔
13
from haystack.utils import Secret, deserialize_secrets_inplace
1✔
14
from haystack.utils.hf import HFEmbeddingAPIType, HFModelType, check_valid_model
1✔
15
from haystack.utils.url_validation import is_valid_http_url
1✔
16

17
with LazyImport(message="Run 'pip install \"huggingface_hub>=0.27.0\"'") as huggingface_hub_import:
1✔
18
    from huggingface_hub import AsyncInferenceClient, InferenceClient
1✔
19

20
logger = logging.getLogger(__name__)
1✔
21

22

23
@component
1✔
24
class HuggingFaceAPIDocumentEmbedder:
1✔
25
    """
26
    Embeds documents using Hugging Face APIs.
27

28
    Use it with the following Hugging Face APIs:
29
    - [Free Serverless Inference API](https://huggingface.co/inference-api)
30
    - [Paid Inference Endpoints](https://huggingface.co/inference-endpoints)
31
    - [Self-hosted Text Embeddings Inference](https://github.com/huggingface/text-embeddings-inference)
32

33

34
    ### Usage examples
35

36
    #### With free serverless inference API
37

38
    ```python
39
    from haystack.components.embedders import HuggingFaceAPIDocumentEmbedder
40
    from haystack.utils import Secret
41
    from haystack.dataclasses import Document
42

43
    doc = Document(content="I love pizza!")
44

45
    doc_embedder = HuggingFaceAPIDocumentEmbedder(api_type="serverless_inference_api",
46
                                                  api_params={"model": "BAAI/bge-small-en-v1.5"},
47
                                                  token=Secret.from_token("<your-api-key>"))
48

49
    result = document_embedder.run([doc])
50
    print(result["documents"][0].embedding)
51

52
    # [0.017020374536514282, -0.023255806416273117, ...]
53
    ```
54

55
    #### With paid inference endpoints
56

57
    ```python
58
    from haystack.components.embedders import HuggingFaceAPIDocumentEmbedder
59
    from haystack.utils import Secret
60
    from haystack.dataclasses import Document
61

62
    doc = Document(content="I love pizza!")
63

64
    doc_embedder = HuggingFaceAPIDocumentEmbedder(api_type="inference_endpoints",
65
                                                  api_params={"url": "<your-inference-endpoint-url>"},
66
                                                  token=Secret.from_token("<your-api-key>"))
67

68
    result = document_embedder.run([doc])
69
    print(result["documents"][0].embedding)
70

71
    # [0.017020374536514282, -0.023255806416273117, ...]
72
    ```
73

74
    #### With self-hosted text embeddings inference
75

76
    ```python
77
    from haystack.components.embedders import HuggingFaceAPIDocumentEmbedder
78
    from haystack.dataclasses import Document
79

80
    doc = Document(content="I love pizza!")
81

82
    doc_embedder = HuggingFaceAPIDocumentEmbedder(api_type="text_embeddings_inference",
83
                                                  api_params={"url": "http://localhost:8080"})
84

85
    result = document_embedder.run([doc])
86
    print(result["documents"][0].embedding)
87

88
    # [0.017020374536514282, -0.023255806416273117, ...]
89
    ```
90
    """
91

92
    def __init__(
1✔
93
        self,
94
        api_type: Union[HFEmbeddingAPIType, str],
95
        api_params: Dict[str, str],
96
        token: Optional[Secret] = Secret.from_env_var(["HF_API_TOKEN", "HF_TOKEN"], strict=False),
97
        prefix: str = "",
98
        suffix: str = "",
99
        truncate: Optional[bool] = True,
100
        normalize: Optional[bool] = False,
101
        batch_size: int = 32,
102
        progress_bar: bool = True,
103
        meta_fields_to_embed: Optional[List[str]] = None,
104
        embedding_separator: str = "\n",
105
    ):  # pylint: disable=too-many-positional-arguments
106
        """
107
        Creates a HuggingFaceAPIDocumentEmbedder component.
108

109
        :param api_type:
110
            The type of Hugging Face API to use.
111
        :param api_params:
112
            A dictionary with the following keys:
113
            - `model`: Hugging Face model ID. Required when `api_type` is `SERVERLESS_INFERENCE_API`.
114
            - `url`: URL of the inference endpoint. Required when `api_type` is `INFERENCE_ENDPOINTS` or
115
            `TEXT_EMBEDDINGS_INFERENCE`.
116
        :param token: The Hugging Face token to use as HTTP bearer authorization.
117
            Check your HF token in your [account settings](https://huggingface.co/settings/tokens).
118
        :param prefix:
119
            A string to add at the beginning of each text.
120
        :param suffix:
121
            A string to add at the end of each text.
122
        :param truncate:
123
            Truncates the input text to the maximum length supported by the model.
124
            Applicable when `api_type` is `TEXT_EMBEDDINGS_INFERENCE`, or `INFERENCE_ENDPOINTS`
125
            if the backend uses Text Embeddings Inference.
126
            If `api_type` is `SERVERLESS_INFERENCE_API`, this parameter is ignored.
127
        :param normalize:
128
            Normalizes the embeddings to unit length.
129
            Applicable when `api_type` is `TEXT_EMBEDDINGS_INFERENCE`, or `INFERENCE_ENDPOINTS`
130
            if the backend uses Text Embeddings Inference.
131
            If `api_type` is `SERVERLESS_INFERENCE_API`, this parameter is ignored.
132
        :param batch_size:
133
            Number of documents to process at once.
134
        :param progress_bar:
135
            If `True`, shows a progress bar when running.
136
        :param meta_fields_to_embed:
137
            List of metadata fields to embed along with the document text.
138
        :param embedding_separator:
139
            Separator used to concatenate the metadata fields to the document text.
140
        """
141
        huggingface_hub_import.check()
1✔
142

143
        if isinstance(api_type, str):
1✔
144
            api_type = HFEmbeddingAPIType.from_str(api_type)
1✔
145

146
        api_params = api_params or {}
1✔
147

148
        if api_type == HFEmbeddingAPIType.SERVERLESS_INFERENCE_API:
1✔
149
            model = api_params.get("model")
1✔
150
            if model is None:
1✔
151
                raise ValueError(
1✔
152
                    "To use the Serverless Inference API, you need to specify the `model` parameter in `api_params`."
153
                )
154
            check_valid_model(model, HFModelType.EMBEDDING, token)
1✔
155
            model_or_url = model
1✔
156
        elif api_type in [HFEmbeddingAPIType.INFERENCE_ENDPOINTS, HFEmbeddingAPIType.TEXT_EMBEDDINGS_INFERENCE]:
1✔
157
            url = api_params.get("url")
1✔
158
            if url is None:
1✔
159
                msg = (
1✔
160
                    "To use Text Embeddings Inference or Inference Endpoints, you need to specify the `url` "
161
                    "parameter in `api_params`."
162
                )
163
                raise ValueError(msg)
1✔
164
            if not is_valid_http_url(url):
1✔
165
                raise ValueError(f"Invalid URL: {url}")
1✔
166
            model_or_url = url
1✔
167
        else:
168
            msg = f"Unknown api_type {api_type}"
×
169
            raise ValueError(msg)
×
170

171
        client_args: Dict[str, Any] = {"model": model_or_url, "token": token.resolve_value() if token else None}
1✔
172

173
        self.api_type = api_type
1✔
174
        self.api_params = api_params
1✔
175
        self.token = token
1✔
176
        self.prefix = prefix
1✔
177
        self.suffix = suffix
1✔
178
        self.truncate = truncate
1✔
179
        self.normalize = normalize
1✔
180
        self.batch_size = batch_size
1✔
181
        self.progress_bar = progress_bar
1✔
182
        self.meta_fields_to_embed = meta_fields_to_embed or []
1✔
183
        self.embedding_separator = embedding_separator
1✔
184
        self._client = InferenceClient(**client_args)
1✔
185
        self._async_client = AsyncInferenceClient(**client_args)
1✔
186

187
    def to_dict(self) -> Dict[str, Any]:
1✔
188
        """
189
        Serializes the component to a dictionary.
190

191
        :returns:
192
            Dictionary with serialized data.
193
        """
194
        return default_to_dict(
1✔
195
            self,
196
            api_type=str(self.api_type),
197
            api_params=self.api_params,
198
            prefix=self.prefix,
199
            suffix=self.suffix,
200
            token=self.token.to_dict() if self.token else None,
201
            truncate=self.truncate,
202
            normalize=self.normalize,
203
            batch_size=self.batch_size,
204
            progress_bar=self.progress_bar,
205
            meta_fields_to_embed=self.meta_fields_to_embed,
206
            embedding_separator=self.embedding_separator,
207
        )
208

209
    @classmethod
1✔
210
    def from_dict(cls, data: Dict[str, Any]) -> "HuggingFaceAPIDocumentEmbedder":
1✔
211
        """
212
        Deserializes the component from a dictionary.
213

214
        :param data:
215
            Dictionary to deserialize from.
216
        :returns:
217
            Deserialized component.
218
        """
219
        deserialize_secrets_inplace(data["init_parameters"], keys=["token"])
1✔
220
        return default_from_dict(cls, data)
1✔
221

222
    def _prepare_texts_to_embed(self, documents: List[Document]) -> List[str]:
1✔
223
        """
224
        Prepare the texts to embed by concatenating the Document text with the metadata fields to embed.
225
        """
226
        texts_to_embed = []
1✔
227
        for doc in documents:
1✔
228
            meta_values_to_embed = [
1✔
229
                str(doc.meta[key]) for key in self.meta_fields_to_embed if key in doc.meta and doc.meta[key] is not None
230
            ]
231

232
            text_to_embed = (
1✔
233
                self.prefix + self.embedding_separator.join(meta_values_to_embed + [doc.content or ""]) + self.suffix
234
            )
235

236
            texts_to_embed.append(text_to_embed)
1✔
237
        return texts_to_embed
1✔
238

239
    @staticmethod
1✔
240
    def _adjust_api_parameters(
1✔
241
        truncate: Optional[bool], normalize: Optional[bool], api_type: HFEmbeddingAPIType
242
    ) -> Tuple[Optional[bool], Optional[bool]]:
243
        """
244
        Adjust the truncate and normalize parameters based on the API type.
245
        """
246
        if api_type == HFEmbeddingAPIType.SERVERLESS_INFERENCE_API:
1✔
247
            if truncate is not None:
1✔
248
                msg = "`truncate` parameter is not supported for Serverless Inference API. It will be ignored."
1✔
249
                logger.warning(msg)
1✔
250
                truncate = None
1✔
251
            if normalize is not None:
1✔
252
                msg = "`normalize` parameter is not supported for Serverless Inference API. It will be ignored."
1✔
253
                logger.warning(msg)
1✔
254
                normalize = None
1✔
255
        return truncate, normalize
1✔
256

257
    def _embed_batch(self, texts_to_embed: List[str], batch_size: int) -> List[List[float]]:
1✔
258
        """
259
        Embed a list of texts in batches.
260
        """
261
        truncate, normalize = self._adjust_api_parameters(self.truncate, self.normalize, self.api_type)
1✔
262

263
        all_embeddings: List = []
1✔
264
        for i in tqdm(
1✔
265
            range(0, len(texts_to_embed), batch_size), disable=not self.progress_bar, desc="Calculating embeddings"
266
        ):
267
            batch = texts_to_embed[i : i + batch_size]
1✔
268

269
            np_embeddings = self._client.feature_extraction(
1✔
270
                # this method does not officially support list of strings, but works as expected
271
                text=batch,  # type: ignore[arg-type]
272
                truncate=truncate,
273
                normalize=normalize,
274
            )
275

276
            if np_embeddings.ndim != 2 or np_embeddings.shape[0] != len(batch):
1✔
277
                raise ValueError(f"Expected embedding shape ({batch_size}, embedding_dim), got {np_embeddings.shape}")
1✔
278

279
            all_embeddings.extend(np_embeddings.tolist())
1✔
280

281
        return all_embeddings
1✔
282

283
    async def _embed_batch_async(self, texts_to_embed: List[str], batch_size: int) -> List[List[float]]:
1✔
284
        """
285
        Embed a list of texts in batches asynchronously.
286
        """
287
        truncate, normalize = self._adjust_api_parameters(self.truncate, self.normalize, self.api_type)
1✔
288

289
        all_embeddings: List = []
1✔
290
        for i in async_tqdm(
1✔
291
            range(0, len(texts_to_embed), batch_size), disable=not self.progress_bar, desc="Calculating embeddings"
292
        ):
293
            batch = texts_to_embed[i : i + batch_size]
1✔
294

295
            np_embeddings = await self._async_client.feature_extraction(
1✔
296
                # this method does not officially support list of strings, but works as expected
297
                text=batch,  # type: ignore[arg-type]
298
                truncate=truncate,
299
                normalize=normalize,
300
            )
301

302
            if np_embeddings.ndim != 2 or np_embeddings.shape[0] != len(batch):
1✔
303
                raise ValueError(f"Expected embedding shape ({batch_size}, embedding_dim), got {np_embeddings.shape}")
1✔
304

305
            all_embeddings.extend(np_embeddings.tolist())
1✔
306

307
        return all_embeddings
1✔
308

309
    @component.output_types(documents=List[Document])
1✔
310
    def run(self, documents: List[Document]):
1✔
311
        """
312
        Embeds a list of documents.
313

314
        :param documents:
315
            Documents to embed.
316

317
        :returns:
318
            A dictionary with the following keys:
319
            - `documents`: A list of documents with embeddings.
320
        """
321
        if not isinstance(documents, list) or documents and not isinstance(documents[0], Document):
1✔
322
            raise TypeError(
×
323
                "HuggingFaceAPIDocumentEmbedder expects a list of Documents as input."
324
                " In case you want to embed a string, please use the HuggingFaceAPITextEmbedder."
325
            )
326

327
        texts_to_embed = self._prepare_texts_to_embed(documents=documents)
1✔
328

329
        embeddings = self._embed_batch(texts_to_embed=texts_to_embed, batch_size=self.batch_size)
1✔
330

331
        for doc, emb in zip(documents, embeddings):
1✔
332
            doc.embedding = emb
1✔
333

334
        return {"documents": documents}
1✔
335

336
    @component.output_types(documents=List[Document])
1✔
337
    async def run_async(self, documents: List[Document]):
1✔
338
        """
339
        Embeds a list of documents asynchronously.
340

341
        :param documents:
342
            Documents to embed.
343

344
        :returns:
345
            A dictionary with the following keys:
346
            - `documents`: A list of documents with embeddings.
347
        """
348
        if not isinstance(documents, list) or documents and not isinstance(documents[0], Document):
1✔
349
            raise TypeError(
×
350
                "HuggingFaceAPIDocumentEmbedder expects a list of Documents as input."
351
                " In case you want to embed a string, please use the HuggingFaceAPITextEmbedder."
352
            )
353

354
        texts_to_embed = self._prepare_texts_to_embed(documents=documents)
1✔
355

356
        embeddings = await self._embed_batch_async(texts_to_embed=texts_to_embed, batch_size=self.batch_size)
1✔
357

358
        for doc, emb in zip(documents, embeddings):
1✔
359
            doc.embedding = emb
1✔
360

361
        return {"documents": documents}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc