deepset-ai / haystack · build 20241601392
15 Dec 2025 05:34PM UTC · coverage: 92.121% (-0.01%) from 92.133%
Pull Request #10244: feat!: drop Python 3.9 support due to EOL
Merge 5f2f7fd60 into fd989fecc

14123 of 15331 relevant lines covered (92.12%) · 0.92 hits per line

Source file: haystack/components/extractors/llm_metadata_extractor.py (file coverage: 82.5%)
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import copy
import json
from concurrent.futures import ThreadPoolExecutor
from dataclasses import replace
from typing import Any, Optional, Union

from jinja2 import meta
from jinja2.sandbox import SandboxedEnvironment

from haystack import Document, component, default_from_dict, default_to_dict, logging
from haystack.components.builders import PromptBuilder
from haystack.components.generators.chat.types import ChatGenerator
from haystack.components.preprocessors import DocumentSplitter
from haystack.core.serialization import component_to_dict
from haystack.dataclasses import ChatMessage
from haystack.utils import deserialize_chatgenerator_inplace, expand_page_range

logger = logging.getLogger(__name__)


@component
class LLMMetadataExtractor:
    """
    Extracts metadata from documents using a Large Language Model (LLM).

    The metadata is extracted by providing a prompt to an LLM that generates the metadata.

    This component expects a list of documents and a prompt as input. The prompt must contain a single variable
    called `document`, which points to one document in the list. To access the content of that document, use
    `{{ document.content }}` in the prompt.

    The component runs the LLM on each document in the list and adds the extracted metadata to the document's
    metadata field. If the LLM fails to extract metadata from a document, the document is added to the
    `failed_documents` list, with the keys `metadata_extraction_error` and `metadata_extraction_response` in its
    metadata. Such documents can be re-run with another extractor that uses `metadata_extraction_response` and
    `metadata_extraction_error` in its prompt.

    ```python
    from haystack import Document
    from haystack.components.extractors.llm_metadata_extractor import LLMMetadataExtractor
    from haystack.components.generators.chat import OpenAIChatGenerator

    NER_PROMPT = '''
    -Goal-
    Given text and a list of entity types, identify all entities of those types from the text.

    -Steps-
    1. Identify all entities. For each identified entity, extract the following information:
    - entity: Name of the entity
    - entity_type: One of the following types: [organization, product, service, industry]
    Format each entity as a JSON like: {"entity": <entity_name>, "entity_type": <entity_type>}

    2. Return output in a single list with all the entities identified in step 1.

    -Examples-
    ######################
    Example 1:
    entity_types: [organization, person, partnership, financial metric, product, service, industry, investment strategy, market trend]
    text: Another area of strength is our co-brand issuance. Visa is the primary network partner for eight of the top
    10 co-brand partnerships in the US today and we are pleased that Visa has finalized a multi-year extension of
    our successful credit co-branded partnership with Alaska Airlines, a portfolio that benefits from a loyal customer
    base and high cross-border usage.
    We have also had significant co-brand momentum in CEMEA. First, we launched a new co-brand card in partnership
    with Qatar Airways, British Airways and the National Bank of Kuwait. Second, we expanded our strong global
    Marriott relationship to launch Qatar's first hospitality co-branded card with Qatar Islamic Bank. Across the
    United Arab Emirates, we now have exclusive agreements with all the leading airlines marked by a recent
    agreement with Emirates Skywards.
    And we also signed an inaugural Airline co-brand agreement in Morocco with Royal Air Maroc. Now newer digital
    issuers are equally
    ------------------------
    output:
    {"entities": [{"entity": "Visa", "entity_type": "company"}, {"entity": "Alaska Airlines", "entity_type": "company"}, {"entity": "Qatar Airways", "entity_type": "company"}, {"entity": "British Airways", "entity_type": "company"}, {"entity": "National Bank of Kuwait", "entity_type": "company"}, {"entity": "Marriott", "entity_type": "company"}, {"entity": "Qatar Islamic Bank", "entity_type": "company"}, {"entity": "Emirates Skywards", "entity_type": "company"}, {"entity": "Royal Air Maroc", "entity_type": "company"}]}
    #############################
    -Real Data-
    ######################
    entity_types: [company, organization, person, country, product, service]
    text: {{ document.content }}
    ######################
    output:
    '''

    docs = [
        Document(content="deepset was founded in 2018 in Berlin, and is known for its Haystack framework"),
        Document(content="Hugging Face is a company that was founded in New York, USA and is known for its Transformers library")
    ]

    chat_generator = OpenAIChatGenerator(
        generation_kwargs={
            "max_completion_tokens": 500,
            "temperature": 0.0,
            "seed": 0,
            "response_format": {"type": "json_object"},
        },
        max_retries=1,
        timeout=60.0,
    )

    extractor = LLMMetadataExtractor(
        prompt=NER_PROMPT,
        chat_generator=chat_generator,
        expected_keys=["entities"],
        raise_on_failure=False,
    )

    extractor.warm_up()
    extractor.run(documents=docs)
    >> {'documents': [
        Document(id=.., content: 'deepset was founded in 2018 in Berlin, and is known for its Haystack framework',
        meta: {'entities': [{'entity': 'deepset', 'entity_type': 'company'}, {'entity': 'Berlin', 'entity_type': 'city'},
              {'entity': 'Haystack', 'entity_type': 'product'}]}),
        Document(id=.., content: 'Hugging Face is a company that was founded in New York, USA and is known for its Transformers library',
        meta: {'entities': [
                {'entity': 'Hugging Face', 'entity_type': 'company'}, {'entity': 'New York', 'entity_type': 'city'},
                {'entity': 'USA', 'entity_type': 'country'}, {'entity': 'Transformers', 'entity_type': 'product'}
                ]})
           ]
        'failed_documents': []
       }
    >>
    ```
    """  # noqa: E501

    def __init__(  # pylint: disable=R0917
        self,
        prompt: str,
        chat_generator: ChatGenerator,
        expected_keys: Optional[list[str]] = None,
        page_range: Optional[list[Union[str, int]]] = None,
        raise_on_failure: bool = False,
        max_workers: int = 3,
    ):
        """
        Initializes the LLMMetadataExtractor.

        :param prompt: The prompt to be used for the LLM.
        :param chat_generator: A ChatGenerator instance that represents the LLM. For the component to work,
            the LLM must be configured to return a JSON object. For example, when using the OpenAIChatGenerator,
            pass `{"response_format": {"type": "json_object"}}` in the `generation_kwargs`.
        :param expected_keys: The keys expected in the JSON output from the LLM.
        :param page_range: A range of pages to extract metadata from. For example, page_range=['1', '3'] extracts
            metadata from the first and third pages of each document. It also accepts printable range strings, e.g.:
            ['1-3', '5', '8', '10-12'] extracts metadata from pages 1, 2, 3, 5, 8, 10, 11, 12.
            If None, metadata is extracted from the entire document for each document in the documents list.
            This parameter is optional and can be overridden in the `run` method.
        :param raise_on_failure: Whether to raise an error on failure during the execution of the Generator or
            validation of the JSON output.
        :param max_workers: The maximum number of workers to use in the thread pool executor.
        """
        self.prompt = prompt
        ast = SandboxedEnvironment().parse(prompt)
        template_variables = meta.find_undeclared_variables(ast)
        variables = list(template_variables)
        if len(variables) != 1 or variables[0] != "document":
            raise ValueError(
                f"Prompt must have exactly one variable called 'document'. Found {','.join(variables)} in the prompt."
            )
        self.builder = PromptBuilder(prompt, required_variables=variables)
        self.raise_on_failure = raise_on_failure
        self.expected_keys = expected_keys or []
        self.splitter = DocumentSplitter(split_by="page", split_length=1)
        self.expanded_range = expand_page_range(page_range) if page_range else None
        self.max_workers = max_workers
        self._chat_generator = chat_generator
        self._is_warmed_up = False

    def warm_up(self):
        """
        Warm up the LLM provider component.
        """
        if not self._is_warmed_up:
            if hasattr(self._chat_generator, "warm_up"):
                self._chat_generator.warm_up()
            self._is_warmed_up = True

    def to_dict(self) -> dict[str, Any]:
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """

        return default_to_dict(
            self,
            prompt=self.prompt,
            chat_generator=component_to_dict(obj=self._chat_generator, name="chat_generator"),
            expected_keys=self.expected_keys,
            page_range=self.expanded_range,
            raise_on_failure=self.raise_on_failure,
            max_workers=self.max_workers,
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "LLMMetadataExtractor":
        """
        Deserializes the component from a dictionary.

        :param data:
            Dictionary with serialized data.
        :returns:
            An instance of the component.
        """

        deserialize_chatgenerator_inplace(data["init_parameters"], key="chat_generator")
        return default_from_dict(cls, data)

    def _extract_metadata(self, llm_answer: str) -> dict[str, Any]:
        parsed_metadata: dict[str, Any] = {}

        try:
            parsed_metadata = json.loads(llm_answer)
        except json.JSONDecodeError as e:
            logger.warning(
                "Response from the LLM is not valid JSON. Skipping metadata extraction. Received output: {response}",
                response=llm_answer,
            )
            if self.raise_on_failure:
                raise e
            return {"error": "Response is not valid JSON. Received JSONDecodeError: " + str(e)}

        if not all(key in parsed_metadata for key in self.expected_keys):
            logger.warning(
                "Expected response from LLM to be a JSON with keys {expected_keys}, got {parsed_json}. "
                "Continuing extraction with received output.",
                expected_keys=self.expected_keys,
                parsed_json=parsed_metadata,
            )

        return parsed_metadata

    def _prepare_prompts(
        self, documents: list[Document], expanded_range: Optional[list[int]] = None
    ) -> list[Union[ChatMessage, None]]:
        all_prompts: list[Union[ChatMessage, None]] = []
        for document in documents:
            if not document.content:
                logger.warning("Document {doc_id} has no content. Skipping metadata extraction.", doc_id=document.id)
                all_prompts.append(None)
                continue

            if expanded_range:
                doc_copy = copy.deepcopy(document)
                pages = self.splitter.run(documents=[doc_copy])
                content = ""
                for idx, page in enumerate(pages["documents"]):
                    if idx + 1 in expanded_range:
                        content += page.content
                doc_copy.content = content
            else:
                doc_copy = document

            prompt_with_doc = self.builder.run(template=self.prompt, template_variables={"document": doc_copy})

            # Build a ChatMessage with the prompt
            message = ChatMessage.from_user(prompt_with_doc["prompt"])
            all_prompts.append(message)

        return all_prompts

    def _run_on_thread(self, prompt: Optional[ChatMessage]) -> dict[str, Any]:
        # If the prompt is None, return an error dictionary
        if prompt is None:
            return {"error": "Document has no content, skipping LLM call."}

        try:
            result = self._chat_generator.run(messages=[prompt])
        except Exception as e:
            if self.raise_on_failure:
                raise e
            logger.error(
                "LLM {class_name} execution failed. Skipping metadata extraction. Failed with exception '{error}'.",
                class_name=self._chat_generator.__class__.__name__,
                error=e,
            )
            result = {"error": "LLM failed with exception: " + str(e)}
        return result

    @component.output_types(documents=list[Document], failed_documents=list[Document])
    def run(self, documents: list[Document], page_range: Optional[list[Union[str, int]]] = None):
        """
        Extract metadata from documents using a Large Language Model.

        If `page_range` is provided, the documents are split into pages and metadata is extracted only from the
        specified range of pages. If `page_range` is not provided, metadata is extracted from the entire document.

        The original documents are returned updated with the extracted metadata.

        :param documents: List of documents to extract metadata from.
        :param page_range: A range of pages to extract metadata from. For example, page_range=['1', '3'] extracts
                           metadata from the first and third pages of each document. It also accepts printable range
                           strings, e.g.: ['1-3', '5', '8', '10-12'] extracts metadata from pages 1, 2, 3, 5, 8, 10,
                           11, 12.
                           If None, metadata is extracted from the entire document for each document in the
                           documents list.
        :returns:
            A dictionary with the keys:
            - "documents": A list of documents that were successfully updated with the extracted metadata.
            - "failed_documents": A list of documents for which metadata extraction failed. These documents have
              "metadata_extraction_error" and "metadata_extraction_response" in their metadata and can be
              re-run with the extractor to extract metadata.
        """
        if len(documents) == 0:
            logger.warning("No documents provided. Skipping metadata extraction.")
            return {"documents": [], "failed_documents": []}

        if not self._is_warmed_up:
            self.warm_up()

        expanded_range = self.expanded_range
        if page_range:
            expanded_range = expand_page_range(page_range)

        # Create ChatMessage prompts for each document
        all_prompts = self._prepare_prompts(documents=documents, expanded_range=expanded_range)

        # Run the LLM on each prompt
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            results = executor.map(self._run_on_thread, all_prompts)

        successful_documents = []
        failed_documents = []
        for document, result in zip(documents, results):
            new_meta = {**document.meta}
            if "error" in result:
                new_meta["metadata_extraction_error"] = result["error"]
                new_meta["metadata_extraction_response"] = None
                failed_documents.append(replace(document, meta=new_meta))
                continue

            parsed_metadata = self._extract_metadata(result["replies"][0].text)
            if "error" in parsed_metadata:
                new_meta["metadata_extraction_error"] = parsed_metadata["error"]
                new_meta["metadata_extraction_response"] = result["replies"][0]
                failed_documents.append(replace(document, meta=new_meta))
                continue

            # Remove metadata_extraction_error and metadata_extraction_response if present from previous runs
            new_meta.pop("metadata_extraction_error", None)
            new_meta.pop("metadata_extraction_response", None)
            new_meta.update(parsed_metadata)
            successful_documents.append(replace(document, meta=new_meta))

        return {"documents": successful_documents, "failed_documents": failed_documents}
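The JSON-validation step in `_extract_metadata` can be illustrated as a standalone sketch. The function name `parse_llm_metadata` is hypothetical and the sketch only mirrors the non-raising path (`raise_on_failure=False`); it is not the component itself and uses only the standard library:

```python
import json
from typing import Any


def parse_llm_metadata(llm_answer: str, expected_keys: list[str]) -> dict[str, Any]:
    """Parse an LLM reply as JSON and warn about missing expected keys.

    Illustrative sketch of the checks in LLMMetadataExtractor._extract_metadata
    when raise_on_failure=False; not the actual component code.
    """
    try:
        parsed = json.loads(llm_answer)
    except json.JSONDecodeError as e:
        # Non-raising path: report the failure instead of propagating it.
        return {"error": "Response is not valid JSON. Received JSONDecodeError: " + str(e)}

    missing = [key for key in expected_keys if key not in parsed]
    if missing:
        # The component only logs a warning here and keeps the parsed output.
        print(f"warning: missing expected keys {missing} in {sorted(parsed)}")
    return parsed


print(parse_llm_metadata('{"entities": [{"entity": "deepset", "entity_type": "company"}]}', ["entities"]))
print(parse_llm_metadata("not json at all", ["entities"]))
```

A document whose reply yields an `"error"` key ends up in `failed_documents`; otherwise the parsed keys are merged into the document's metadata.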