
deepset-ai / haystack / build 16933015230

13 Aug 2025 09:18AM UTC coverage: 92.184% (+0.2%) from 91.969%

Pull Request #9699 (merge cfbd602e7 into 8160ea8bf, via web-flow):
feat: Update `source_id_meta_field` in `SentenceWindowRetriever` to also accept a list of values

12891 of 13984 relevant lines covered (92.18%)

0.92 hits per line

Source File: haystack/components/extractors/llm_metadata_extractor.py (82.61% covered)
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import copy
import json
from concurrent.futures import ThreadPoolExecutor
from dataclasses import replace
from typing import Any, Optional, Union

from jinja2 import meta
from jinja2.sandbox import SandboxedEnvironment

from haystack import Document, component, default_from_dict, default_to_dict, logging
from haystack.components.builders import PromptBuilder
from haystack.components.generators.chat.types import ChatGenerator
from haystack.components.preprocessors import DocumentSplitter
from haystack.core.serialization import component_to_dict
from haystack.dataclasses import ChatMessage
from haystack.utils import deserialize_chatgenerator_inplace, expand_page_range

logger = logging.getLogger(__name__)

@component
class LLMMetadataExtractor:
    """
    Extracts metadata from documents using a Large Language Model (LLM).

    The metadata is extracted by providing a prompt to an LLM that generates the metadata.

    This component expects as input a list of documents and a prompt. The prompt must have a variable called
    `document` that points to a single document in the list of documents. To access the content of the document,
    use `{{ document.content }}` in the prompt.

    The component runs the LLM on each document in the list and extracts metadata from it. The metadata is added
    to the document's metadata field. If the LLM fails to extract metadata from a document, the document is added
    to the `failed_documents` list. Failed documents have the keys `metadata_extraction_error` and
    `metadata_extraction_response` in their metadata. These documents can be re-run with another extractor that
    uses the `metadata_extraction_response` and `metadata_extraction_error` in its prompt.

    ```python
    from haystack import Document
    from haystack.components.extractors.llm_metadata_extractor import LLMMetadataExtractor
    from haystack.components.generators.chat import OpenAIChatGenerator

    NER_PROMPT = '''
    -Goal-
    Given text and a list of entity types, identify all entities of those types from the text.

    -Steps-
    1. Identify all entities. For each identified entity, extract the following information:
    - entity: Name of the entity
    - entity_type: One of the following types: [organization, product, service, industry]
    Format each entity as JSON like: {"entity": <entity_name>, "entity_type": <entity_type>}

    2. Return output in a single list with all the entities identified in step 1.

    -Examples-
    ######################
    Example 1:
    entity_types: [organization, person, partnership, financial metric, product, service, industry, investment strategy, market trend]
    text: Another area of strength is our co-brand issuance. Visa is the primary network partner for eight of the top
    10 co-brand partnerships in the US today and we are pleased that Visa has finalized a multi-year extension of
    our successful credit co-branded partnership with Alaska Airlines, a portfolio that benefits from a loyal customer
    base and high cross-border usage.
    We have also had significant co-brand momentum in CEMEA. First, we launched a new co-brand card in partnership
    with Qatar Airways, British Airways and the National Bank of Kuwait. Second, we expanded our strong global
    Marriott relationship to launch Qatar's first hospitality co-branded card with Qatar Islamic Bank. Across the
    United Arab Emirates, we now have exclusive agreements with all the leading airlines marked by a recent
    agreement with Emirates Skywards.
    And we also signed an inaugural Airline co-brand agreement in Morocco with Royal Air Maroc. Now newer digital
    issuers are equally
    ------------------------
    output:
    {"entities": [{"entity": "Visa", "entity_type": "company"}, {"entity": "Alaska Airlines", "entity_type": "company"}, {"entity": "Qatar Airways", "entity_type": "company"}, {"entity": "British Airways", "entity_type": "company"}, {"entity": "National Bank of Kuwait", "entity_type": "company"}, {"entity": "Marriott", "entity_type": "company"}, {"entity": "Qatar Islamic Bank", "entity_type": "company"}, {"entity": "Emirates Skywards", "entity_type": "company"}, {"entity": "Royal Air Maroc", "entity_type": "company"}]}
    #############################
    -Real Data-
    ######################
    entity_types: [company, organization, person, country, product, service]
    text: {{ document.content }}
    ######################
    output:
    '''

    docs = [
        Document(content="deepset was founded in 2018 in Berlin, and is known for its Haystack framework"),
        Document(content="Hugging Face is a company that was founded in New York, USA and is known for its Transformers library")
    ]

    chat_generator = OpenAIChatGenerator(
        generation_kwargs={
            "max_tokens": 500,
            "temperature": 0.0,
            "seed": 0,
            "response_format": {"type": "json_object"},
        },
        max_retries=1,
        timeout=60.0,
    )

    extractor = LLMMetadataExtractor(
        prompt=NER_PROMPT,
        chat_generator=chat_generator,
        expected_keys=["entities"],
        raise_on_failure=False,
    )

    extractor.warm_up()
    extractor.run(documents=docs)
    >> {'documents': [
        Document(id=.., content: 'deepset was founded in 2018 in Berlin, and is known for its Haystack framework',
        meta: {'entities': [{'entity': 'deepset', 'entity_type': 'company'}, {'entity': 'Berlin', 'entity_type': 'city'},
              {'entity': 'Haystack', 'entity_type': 'product'}]}),
        Document(id=.., content: 'Hugging Face is a company that was founded in New York, USA and is known for its Transformers library',
        meta: {'entities': [
                {'entity': 'Hugging Face', 'entity_type': 'company'}, {'entity': 'New York', 'entity_type': 'city'},
                {'entity': 'USA', 'entity_type': 'country'}, {'entity': 'Transformers', 'entity_type': 'product'}
                ]})
           ]
        'failed_documents': []
       }
    >>
    ```
    """  # noqa: E501

    def __init__(  # pylint: disable=R0917
        self,
        prompt: str,
        chat_generator: ChatGenerator,
        expected_keys: Optional[list[str]] = None,
        page_range: Optional[list[Union[str, int]]] = None,
        raise_on_failure: bool = False,
        max_workers: int = 3,
    ):
        """
        Initializes the LLMMetadataExtractor.

        :param prompt: The prompt to be used for the LLM.
        :param chat_generator: A ChatGenerator instance which represents the LLM. For the component to work,
            the LLM should be configured to return a JSON object. For example, when using the OpenAIChatGenerator,
            pass `{"response_format": {"type": "json_object"}}` in the `generation_kwargs`.
        :param expected_keys: The keys expected in the JSON output from the LLM.
        :param page_range: A range of pages to extract metadata from. For example, page_range=['1', '3'] will extract
            metadata from the first and third pages of each document. It also accepts printable range strings, e.g.:
            ['1-3', '5', '8', '10-12'] will extract metadata from pages 1, 2, 3, 5, 8, 10, 11, 12.
            If None, metadata will be extracted from the entire document for each document in the documents list.
            This parameter is optional and can be overridden in the `run` method.
        :param raise_on_failure: Whether to raise an error on failure during the execution of the Generator or
            validation of the JSON output.
        :param max_workers: The maximum number of workers to use in the thread pool executor.
        """
        self.prompt = prompt
        ast = SandboxedEnvironment().parse(prompt)
        template_variables = meta.find_undeclared_variables(ast)
        variables = list(template_variables)
        # Require exactly one template variable, named "document" (also guards against an empty variable list)
        if len(variables) != 1 or variables[0] != "document":
            raise ValueError(
                f"Prompt must have exactly one variable called 'document'. Found {','.join(variables)} in the prompt."
            )
        self.builder = PromptBuilder(prompt, required_variables=variables)
        self.raise_on_failure = raise_on_failure
        self.expected_keys = expected_keys or []
        self.splitter = DocumentSplitter(split_by="page", split_length=1)
        self.expanded_range = expand_page_range(page_range) if page_range else None
        self.max_workers = max_workers
        self._chat_generator = chat_generator

    def warm_up(self):
        """
        Warm up the LLM provider component.
        """
        if hasattr(self._chat_generator, "warm_up"):
            self._chat_generator.warm_up()

    def to_dict(self) -> dict[str, Any]:
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """

        return default_to_dict(
            self,
            prompt=self.prompt,
            chat_generator=component_to_dict(obj=self._chat_generator, name="chat_generator"),
            expected_keys=self.expected_keys,
            page_range=self.expanded_range,
            raise_on_failure=self.raise_on_failure,
            max_workers=self.max_workers,
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "LLMMetadataExtractor":
        """
        Deserializes the component from a dictionary.

        :param data:
            Dictionary with serialized data.
        :returns:
            An instance of the component.
        """

        deserialize_chatgenerator_inplace(data["init_parameters"], key="chat_generator")
        return default_from_dict(cls, data)

    def _extract_metadata(self, llm_answer: str) -> dict[str, Any]:
        parsed_metadata: dict[str, Any] = {}

        try:
            parsed_metadata = json.loads(llm_answer)
        except json.JSONDecodeError as e:
            logger.warning(
                "Response from the LLM is not valid JSON. Skipping metadata extraction. Received output: {response}",
                response=llm_answer,
            )
            if self.raise_on_failure:
                raise e
            return {"error": "Response is not valid JSON. Received JSONDecodeError: " + str(e)}

        if not all(key in parsed_metadata for key in self.expected_keys):
            logger.warning(
                "Expected response from LLM to be a JSON with keys {expected_keys}, got {parsed_json}. "
                "Continuing extraction with received output.",
                expected_keys=self.expected_keys,
                parsed_json=parsed_metadata,
            )

        return parsed_metadata

    def _prepare_prompts(
        self, documents: list[Document], expanded_range: Optional[list[int]] = None
    ) -> list[Union[ChatMessage, None]]:
        all_prompts: list[Union[ChatMessage, None]] = []
        for document in documents:
            if not document.content:
                logger.warning("Document {doc_id} has no content. Skipping metadata extraction.", doc_id=document.id)
                all_prompts.append(None)
                continue

            if expanded_range:
                doc_copy = copy.deepcopy(document)
                pages = self.splitter.run(documents=[doc_copy])
                content = ""
                for idx, page in enumerate(pages["documents"]):
                    if idx + 1 in expanded_range:
                        content += page.content
                doc_copy.content = content
            else:
                doc_copy = document

            prompt_with_doc = self.builder.run(template=self.prompt, template_variables={"document": doc_copy})

            # Build a ChatMessage with the prompt
            message = ChatMessage.from_user(prompt_with_doc["prompt"])
            all_prompts.append(message)

        return all_prompts

    def _run_on_thread(self, prompt: Optional[ChatMessage]) -> dict[str, Any]:
        # If prompt is None, return an error dictionary
        if prompt is None:
            return {"error": "Document has no content, skipping LLM call."}

        try:
            result = self._chat_generator.run(messages=[prompt])
        except Exception as e:
            if self.raise_on_failure:
                raise e
            logger.error(
                "LLM {class_name} execution failed. Skipping metadata extraction. Failed with exception '{error}'.",
                class_name=self._chat_generator.__class__.__name__,
                error=e,
            )
            result = {"error": "LLM failed with exception: " + str(e)}
        return result

    @component.output_types(documents=list[Document], failed_documents=list[Document])
    def run(self, documents: list[Document], page_range: Optional[list[Union[str, int]]] = None):
        """
        Extract metadata from documents using a Large Language Model.

        If `page_range` is provided, this component splits the documents into pages and extracts metadata only from
        the specified range of pages. If `page_range` is not provided, metadata is extracted from the entire document.

        The original documents are returned, updated with the extracted metadata.

        :param documents: List of documents to extract metadata from.
        :param page_range: A range of pages to extract metadata from. For example, page_range=['1', '3'] will extract
                           metadata from the first and third pages of each document. It also accepts printable range
                           strings, e.g.: ['1-3', '5', '8', '10-12'] will extract metadata from pages 1, 2, 3, 5, 8,
                           10, 11, 12.
                           If None, metadata will be extracted from the entire document for each document in the
                           documents list.
        :returns:
            A dictionary with the keys:
            - "documents": A list of documents that were successfully updated with the extracted metadata.
            - "failed_documents": A list of documents that failed to extract metadata. These documents will have
            "metadata_extraction_error" and "metadata_extraction_response" in their metadata. These documents can be
            re-run with the extractor to extract metadata.
        """
        if len(documents) == 0:
            logger.warning("No documents provided. Skipping metadata extraction.")
            return {"documents": [], "failed_documents": []}

        expanded_range = self.expanded_range
        if page_range:
            expanded_range = expand_page_range(page_range)

        # Create ChatMessage prompts for each document
        all_prompts = self._prepare_prompts(documents=documents, expanded_range=expanded_range)

        # Run the LLM on each prompt
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            results = executor.map(self._run_on_thread, all_prompts)

        successful_documents = []
        failed_documents = []
        for document, result in zip(documents, results):
            new_meta = {**document.meta}
            if "error" in result:
                new_meta["metadata_extraction_error"] = result["error"]
                new_meta["metadata_extraction_response"] = None
                failed_documents.append(replace(document, meta=new_meta))
                continue

            parsed_metadata = self._extract_metadata(result["replies"][0].text)
            if "error" in parsed_metadata:
                new_meta["metadata_extraction_error"] = parsed_metadata["error"]
                new_meta["metadata_extraction_response"] = result["replies"][0]
                failed_documents.append(replace(document, meta=new_meta))
                continue

            new_meta.update(parsed_metadata)
            # Remove metadata_extraction_error and metadata_extraction_response if present from previous runs
            new_meta.pop("metadata_extraction_error", None)
            new_meta.pop("metadata_extraction_response", None)
            successful_documents.append(replace(document, meta=new_meta))

        return {"documents": successful_documents, "failed_documents": failed_documents}
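The page-range behaviour documented in `__init__` and `run` can be illustrated with a small standalone sketch. `expand_pages` below is a hypothetical re-implementation written only from the docstrings' description of `haystack.utils.expand_page_range` (1-based page numbers, hyphenated range strings); it is not the library code itself.

```python
from typing import Union


def expand_pages(page_range: list[Union[str, int]]) -> list[int]:
    """Expand entries such as '1-3' or 5 into a flat list of 1-based page numbers."""
    expanded: list[int] = []
    for item in page_range:
        text = str(item)
        if "-" in text:
            # A hyphenated string denotes an inclusive range of pages.
            start, end = text.split("-")
            expanded.extend(range(int(start), int(end) + 1))
        else:
            expanded.append(int(text))
    return expanded


print(expand_pages(["1-3", "5", "8", "10-12"]))  # [1, 2, 3, 5, 8, 10, 11, 12]
print(expand_pages(["1", "3"]))  # [1, 3]
```

The expanded list is then matched against 1-based page indices produced by the page splitter, which is why the component compares `idx + 1` against the expanded range.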
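The JSON handling in `_extract_metadata` can likewise be sketched standalone. `parse_llm_metadata` is a hypothetical helper that mirrors only the behaviour shown above: invalid JSON yields an `"error"` entry (feeding the failed-documents path), while missing expected keys produce a warning and extraction continues with whatever was received.

```python
import json
from typing import Any


def parse_llm_metadata(llm_answer: str, expected_keys: list[str]) -> dict[str, Any]:
    """Parse an LLM reply as JSON, mirroring the component's error handling."""
    try:
        parsed = json.loads(llm_answer)
    except json.JSONDecodeError as e:
        # Invalid JSON becomes an error entry rather than an exception.
        return {"error": "Response is not valid JSON. Received JSONDecodeError: " + str(e)}
    missing = [key for key in expected_keys if key not in parsed]
    if missing:
        # The component only warns here and continues with the received output.
        print(f"warning: expected keys missing from LLM output: {missing}")
    return parsed


print(parse_llm_metadata('{"entities": []}', ["entities"]))  # {'entities': []}
print("error" in parse_llm_metadata("not json", ["entities"]))  # True
```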