deepset-ai / haystack, Pull Request #9037: Chat Generator Protocol POC
14 Mar 2025, coverage: 90.0% (+0.08%), 9702 of 10780 relevant lines covered

Source file: haystack/components/extractors/llm_metadata_extractor.py (68.38% covered)
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import copy
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Union

from jinja2 import meta
from jinja2.sandbox import SandboxedEnvironment

from haystack import Document, component, default_from_dict, default_to_dict, logging
from haystack.components.builders import PromptBuilder
from haystack.components.generators.chat.types import ChatGenerator
from haystack.components.preprocessors import DocumentSplitter
from haystack.core.serialization import import_class_by_name
from haystack.dataclasses import ChatMessage
from haystack.utils import expand_page_range

logger = logging.getLogger(__name__)


@component
class LLMMetadataExtractor:
    """
28
    Extracts metadata from documents using a Large Language Model (LLM).
29

30
    The metadata is extracted by providing a prompt to an LLM that generates the metadata.
31

32
    This component expects as input a list of documents and a prompt. The prompt should have a variable called
33
    `document` that will point to a single document in the list of documents. So to access the content of the document,
34
    you can use `{{ document.content }}` in the prompt.
35

36
    The component will run the LLM on each document in the list and extract metadata from the document. The metadata
37
    will be added to the document's metadata field. If the LLM fails to extract metadata from a document, the document
38
    will be added to the `failed_documents` list. The failed documents will have the keys `metadata_extraction_error` and
39
    `metadata_extraction_response` in their metadata. These documents can be re-run with another extractor to
40
    extract metadata by using the `metadata_extraction_response` and `metadata_extraction_error` in the prompt.
41

42
    ```python
43
    from haystack import Document
44
    from haystack_experimental.components.extractors.llm_metadata_extractor import LLMMetadataExtractor
45

46
    NER_PROMPT = '''
47
    -Goal-
48
    Given text and a list of entity types, identify all entities of those types from the text.
49

50
    -Steps-
51
    1. Identify all entities. For each identified entity, extract the following information:
52
    - entity_name: Name of the entity, capitalized
53
    - entity_type: One of the following types: [organization, product, service, industry]
54
    Format each entity as a JSON like: {"entity": <entity_name>, "entity_type": <entity_type>}
55

56
    2. Return output in a single list with all the entities identified in steps 1.
57

58
    -Examples-
59
    ######################
60
    Example 1:
61
    entity_types: [organization, person, partnership, financial metric, product, service, industry, investment strategy, market trend]
62
    text: Another area of strength is our co-brand issuance. Visa is the primary network partner for eight of the top
63
    10 co-brand partnerships in the US today and we are pleased that Visa has finalized a multi-year extension of
64
    our successful credit co-branded partnership with Alaska Airlines, a portfolio that benefits from a loyal customer
65
    base and high cross-border usage.
66
    We have also had significant co-brand momentum in CEMEA. First, we launched a new co-brand card in partnership
67
    with Qatar Airways, British Airways and the National Bank of Kuwait. Second, we expanded our strong global
68
    Marriott relationship to launch Qatar's first hospitality co-branded card with Qatar Islamic Bank. Across the
69
    United Arab Emirates, we now have exclusive agreements with all the leading airlines marked by a recent
70
    agreement with Emirates Skywards.
71
    And we also signed an inaugural Airline co-brand agreement in Morocco with Royal Air Maroc. Now newer digital
72
    issuers are equally
73
    ------------------------
74
    output:
75
    {"entities": [{"entity": "Visa", "entity_type": "company"}, {"entity": "Alaska Airlines", "entity_type": "company"}, {"entity": "Qatar Airways", "entity_type": "company"}, {"entity": "British Airways", "entity_type": "company"}, {"entity": "National Bank of Kuwait", "entity_type": "company"}, {"entity": "Marriott", "entity_type": "company"}, {"entity": "Qatar Islamic Bank", "entity_type": "company"}, {"entity": "Emirates Skywards", "entity_type": "company"}, {"entity": "Royal Air Maroc", "entity_type": "company"}]}
76
    #############################
77
    -Real Data-
78
    ######################
79
    entity_types: [company, organization, person, country, product, service]
80
    text: {{ document.content }}
81
    ######################
82
    output:
83
    '''
84

85
    docs = [
86
        Document(content="deepset was founded in 2018 in Berlin, and is known for its Haystack framework"),
87
        Document(content="Hugging Face is a company that was founded in New York, USA and is known for its Transformers library")
88
    ]
89

90
    extractor = LLMMetadataExtractor(
91
        prompt=NER_PROMPT,
92
        generator_api="openai",
93
        generator_api_params={
94
            "generation_kwargs": {
95
                "max_tokens": 500,
96
                "temperature": 0.0,
97
                "seed": 0,
98
                "response_format": {"type": "json_object"},
99
            },
100
            "max_retries": 1,
101
            "timeout": 60.0,
102
        },
103
        expected_keys=["entities"],
104
        raise_on_failure=False,
105
    )
106
    extractor.warm_up()
107
    extractor.run(documents=docs)
108
    >> {'documents': [
109
        Document(id=.., content: 'deepset was founded in 2018 in Berlin, and is known for its Haystack framework',
110
        meta: {'entities': [{'entity': 'deepset', 'entity_type': 'company'}, {'entity': 'Berlin', 'entity_type': 'city'},
111
              {'entity': 'Haystack', 'entity_type': 'product'}]}),
112
        Document(id=.., content: 'Hugging Face is a company that was founded in New York, USA and is known for its Transformers library',
113
        meta: {'entities': [
114
                {'entity': 'Hugging Face', 'entity_type': 'company'}, {'entity': 'New York', 'entity_type': 'city'},
115
                {'entity': 'USA', 'entity_type': 'country'}, {'entity': 'Transformers', 'entity_type': 'product'}
116
                ]})
117
           ]
118
        'failed_documents': []
119
       }
120
    >>
121
    ```
122
    """  # noqa: E501
123

124
    def __init__(  # pylint: disable=R0917
        self,
        prompt: str,
        chat_generator: ChatGenerator,
        expected_keys: Optional[List[str]] = None,
        page_range: Optional[List[Union[str, int]]] = None,
        raise_on_failure: bool = False,
        max_workers: int = 3,
    ):
        """
        Initializes the LLMMetadataExtractor.

        :param prompt: The prompt to be used for the LLM.
        :param chat_generator: A ChatGenerator instance representing the LLM used to extract metadata.
        :param expected_keys: The keys expected in the JSON output from the LLM.
        :param page_range: A range of pages to extract metadata from. For example, page_range=['1', '3'] extracts
                           metadata from the first and third pages of each document. It also accepts printable range
                           strings, e.g. ['1-3', '5', '8', '10-12'], which extracts metadata from pages 1, 2, 3, 5,
                           8, 10, 11, and 12. If None, metadata is extracted from the entire document for each
                           document in the documents list.
                           This parameter is optional and can be overridden in the `run` method.
        :param raise_on_failure: Whether to raise an error on failure during the execution of the Generator or
                                 validation of the JSON output.
        :param max_workers: The maximum number of workers to use in the thread pool executor.
        """
        self.prompt = prompt
        ast = SandboxedEnvironment().parse(prompt)
        template_variables = meta.find_undeclared_variables(ast)
        variables = list(template_variables)
        # Use != 1 so an empty variable list raises ValueError instead of IndexError
        if len(variables) != 1 or variables[0] != "document":
            raise ValueError(
                f"Prompt must have exactly one variable called 'document'. Found {','.join(variables)} in the prompt."
            )
        self.builder = PromptBuilder(prompt, required_variables=variables)
        self.raise_on_failure = raise_on_failure
        self.expected_keys = expected_keys or []
        self._chat_generator = chat_generator
        self.splitter = DocumentSplitter(split_by="page", split_length=1)
        self.expanded_range = expand_page_range(page_range) if page_range else None
        self.max_workers = max_workers

    def warm_up(self):
        """
        Warm up the LLM provider component.
        """
        if hasattr(self._chat_generator, "warm_up"):
            self._chat_generator.warm_up()

    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """
        chat_generator = self._chat_generator.to_dict()

        return default_to_dict(
            self,
            prompt=self.prompt,
            chat_generator=chat_generator,
            expected_keys=self.expected_keys,
            page_range=self.expanded_range,
            raise_on_failure=self.raise_on_failure,
            max_workers=self.max_workers,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "LLMMetadataExtractor":
        """
        Deserializes the component from a dictionary.

        :param data:
            Dictionary with serialized data.
        :returns:
            An instance of the component.
        """
        init_parameters = data.get("init_parameters", {})

        chat_generator_class = import_class_by_name(init_parameters["chat_generator"]["type"])
        chat_generator_instance = chat_generator_class.from_dict(init_parameters["chat_generator"])
        init_parameters["chat_generator"] = chat_generator_instance

        return default_from_dict(cls, data)

    def _extract_metadata(self, llm_answer: str) -> Dict[str, Any]:
        try:
            parsed_metadata = json.loads(llm_answer)
        except json.JSONDecodeError as e:
            logger.warning(
                "Response from the LLM is not valid JSON. Skipping metadata extraction. Received output: {response}",
                response=llm_answer,
            )
            if self.raise_on_failure:
                raise e
            return {"error": "Response is not valid JSON. Received JSONDecodeError: " + str(e)}

        if not all(key in parsed_metadata for key in self.expected_keys):
            logger.warning(
                "Expected response from LLM to be a JSON with keys {expected_keys}, got {parsed_json}. "
                "Continuing extraction with received output.",
                expected_keys=self.expected_keys,
                parsed_json=parsed_metadata,
            )

        return parsed_metadata

    def _prepare_prompts(
        self, documents: List[Document], expanded_range: Optional[List[int]] = None
    ) -> List[Union[ChatMessage, None]]:
        all_prompts: List[Union[ChatMessage, None]] = []
        for document in documents:
            if not document.content:
                logger.warning("Document {doc_id} has no content. Skipping metadata extraction.", doc_id=document.id)
                all_prompts.append(None)
                continue

            if expanded_range:
                doc_copy = copy.deepcopy(document)
                pages = self.splitter.run(documents=[doc_copy])
                content = ""
                for idx, page in enumerate(pages["documents"]):
                    if idx + 1 in expanded_range:
                        content += page.content
                doc_copy.content = content
            else:
                doc_copy = document

            prompt_with_doc = self.builder.run(template=self.prompt, template_variables={"document": doc_copy})

            # build a ChatMessage with the prompt
            message = ChatMessage.from_user(prompt_with_doc["prompt"])
            all_prompts.append(message)

        return all_prompts

    def _run_on_thread(self, prompt: Optional[ChatMessage]) -> Dict[str, Any]:
        # If prompt is None, return an empty JSON reply. Wrap it in a ChatMessage so that
        # downstream code can call .text on it, as it does for real generator replies.
        if prompt is None:
            return {"replies": [ChatMessage.from_assistant("{}")]}

        try:
            result = self._chat_generator.run(messages=[prompt])
        except Exception as e:
            logger.error(
                "LLM {class_name} execution failed. Skipping metadata extraction. Failed with exception '{error}'.",
                class_name=self._chat_generator.__class__.__name__,
                error=e,
            )
            if self.raise_on_failure:
                raise e
            result = {"error": "LLM failed with exception: " + str(e)}
        return result

    @component.output_types(documents=List[Document], failed_documents=List[Document])
    def run(self, documents: List[Document], page_range: Optional[List[Union[str, int]]] = None):
        """
        Extract metadata from documents using a Large Language Model.

        If `page_range` is provided, the documents are split into pages and metadata is extracted only from the
        specified range of pages. If `page_range` is not provided, metadata is extracted from the entire document.

        The original documents are returned updated with the extracted metadata.

        :param documents: List of documents to extract metadata from.
        :param page_range: A range of pages to extract metadata from. For example, page_range=['1', '3'] extracts
                           metadata from the first and third pages of each document. It also accepts printable range
                           strings, e.g. ['1-3', '5', '8', '10-12'], which extracts metadata from pages 1, 2, 3, 5,
                           8, 10, 11, and 12.
                           If None, metadata is extracted from the entire document for each document in the
                           documents list.
        :returns:
            A dictionary with the keys:
            - "documents": A list of documents that were successfully updated with the extracted metadata.
            - "failed_documents": A list of documents for which metadata extraction failed. These documents have
              "metadata_extraction_error" and "metadata_extraction_response" in their metadata and can be re-run
              with the extractor to extract metadata.
        """
        if len(documents) == 0:
            logger.warning("No documents provided. Skipping metadata extraction.")
            return {"documents": [], "failed_documents": []}

        expanded_range = self.expanded_range
        if page_range:
            expanded_range = expand_page_range(page_range)

        # Create ChatMessage prompts for each document
        all_prompts = self._prepare_prompts(documents=documents, expanded_range=expanded_range)

        # Run the LLM on each prompt; executor.map preserves input order, so results align with documents
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            results = executor.map(self._run_on_thread, all_prompts)

        successful_documents = []
        failed_documents = []
        for document, result in zip(documents, results):
            if "error" in result:
                document.meta["metadata_extraction_error"] = result["error"]
                document.meta["metadata_extraction_response"] = None
                failed_documents.append(document)
                continue

            parsed_metadata = self._extract_metadata(result["replies"][0].text)
            if "error" in parsed_metadata:
                document.meta["metadata_extraction_error"] = parsed_metadata["error"]
                document.meta["metadata_extraction_response"] = result["replies"][0]
                failed_documents.append(document)
                continue

            for key in parsed_metadata:
                document.meta[key] = parsed_metadata[key]

            # Remove metadata_extraction_error and metadata_extraction_response if present from previous runs
            document.meta.pop("metadata_extraction_error", None)
            document.meta.pop("metadata_extraction_response", None)
            successful_documents.append(document)

        return {"documents": successful_documents, "failed_documents": failed_documents}
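`_extract_metadata` follows an error-as-value convention: invalid JSON becomes an `{"error": ...}` entry (unless `raise_on_failure` is set), while missing expected keys only produce a warning and the parsed output is still returned. A minimal standalone sketch of that convention, using only the standard library (the function name and the `print`-based warning are illustrative, not part of the component):

```python
import json
from typing import Any, Dict, List


def extract_metadata(llm_answer: str, expected_keys: List[str]) -> Dict[str, Any]:
    """Parse an LLM reply as JSON; report an error dict instead of raising."""
    try:
        parsed = json.loads(llm_answer)
    except json.JSONDecodeError as e:
        # Invalid JSON becomes an error entry the caller can route to failed_documents
        return {"error": "Response is not valid JSON. Received JSONDecodeError: " + str(e)}
    missing = [key for key in expected_keys if key not in parsed]
    if missing:
        # The component only warns here and still returns the parsed output
        print(f"warning: missing expected keys: {missing}")
    return parsed


print(extract_metadata('{"entities": [{"entity": "Visa"}]}', ["entities"]))
print(extract_metadata("not json", ["entities"]))
```

Callers then branch on `"error" in parsed`, which is exactly how `run` decides between `documents` and `failed_documents`.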
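`run` relies on `ThreadPoolExecutor.map` returning results in the same order as its inputs, which is what makes `zip(documents, results)` line up even though the LLM calls run concurrently. A small sketch of that fan-out pattern, with an illustrative `fake_llm` standing in for `_run_on_thread` (in the real component, replies are `ChatMessage` objects rather than strings):

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional


def fake_llm(prompt: Optional[str]) -> Dict[str, List[str]]:
    # Stand-in for _run_on_thread: None prompts get an empty JSON reply
    if prompt is None:
        return {"replies": ["{}"]}
    return {"replies": [f"processed: {prompt}"]}


prompts = ["doc one", None, "doc three"]
with ThreadPoolExecutor(max_workers=3) as executor:
    # executor.map yields results in input order, regardless of completion order
    results = list(executor.map(fake_llm, prompts))

# Because order is preserved, results can be zipped back with the original inputs
for prompt, result in zip(prompts, results):
    print(prompt, "->", result["replies"][0])
```

This ordering guarantee is what lets the component attach each result to the right document without tracking futures explicitly.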