haystack/components/converters/azure.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import copy
import os
from collections import defaultdict
from pathlib import Path
from typing import Any, Literal, Optional, Union

import networkx as nx

from haystack import Document, component, default_from_dict, default_to_dict, logging
from haystack.components.converters.utils import get_bytestream_from_source, normalize_metadata
from haystack.dataclasses import ByteStream
from haystack.lazy_imports import LazyImport
from haystack.utils import Secret, deserialize_secrets_inplace

logger = logging.getLogger(__name__)

with LazyImport(message="Run 'pip install \"azure-ai-formrecognizer>=3.2.0b2\"'") as azure_import:
    from azure.ai.formrecognizer import AnalyzeResult, DocumentAnalysisClient, DocumentLine, DocumentParagraph
    from azure.core.credentials import AzureKeyCredential

with LazyImport(message="Run 'pip install pandas'") as pandas_import:
    from pandas import DataFrame


@component
class AzureOCRDocumentConverter:
    """
    Converts files to documents using Azure's Document Intelligence service.

    Supported file formats are: PDF, JPEG, PNG, BMP, TIFF, DOCX, XLSX, PPTX, and HTML.

    To use this component, you need an active Azure account
    and a Document Intelligence or Cognitive Services resource. For help with setting up your resource, see
    [Azure documentation](https://learn.microsoft.com/en-us/azure/ai-services/document-intelligence/quickstarts/get-started-sdks-rest-api).

    ### Usage example

    ```python
    from datetime import datetime

    from haystack.components.converters import AzureOCRDocumentConverter
    from haystack.utils import Secret

    converter = AzureOCRDocumentConverter(endpoint="<url>", api_key=Secret.from_token("<your-api-key>"))
    results = converter.run(sources=["path/to/doc_with_images.pdf"], meta={"date_added": datetime.now().isoformat()})
    documents = results["documents"]
    print(documents[0].content)
    # 'This is a text from the PDF file.'
    ```
    """

    def __init__(  # pylint: disable=too-many-positional-arguments
        self,
        endpoint: str,
        api_key: Secret = Secret.from_env_var("AZURE_AI_API_KEY"),
        model_id: str = "prebuilt-read",
        preceding_context_len: int = 3,
        following_context_len: int = 3,
        merge_multiple_column_headers: bool = True,
        page_layout: Literal["natural", "single_column"] = "natural",
        threshold_y: Optional[float] = 0.05,
        store_full_path: bool = False,
    ):
        """
        Creates an AzureOCRDocumentConverter component.

        :param endpoint:
            The endpoint of your Azure resource.
        :param api_key:
            The API key of your Azure resource.
        :param model_id:
            The ID of the model you want to use. For a list of available models, see [Azure documentation]
            (https://learn.microsoft.com/en-us/azure/ai-services/document-intelligence/choose-model-feature).
        :param preceding_context_len: Number of lines before a table to include as preceding context
            (this will be added to the metadata).
        :param following_context_len: Number of lines after a table to include as following context
            (this will be added to the metadata).
        :param merge_multiple_column_headers: If `True`, merges multiple column header rows into a single row.
        :param page_layout: The type of reading order to follow. Possible options:
            - `natural`: Uses the natural reading order determined by Azure.
            - `single_column`: Groups all lines with the same height on the page based on a threshold
            determined by `threshold_y`.
        :param threshold_y: Only relevant if `page_layout` is set to `single_column`.
            The threshold, in inches, to determine if two recognized PDF elements are grouped into a
            single line. This is crucial for section headers or numbers which may be spatially separated
            from the remaining text on the horizontal axis.
        :param store_full_path:
            If `True`, the full path of the file is stored in the metadata of the document.
            If `False`, only the file name is stored.
        """
        azure_import.check()
        pandas_import.check()

        self.document_analysis_client = DocumentAnalysisClient(
            endpoint=endpoint, credential=AzureKeyCredential(api_key.resolve_value() or "")
        )
        self.endpoint = endpoint
        self.model_id = model_id
        self.api_key = api_key
        self.preceding_context_len = preceding_context_len
        self.following_context_len = following_context_len
        self.merge_multiple_column_headers = merge_multiple_column_headers
        self.page_layout = page_layout
        self.threshold_y = threshold_y
        self.store_full_path = store_full_path
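        # Fall back to the default when single-column layout is chosen but no threshold is given,
        # since the grouping below always needs a numeric value.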
108
        if self.page_layout == "single_column" and self.threshold_y is None:
1✔
109
            self.threshold_y = 0.05
×

    @component.output_types(documents=list[Document], raw_azure_response=list[dict])
    def run(
        self,
        sources: list[Union[str, Path, ByteStream]],
        meta: Optional[Union[dict[str, Any], list[dict[str, Any]]]] = None,
    ):
        """
        Convert a list of files to Documents using Azure's Document Intelligence service.

        :param sources:
            List of file paths or ByteStream objects.
        :param meta:
            Optional metadata to attach to the Documents.
            This value can be either a list of dictionaries or a single dictionary.
            If it's a single dictionary, its content is added to the metadata of all produced Documents.
            If it's a list, the length of the list must match the number of sources, because the two lists will be
            zipped. If `sources` contains ByteStream objects, their `meta` will be added to the output Documents.

        :returns:
            A dictionary with the following keys:
            - `documents`: List of created Documents
            - `raw_azure_response`: List of raw Azure responses used to create the Documents
        """
        documents = []
        azure_output = []
        meta_list: list[dict[str, Any]] = normalize_metadata(meta=meta, sources_count=len(sources))
        for source, metadata in zip(sources, meta_list):
            try:
                bytestream = get_bytestream_from_source(source=source)
            except Exception as e:
                logger.warning("Could not read {source}. Skipping it. Error: {error}", source=source, error=e)
                continue

            poller = self.document_analysis_client.begin_analyze_document(
                model_id=self.model_id, document=bytestream.data
            )
            result = poller.result()
            azure_output.append(result.to_dict())

            merged_metadata = {**bytestream.meta, **metadata}
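
            # Unless store_full_path is enabled, keep only the file name in the metadata.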
            if not self.store_full_path and (file_path := bytestream.meta.get("file_path")):
                merged_metadata["file_path"] = os.path.basename(file_path)
            docs = self._convert_tables_and_text(result=result, meta=merged_metadata)
            documents.extend(docs)

        return {"documents": documents, "raw_azure_response": azure_output}
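
    # Per-source metadata example (illustrative, not part of the original module): with two
    # sources, `meta` can be a two-element list that gets zipped with `sources`:
    #
    #   converter.run(
    #       sources=["a.pdf", "b.pdf"],
    #       meta=[{"language": "en"}, {"language": "de"}],
    #   )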

    def to_dict(self) -> dict[str, Any]:
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """
        return default_to_dict(
            self,
            api_key=self.api_key.to_dict(),
            endpoint=self.endpoint,
            model_id=self.model_id,
            preceding_context_len=self.preceding_context_len,
            following_context_len=self.following_context_len,
            merge_multiple_column_headers=self.merge_multiple_column_headers,
            page_layout=self.page_layout,
            threshold_y=self.threshold_y,
            store_full_path=self.store_full_path,
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "AzureOCRDocumentConverter":
        """
        Deserializes the component from a dictionary.

        :param data:
            The dictionary to deserialize from.
        :returns:
            The deserialized component.
        """
        deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])
        return default_from_dict(cls, data)
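
    # A minimal serialization round-trip sketch (illustrative, not part of the original
    # module). With the default env-var-backed `Secret`, `to_dict()` stores only the
    # variable reference rather than the key value, and `from_dict()` restores it:
    #
    #   converter = AzureOCRDocumentConverter(endpoint="<endpoint-url>")
    #   data = converter.to_dict()
    #   restored = AzureOCRDocumentConverter.from_dict(data)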

    # pylint: disable=line-too-long
    def _convert_tables_and_text(self, result: "AnalyzeResult", meta: Optional[dict[str, Any]]) -> list[Document]:
        """
        Converts the tables and text extracted by Azure's Document Intelligence service into Haystack Documents.

        :param result: The AnalyzeResult object returned by the `begin_analyze_document` method. Docs on AnalyzeResult
            can be found [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-ai-formrecognizer/3.3.0/azure.ai.formrecognizer.html?highlight=read#azure.ai.formrecognizer.AnalyzeResult).
        :param meta: Optional dictionary with metadata that shall be attached to all resulting documents.
            Can be any custom keys and values.
        :returns: List of Documents containing the tables and text extracted from the AnalyzeResult object.
        """
        tables = self._convert_tables(result=result, meta=meta)
        if self.page_layout == "natural":
            text = self._convert_to_natural_text(result=result, meta=meta)
        else:
            assert isinstance(self.threshold_y, float)
            text = self._convert_to_single_column_text(result=result, meta=meta, threshold_y=self.threshold_y)
        docs = [*tables, text]
        return docs

    def _convert_tables(self, result: "AnalyzeResult", meta: Optional[dict[str, Any]]) -> list[Document]:
        """
        Converts the tables extracted by Azure's Document Intelligence service into Haystack Documents.

        :param result: The AnalyzeResult Azure object.
        :param meta: Optional dictionary with metadata that shall be attached to all resulting documents.

        :returns: List of Documents containing the tables extracted from the AnalyzeResult object.
        """
        converted_tables: list[Document] = []

        if not result.tables:
            return converted_tables

        for table in result.tables:
            # Initialize table with empty cells
            table_list = [[""] * table.column_count for _ in range(table.row_count)]
            additional_column_header_rows = set()
            caption = ""
            row_idx_start = 0

            for idx, cell in enumerate(table.cells):
                # Remove ':selected:'/':unselected:' tags from cell's content
                cell.content = cell.content.replace(":selected:", "")
                cell.content = cell.content.replace(":unselected:", "")

                # Check if the first row is a merged cell spanning the whole table
                # -> if so, exclude this row and use it as the table caption
                if idx == 0 and cell.column_span == table.column_count:
                    caption = cell.content
                    row_idx_start = 1
                    table_list.pop(0)
                    continue
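
                # A cell may span multiple rows and/or columns; write its content into
                # every grid position it covers so the flattened table stays rectangular.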
                column_span = cell.column_span if cell.column_span else 0
                for c in range(column_span):  # pylint: disable=invalid-name
                    row_span = cell.row_span if cell.row_span else 0
                    for r in range(row_span):  # pylint: disable=invalid-name
                        if (
                            self.merge_multiple_column_headers
                            and cell.kind == "columnHeader"
                            and cell.row_index > row_idx_start
                        ):
                            # More than one row serves as column header
                            table_list[0][cell.column_index + c] += f"\n{cell.content}"
                            additional_column_header_rows.add(cell.row_index - row_idx_start)
                        else:
                            table_list[cell.row_index + r - row_idx_start][cell.column_index + c] = cell.content

            # Remove additional column header rows, as these got attached to the first row
            for row_idx in sorted(additional_column_header_rows, reverse=True):
                del table_list[row_idx]

            # Get preceding context of table
            if table.bounding_regions:
                table_beginning_page = next(
                    page for page in result.pages if page.page_number == table.bounding_regions[0].page_number
                )
            else:
                table_beginning_page = None
            table_start_offset = table.spans[0].offset
            if table_beginning_page and table_beginning_page.lines:
                preceding_lines = [
                    line.content for line in table_beginning_page.lines if line.spans[0].offset < table_start_offset
                ]
            else:
                preceding_lines = []
            preceding_context = "\n".join(preceding_lines[-self.preceding_context_len :]) + f"\n{caption}"
            preceding_context = preceding_context.strip()

            # Get following context
            if table.bounding_regions and len(table.bounding_regions) == 1:
                table_end_page = table_beginning_page
            elif table.bounding_regions:
                table_end_page = next(
                    page for page in result.pages if page.page_number == table.bounding_regions[-1].page_number
                )
            else:
                table_end_page = None

            table_end_offset = table_start_offset + table.spans[0].length
            if table_end_page and table_end_page.lines:
                following_lines = [
                    line.content for line in table_end_page.lines if line.spans[0].offset > table_end_offset
                ]
            else:
                following_lines = []
            following_context = "\n".join(following_lines[: self.following_context_len])

            table_meta = copy.deepcopy(meta)

            if isinstance(table_meta, dict):
                table_meta["preceding_context"] = preceding_context
                table_meta["following_context"] = following_context
            else:
                table_meta = {"preceding_context": preceding_context, "following_context": following_context}

            if table.bounding_regions:
                table_meta["page"] = table.bounding_regions[0].page_number

            # Convert table to CSV
            table_df = DataFrame(data=table_list)
            table_content = table_df.to_csv(header=False, index=False, lineterminator="\n")
            converted_tables.append(Document(content=table_content, meta=table_meta))

        return converted_tables

    def _convert_to_natural_text(self, result: "AnalyzeResult", meta: Optional[dict[str, Any]]) -> Document:
        """
        This converts the `AnalyzeResult` object into a single Document.

        We add "\f" separators to differentiate between the text on separate pages. This is the expected format
        for the PreProcessor.

        :param result: The AnalyzeResult object returned by the `begin_analyze_document` method. Docs on AnalyzeResult
            can be found [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-ai-formrecognizer/3.3.0/azure.ai.formrecognizer.html?highlight=read#azure.ai.formrecognizer.AnalyzeResult).
        :param meta: Optional dictionary with metadata that shall be attached to all resulting documents.
            Can be any custom keys and values.
        :returns: A single Document containing all the text extracted from the AnalyzeResult object.
        """
        table_spans_by_page = self._collect_table_spans(result=result)

        texts = []
        if result.paragraphs:
            paragraphs_to_pages: dict[int, str] = defaultdict(str)
            for paragraph in result.paragraphs:
                if paragraph.bounding_regions:
                    # If a paragraph spans multiple pages, we group it with the first page number
                    page_numbers = [b.page_number for b in paragraph.bounding_regions]
                else:
                    # If page_number is not available, we put the paragraph onto an existing page
                    current_last_page_number = sorted(paragraphs_to_pages.keys())[-1] if paragraphs_to_pages else 1
                    page_numbers = [current_last_page_number]
                tables_on_page = table_spans_by_page[page_numbers[0]]
                # Check if the paragraph is part of a table and if so skip it
                if self._check_if_in_table(tables_on_page, line_or_paragraph=paragraph):
                    continue
                paragraphs_to_pages[page_numbers[0]] += paragraph.content + "\n"

            max_page_number: int = max(paragraphs_to_pages)
            for page_idx in range(1, max_page_number + 1):
                # We add empty strings for missing pages so the preprocessor can still extract the correct page number
                # from the original PDF.
                page_text = paragraphs_to_pages.get(page_idx, "")
                texts.append(page_text)
        else:
            logger.warning("No text paragraphs were detected by the OCR conversion.")

        all_text = "\f".join(texts)
        return Document(content=all_text, meta=meta if meta else {})

    def _convert_to_single_column_text(
        self, result: "AnalyzeResult", meta: Optional[dict[str, Any]], threshold_y: float = 0.05
    ) -> Document:
        """
        This converts the `AnalyzeResult` object into a single Haystack Document.

        We add "\f" separators to differentiate between the text on separate pages. This is the expected format
        for the PreProcessor.

        :param result: The AnalyzeResult object returned by the `begin_analyze_document` method. Docs on AnalyzeResult
            can be found [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-ai-formrecognizer/3.3.0/azure.ai.formrecognizer.html?highlight=read#azure.ai.formrecognizer.AnalyzeResult).
        :param meta: Optional dictionary with metadata that shall be attached to all resulting documents.
            Can be any custom keys and values.
        :param threshold_y: Height threshold in inches for PDFs and in pixels for images.
        :returns: A single Document containing all the text extracted from the AnalyzeResult object.
        """
        table_spans_by_page = self._collect_table_spans(result=result)

        # Find all pairs of lines that should be grouped together based on the y-value of the upper left coordinate
        # of their bounding box
        pairs_by_page = defaultdict(list)
        for page_idx, page in enumerate(result.pages):
            lines = page.lines if page.lines else []
            # Only works if polygon information is available
            if all(line.polygon is not None for line in lines):
                for i in range(len(lines)):  # pylint: disable=consider-using-enumerate
                    # left_upi, right_upi, right_lowi, left_lowi = lines[i].polygon
                    left_upi, _, _, _ = lines[i].polygon
                    pairs_by_page[page_idx].append([i, i])
                    for j in range(i + 1, len(lines)):  # pylint: disable=invalid-name
                        left_upj, _, _, _ = lines[j].polygon
                        close_on_y_axis = abs(left_upi[1] - left_upj[1]) < threshold_y
                        if close_on_y_axis:
                            pairs_by_page[page_idx].append([i, j])
            # Default if polygon information is not available
            else:
                logger.info(
                    "Polygon information for lines on page {page_idx} is not available so it is not possible "
                    "to enforce a single column page layout.".format(page_idx=page_idx)
                )
                for i in range(len(lines)):
                    pairs_by_page[page_idx].append([i, i])
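
        # Each pair [i, j] is an edge in an undirected graph over line indices; every
        # connected component then gathers all lines belonging to one visual row, e.g.
        # the pairs [0, 1] and [1, 2] merge into the single row {0, 1, 2}.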
        # Merge the line pairs that are connected, page by page
        merged_pairs_by_page = {}
        for page_idx in pairs_by_page:
            graph = nx.Graph()
            graph.add_edges_from(pairs_by_page[page_idx])
            merged_pairs_by_page[page_idx] = [list(a) for a in list(nx.connected_components(graph))]

        # Convert line indices to the DocumentLine objects
        merged_lines_by_page = {}
        for page_idx, page in enumerate(result.pages):
            rows = []
            lines = page.lines if page.lines else []
            # We use .get(page_idx, []) since the page could be empty
            for row_of_lines in merged_pairs_by_page.get(page_idx, []):
                lines_in_row = [lines[line_idx] for line_idx in row_of_lines]
                rows.append(lines_in_row)
            merged_lines_by_page[page_idx] = rows

        # Sort the lines within each row by the x-value of the upper left bounding box coordinate
        x_sorted_lines_by_page = {}
        for page_idx, _ in enumerate(result.pages):
            sorted_rows = []
            for row_of_lines in merged_lines_by_page[page_idx]:
                sorted_rows.append(sorted(row_of_lines, key=lambda x: x.polygon[0][0]))
            x_sorted_lines_by_page[page_idx] = sorted_rows

        # Sort the rows within each page by the y-value of the upper left bounding box coordinate
        y_sorted_lines_by_page = {}
        for page_idx, _ in enumerate(result.pages):
            sorted_rows = sorted(x_sorted_lines_by_page[page_idx], key=lambda x: x[0].polygon[0][1])
            y_sorted_lines_by_page[page_idx] = sorted_rows

        # Construct the text to write
        texts = []
        for page_idx, page in enumerate(result.pages):
            tables_on_page = table_spans_by_page[page.page_number]
            page_text = ""
            for row_of_lines in y_sorted_lines_by_page[page_idx]:
                # Check if the line is part of a table and if so skip it
                if any(self._check_if_in_table(tables_on_page, line_or_paragraph=line) for line in row_of_lines):
                    continue
                page_text += " ".join(line.content for line in row_of_lines)
                page_text += "\n"
            texts.append(page_text)
        all_text = "\f".join(texts)
        return Document(content=all_text, meta=meta if meta else {})

    def _collect_table_spans(self, result: "AnalyzeResult") -> dict:
        """
        Collect the spans of all tables by page number.

        :param result: The AnalyzeResult object returned by the `begin_analyze_document` method.
        :returns: A dictionary with the page number as key and a list of table spans as value.
        """
        table_spans_by_page = defaultdict(list)
        tables = result.tables if result.tables else []
        for table in tables:
            if not table.bounding_regions:
                continue
            table_spans_by_page[table.bounding_regions[0].page_number].append(table.spans[0])
        return table_spans_by_page

    def _check_if_in_table(
        self, tables_on_page: list, line_or_paragraph: Union["DocumentLine", "DocumentParagraph"]
    ) -> bool:
        """
        Check if a line or paragraph is part of a table.

        :param tables_on_page: The list of table spans on the page of the line or paragraph.
        :param line_or_paragraph: The line or paragraph to check.
        :returns: True if the line or paragraph is part of a table, False otherwise.
        """
        in_table = False
        # A line belongs to a table if its first span starts inside one of the table spans
        for table in tables_on_page:
            if table.offset <= line_or_paragraph.spans[0].offset <= table.offset + table.length:
                in_table = True
                break
        return in_table
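
# Usage sketch for the single-column layout (illustrative only; the endpoint, key, and
# file name below are placeholders, not values from this module):
#
#   converter = AzureOCRDocumentConverter(
#       endpoint="<endpoint-url>",
#       api_key=Secret.from_token("<api-key>"),
#       page_layout="single_column",
#       threshold_y=0.05,  # merge lines whose top edges are less than 0.05 inches apart
#   )
#   result = converter.run(sources=["scanned_report.pdf"])
#   *tables, text_doc = result["documents"]  # converted tables first, then one text Document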