• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 13202548501

07 Feb 2025 03:09PM UTC coverage: 92.299% (-0.4%) from 92.709%
13202548501

Pull #8812

github

web-flow
Merge a326a7bfe into 35788a2d0
Pull Request #8812: feat: AsyncPipeline that can schedule components to run concurrently

9145 of 9908 relevant lines covered (92.3%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.14
haystack/components/embedders/azure_document_embedder.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import os
1✔
6
from typing import Any, Dict, List, Optional, Tuple
1✔
7

8
from openai.lib.azure import AzureOpenAI
1✔
9
from tqdm import tqdm
1✔
10

11
from haystack import Document, component, default_from_dict, default_to_dict
1✔
12
from haystack.utils import Secret, deserialize_secrets_inplace
1✔
13

14

15
@component
1✔
16
class AzureOpenAIDocumentEmbedder:
1✔
17
    """
18
    Calculates document embeddings using OpenAI models deployed on Azure.
19

20
    ### Usage example
21

22
    ```python
23
    from haystack import Document
24
    from haystack.components.embedders import AzureOpenAIDocumentEmbedder
25

26
    doc = Document(content="I love pizza!")
27

28
    document_embedder = AzureOpenAIDocumentEmbedder()
29

30
    result = document_embedder.run([doc])
31
    print(result['documents'][0].embedding)
32

33
    # [0.017020374536514282, -0.023255806416273117, ...]
34
    ```
35
    """
36

37
    def __init__(  # noqa: PLR0913 (too-many-arguments) # pylint: disable=too-many-positional-arguments
1✔
38
        self,
39
        azure_endpoint: Optional[str] = None,
40
        api_version: Optional[str] = "2023-05-15",
41
        azure_deployment: str = "text-embedding-ada-002",
42
        dimensions: Optional[int] = None,
43
        api_key: Optional[Secret] = Secret.from_env_var("AZURE_OPENAI_API_KEY", strict=False),
44
        azure_ad_token: Optional[Secret] = Secret.from_env_var("AZURE_OPENAI_AD_TOKEN", strict=False),
45
        organization: Optional[str] = None,
46
        prefix: str = "",
47
        suffix: str = "",
48
        batch_size: int = 32,
49
        progress_bar: bool = True,
50
        meta_fields_to_embed: Optional[List[str]] = None,
51
        embedding_separator: str = "\n",
52
        timeout: Optional[float] = None,
53
        max_retries: Optional[int] = None,
54
        *,
55
        default_headers: Optional[Dict[str, str]] = None,
56
    ):
57
        """
58
        Creates an AzureOpenAIDocumentEmbedder component.
59

60
        :param azure_endpoint:
61
            The endpoint of the model deployed on Azure.
62
        :param api_version:
63
            The version of the API to use.
64
        :param azure_deployment:
65
            The name of the model deployed on Azure. The default model is text-embedding-ada-002.
66
        :param dimensions:
67
            The number of dimensions of the resulting embeddings. Only supported in text-embedding-3
68
            and later models.
69
        :param api_key:
70
            The Azure OpenAI API key.
71
            You can set it with an environment variable `AZURE_OPENAI_API_KEY`, or pass with this
72
            parameter during initialization.
73
        :param azure_ad_token:
74
            Microsoft Entra ID token, see Microsoft's
75
            [Entra ID](https://www.microsoft.com/en-us/security/business/identity-access/microsoft-entra-id)
76
            documentation for more information. You can set it with an environment variable
77
            `AZURE_OPENAI_AD_TOKEN`, or pass with this parameter during initialization.
78
            Previously called Azure Active Directory.
79
        :param organization:
80
            Your organization ID. See OpenAI's
81
            [Setting Up Your Organization](https://platform.openai.com/docs/guides/production-best-practices/setting-up-your-organization)
82
            for more information.
83
        :param prefix:
84
            A string to add at the beginning of each text.
85
        :param suffix:
86
            A string to add at the end of each text.
87
        :param batch_size:
88
            Number of documents to embed at once.
89
        :param progress_bar:
90
            If `True`, shows a progress bar when running.
91
        :param meta_fields_to_embed:
92
            List of metadata fields to embed along with the document text.
93
        :param embedding_separator:
94
            Separator used to concatenate the metadata fields to the document text.
95
        :param timeout: The timeout for `AzureOpenAI` client calls, in seconds.
96
            If not set, defaults to either the
97
            `OPENAI_TIMEOUT` environment variable, or 30 seconds.
98
        :param max_retries: Maximum number of retries to contact AzureOpenAI after an internal error.
99
            If not set, defaults to either the `OPENAI_MAX_RETRIES` environment variable or to 5 retries.
100
        :param default_headers: Default headers to send to the AzureOpenAI client.
101
        """
102
        # if not provided as a parameter, azure_endpoint is read from the env var AZURE_OPENAI_ENDPOINT
103
        azure_endpoint = azure_endpoint or os.environ.get("AZURE_OPENAI_ENDPOINT")
1✔
104
        if not azure_endpoint:
1✔
105
            raise ValueError("Please provide an Azure endpoint or set the environment variable AZURE_OPENAI_ENDPOINT.")
×
106

107
        if api_key is None and azure_ad_token is None:
1✔
108
            raise ValueError("Please provide an API key or an Azure Active Directory token.")
×
109

110
        self.api_key = api_key
1✔
111
        self.azure_ad_token = azure_ad_token
1✔
112
        self.api_version = api_version
1✔
113
        self.azure_endpoint = azure_endpoint
1✔
114
        self.azure_deployment = azure_deployment
1✔
115
        self.dimensions = dimensions
1✔
116
        self.organization = organization
1✔
117
        self.prefix = prefix
1✔
118
        self.suffix = suffix
1✔
119
        self.batch_size = batch_size
1✔
120
        self.progress_bar = progress_bar
1✔
121
        self.meta_fields_to_embed = meta_fields_to_embed or []
1✔
122
        self.embedding_separator = embedding_separator
1✔
123
        self.timeout = timeout or float(os.environ.get("OPENAI_TIMEOUT", 30.0))
1✔
124
        self.max_retries = max_retries or int(os.environ.get("OPENAI_MAX_RETRIES", 5))
1✔
125
        self.default_headers = default_headers or {}
1✔
126

127
        self._client = AzureOpenAI(
1✔
128
            api_version=api_version,
129
            azure_endpoint=azure_endpoint,
130
            azure_deployment=azure_deployment,
131
            api_key=api_key.resolve_value() if api_key is not None else None,
132
            azure_ad_token=azure_ad_token.resolve_value() if azure_ad_token is not None else None,
133
            organization=organization,
134
            timeout=self.timeout,
135
            max_retries=self.max_retries,
136
            default_headers=self.default_headers,
137
        )
138

139
    def _get_telemetry_data(self) -> Dict[str, Any]:
1✔
140
        """
141
        Data that is sent to Posthog for usage analytics.
142
        """
143
        return {"model": self.azure_deployment}
×
144

145
    def to_dict(self) -> Dict[str, Any]:
1✔
146
        """
147
        Serializes the component to a dictionary.
148

149
        :returns:
150
            Dictionary with serialized data.
151
        """
152
        return default_to_dict(
1✔
153
            self,
154
            azure_endpoint=self.azure_endpoint,
155
            azure_deployment=self.azure_deployment,
156
            dimensions=self.dimensions,
157
            organization=self.organization,
158
            api_version=self.api_version,
159
            prefix=self.prefix,
160
            suffix=self.suffix,
161
            batch_size=self.batch_size,
162
            progress_bar=self.progress_bar,
163
            meta_fields_to_embed=self.meta_fields_to_embed,
164
            embedding_separator=self.embedding_separator,
165
            api_key=self.api_key.to_dict() if self.api_key is not None else None,
166
            azure_ad_token=self.azure_ad_token.to_dict() if self.azure_ad_token is not None else None,
167
            timeout=self.timeout,
168
            max_retries=self.max_retries,
169
            default_headers=self.default_headers,
170
        )
171

172
    @classmethod
1✔
173
    def from_dict(cls, data: Dict[str, Any]) -> "AzureOpenAIDocumentEmbedder":
1✔
174
        """
175
        Deserializes the component from a dictionary.
176

177
        :param data:
178
            Dictionary to deserialize from.
179
        :returns:
180
            Deserialized component.
181
        """
182
        deserialize_secrets_inplace(data["init_parameters"], keys=["api_key", "azure_ad_token"])
1✔
183
        return default_from_dict(cls, data)
1✔
184

185
    def _prepare_texts_to_embed(self, documents: List[Document]) -> List[str]:
1✔
186
        """
187
        Prepare the texts to embed by concatenating the Document text with the metadata fields to embed.
188
        """
189
        texts_to_embed = []
×
190
        for doc in documents:
×
191
            meta_values_to_embed = [
×
192
                str(doc.meta[key]) for key in self.meta_fields_to_embed if key in doc.meta and doc.meta[key] is not None
193
            ]
194

195
            text_to_embed = (
×
196
                self.prefix + self.embedding_separator.join(meta_values_to_embed + [doc.content or ""]) + self.suffix
197
            ).replace("\n", " ")
198

199
            texts_to_embed.append(text_to_embed)
×
200
        return texts_to_embed
×
201

202
    def _embed_batch(self, texts_to_embed: List[str], batch_size: int) -> Tuple[List[List[float]], Dict[str, Any]]:
1✔
203
        """
204
        Embed a list of texts in batches.
205
        """
206

207
        all_embeddings: List[List[float]] = []
×
208
        meta: Dict[str, Any] = {"model": "", "usage": {"prompt_tokens": 0, "total_tokens": 0}}
×
209
        for i in tqdm(range(0, len(texts_to_embed), batch_size), desc="Embedding Texts"):
×
210
            batch = texts_to_embed[i : i + batch_size]
×
211
            if self.dimensions is not None:
×
212
                response = self._client.embeddings.create(
×
213
                    model=self.azure_deployment, dimensions=self.dimensions, input=batch
214
                )
215
            else:
216
                response = self._client.embeddings.create(model=self.azure_deployment, input=batch)
×
217

218
            # Append embeddings to the list
219
            all_embeddings.extend(el.embedding for el in response.data)
×
220

221
            # Update the meta information only once if it's empty
222
            if not meta["model"]:
×
223
                meta["model"] = response.model
×
224
                meta["usage"] = dict(response.usage)
×
225
            else:
226
                # Update the usage tokens
227
                meta["usage"]["prompt_tokens"] += response.usage.prompt_tokens
×
228
                meta["usage"]["total_tokens"] += response.usage.total_tokens
×
229

230
        return all_embeddings, meta
×
231

232
    @component.output_types(documents=List[Document], meta=Dict[str, Any])
1✔
233
    def run(self, documents: List[Document]) -> Dict[str, Any]:
1✔
234
        """
235
        Embeds a list of documents.
236

237
        :param documents:
238
            Documents to embed.
239

240
        :returns:
241
            A dictionary with the following keys:
242
            - `documents`: A list of documents with embeddings.
243
            - `meta`: Information about the usage of the model.
244
        """
245
        if not (isinstance(documents, list) and all(isinstance(doc, Document) for doc in documents)):
×
246
            raise TypeError("Input must be a list of Document instances. For strings, use AzureOpenAITextEmbedder.")
×
247

248
        texts_to_embed = self._prepare_texts_to_embed(documents=documents)
×
249
        embeddings, meta = self._embed_batch(texts_to_embed=texts_to_embed, batch_size=self.batch_size)
×
250

251
        # Assign the corresponding embeddings to each document
252
        for doc, emb in zip(documents, embeddings):
×
253
            doc.embedding = emb
×
254

255
        return {"documents": documents, "meta": meta}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc