
deepset-ai / haystack · build 5806148612 · push (github, web-flow) · pending completion
chore: normalize more optional imports (#5251)

* docstore filters
* modeling metrics
* doc language classifier
* file converter
* docx converter
* tika
* preprocessor
* context matcher
* pylint

10921 of 23200 relevant lines covered (47.07%) · 2.6 hits per line

Source file: haystack/nodes/preprocessor/preprocessor.py (83.29% covered). Each source line below is listed with its original line number and hit count; lines marked × are not covered.
1
from typing import List, Optional, Generator, Set, Union, Tuple, Dict, Literal
11✔
2

3
import logging
11✔
4
import re
11✔
5
from copy import deepcopy
11✔
6
from functools import partial, reduce
11✔
7
from itertools import chain
11✔
8
import warnings
11✔
9
from pathlib import Path
11✔
10
from pickle import UnpicklingError
11✔
11

12
from tqdm import tqdm
11✔
13
from more_itertools import windowed
11✔
14

15
from haystack.nodes.preprocessor.base import BasePreProcessor
11✔
16
from haystack.errors import HaystackError
11✔
17
from haystack.schema import Document
11✔
18
from haystack.lazy_imports import LazyImport
11✔
19

20

21
logger = logging.getLogger(__name__)
11✔
22

23

24
with LazyImport("Run 'pip install farm-haystack[preprocessing]' or 'pip install nltk'") as nltk_import:
11✔
25
    import nltk
11✔
26

27

28
iso639_to_nltk = {
11✔
29
    "ru": "russian",
30
    "sl": "slovene",
31
    "es": "spanish",
32
    "sv": "swedish",
33
    "tr": "turkish",
34
    "cs": "czech",
35
    "da": "danish",
36
    "nl": "dutch",
37
    "en": "english",
38
    "et": "estonian",
39
    "fi": "finnish",
40
    "fr": "french",
41
    "de": "german",
42
    "el": "greek",
43
    "it": "italian",
44
    "no": "norwegian",
45
    "pl": "polish",
46
    "pt": "portuguese",
47
    "ml": "malayalam",
48
}
49

50

51
class PreProcessor(BasePreProcessor):
11✔
52
    def __init__(
11✔
53
        self,
54
        clean_whitespace: bool = True,
55
        clean_header_footer: bool = False,
56
        clean_empty_lines: bool = True,
57
        remove_substrings: Optional[List[str]] = None,
58
        split_by: Optional[Literal["word", "sentence", "passage"]] = "word",
59
        split_length: int = 200,
60
        split_overlap: int = 0,
61
        split_respect_sentence_boundary: bool = True,
62
        tokenizer_model_folder: Optional[Union[str, Path]] = None,
63
        language: str = "en",
64
        id_hash_keys: Optional[List[str]] = None,
65
        progress_bar: bool = True,
66
        add_page_number: bool = False,
67
        max_chars_check: int = 10_000,
68
    ):
69
        """
70
        :param clean_header_footer: Use heuristic to remove footers and headers across different pages by searching
71
                                     for the longest common string. This heuristic uses exact matches and therefore
72
                                     works well for footers like "Copyright 2019 by XXX", but won't detect "Page 3 of 4"
73
                                     or similar.
74
        :param clean_whitespace: Strip whitespace at the beginning and end of each line in the text.
75
        :param clean_empty_lines: Collapse runs of three or more newlines so that at most one empty line remains in the text.
76
        :param remove_substrings: Remove specified substrings from the text. If no value is provided, an empty list is created by default.
77
        :param split_by: Unit for splitting the document. Can be "word", "sentence", or "passage". Set to None to disable splitting.
78
        :param split_length: Max. number of the above split unit (e.g. words) that are allowed in one document. For instance, if split_length -> 10 & split_by ->
79
                           "sentence", then each output document will have 10 sentences.
80
        :param split_overlap: Word overlap between two adjacent documents after a split.
81
                              Setting this to a positive number essentially enables the sliding window approach.
82
                              For example, if split_by -> `word`,
83
                              split_length -> 5 & split_overlap -> 2, then the splits would be like:
84
                              [w1 w2 w3 w4 w5, w4 w5 w6 w7 w8, w7 w8 w9 w10 w11].
85
                              Set the value to 0 to ensure there is no overlap among the documents after splitting.
86
        :param split_respect_sentence_boundary: Whether to avoid splitting inside sentences when split_by -> `word`. If set
87
                                                to True, the individual split will always have complete sentences &
88
                                                the number of words will be <= split_length.
89
        :param language: The language used by "nltk.tokenize.sent_tokenize" in iso639 format.
90
            Available options: "ru","sl","es","sv","tr","cs","da","nl","en","et","fi","fr","de","el","it","no","pl","pt","ml"
91
        :param tokenizer_model_folder: Path to the folder containing the NLTK PunktSentenceTokenizer models, if loading a model from a local path. Leave empty otherwise.
92
        :param id_hash_keys: Generate the document id from a custom list of strings that refer to the document's
93
            attributes. If you want to ensure you don't have duplicate documents in your DocumentStore but texts are
94
            not unique, you can modify the metadata and pass e.g. `"meta"` to this field (e.g. [`"content"`, `"meta"`]).
95
            In this case the id will be generated by using the content and the defined metadata.
96
        :param progress_bar: Whether to show a progress bar.
97
        :param add_page_number: Add the number of the page a paragraph occurs in to the Document's meta
98
                                field `"page"`. Page boundaries are determined by `"\f"` character which is added
99
                                in between pages by `PDFToTextConverter`, `TikaConverter`, `ParsrConverter` and
100
                                `AzureConverter`.
101
        :param max_chars_check: the maximum length a document is expected to have. Each document that is longer than
102
            max_chars_check characters after pre-processing will trigger a warning and will be split at the
103
            `max_chars_check`-th character, regardless of any other constraint. If the resulting documents are still too long,
104
            they'll be cut again until all fragments are below the maximum allowed length.
105
        """
106
        nltk_import.check()
1✔
107
        if remove_substrings is None:
1✔
108
            remove_substrings = []
1✔
109
        super().__init__()
1✔
110

111
        try:
1✔
112
            nltk.data.find("tokenizers/punkt")
1✔
113
        except LookupError:
1✔
114
            try:
1✔
115
                nltk.download("punkt")
1✔
116
            except FileExistsError as error:
1✔
117
                logger.debug("NLTK punkt tokenizer seems to be already downloaded. Error message: %s", error)
1✔
118
                pass
1✔
119
        self.clean_whitespace = clean_whitespace
1✔
120
        self.clean_header_footer = clean_header_footer
1✔
121
        self.clean_empty_lines = clean_empty_lines
1✔
122
        self.remove_substrings = remove_substrings
1✔
123
        self.split_by = split_by
1✔
124
        self.split_length = split_length
1✔
125
        self.split_overlap = split_overlap
1✔
126
        self.split_respect_sentence_boundary = split_respect_sentence_boundary
1✔
127
        self.language = language
1✔
128
        self.tokenizer_model_folder = tokenizer_model_folder
1✔
129
        self.print_log: Set[str] = set()
1✔
130
        self.id_hash_keys = id_hash_keys
1✔
131
        self.progress_bar = progress_bar
1✔
132
        self.add_page_number = add_page_number
1✔
133
        self.max_chars_check = max_chars_check
1✔
134
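As a quick illustration of how the parameters documented above fit together, here is a minimal, hedged usage sketch (the settings shown are just the documented defaults plus a small overlap; nothing here is prescribed by this file):

# Minimal sketch: configuring the documented cleaning and splitting options.
from haystack.nodes import PreProcessor

preprocessor = PreProcessor(
    clean_whitespace=True,                 # strip leading/trailing whitespace per line
    clean_empty_lines=True,                # collapse runs of empty lines
    clean_header_footer=False,             # exact-match header/footer removal stays off
    split_by="word",                       # "word", "sentence", "passage", or None
    split_length=200,                      # max units per output document
    split_overlap=20,                      # sliding-window overlap between adjacent splits
    split_respect_sentence_boundary=True,  # only meaningful with split_by="word"
    language="en",                         # selects the NLTK Punkt model via iso639_to_nltk
    add_page_number=True,                  # pages are counted from "\f" form feeds
)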

135
    def process(
11✔
136
        self,
137
        documents: Union[dict, Document, List[Union[dict, Document]]],
138
        clean_whitespace: Optional[bool] = None,
139
        clean_header_footer: Optional[bool] = None,
140
        clean_empty_lines: Optional[bool] = None,
141
        remove_substrings: Optional[List[str]] = None,
142
        split_by: Optional[Literal["word", "sentence", "passage"]] = None,
143
        split_length: Optional[int] = None,
144
        split_overlap: Optional[int] = None,
145
        split_respect_sentence_boundary: Optional[bool] = None,
146
        id_hash_keys: Optional[List[str]] = None,
147
    ) -> List[Document]:
148
        """
149
        Perform document cleaning and splitting. Takes a single document or a list of documents as input and returns a list of documents.
150
        """
151
        if remove_substrings is None:
1✔
152
            remove_substrings = []
1✔
153
        if not isinstance(documents, list):
1✔
154
            warnings.warn(
1✔
155
                "Using a single Document as argument to the 'documents' parameter is deprecated. Use a list "
156
                "of (a single) Document instead.",
157
                DeprecationWarning,
158
                2,
159
            )
160

161
        kwargs = {
1✔
162
            "clean_whitespace": clean_whitespace,
163
            "clean_header_footer": clean_header_footer,
164
            "clean_empty_lines": clean_empty_lines,
165
            "remove_substrings": remove_substrings,
166
            "split_by": split_by,
167
            "split_length": split_length,
168
            "split_overlap": split_overlap,
169
            "split_respect_sentence_boundary": split_respect_sentence_boundary,
170
        }
171

172
        if id_hash_keys is None:
1✔
173
            id_hash_keys = self.id_hash_keys
1✔
174

175
        if isinstance(documents, (Document, dict)):
1✔
176
            ret = self._process_single(document=documents, id_hash_keys=id_hash_keys, **kwargs)  # type: ignore
1✔
177
        elif isinstance(documents, list):
1✔
178
            ret = self._process_batch(documents=list(documents), id_hash_keys=id_hash_keys, **kwargs)
1✔
179
        else:
180
            raise Exception("documents provided to PreProcessor.prepreprocess() is not of type list nor Document")
×
181

182
        return ret
1✔
183
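For illustration, a hedged sketch of calling process() as described above: pass a list of Document objects (a bare Document or dict still works, but triggers the DeprecationWarning issued in the method above). The sample text is invented and the preprocessor instance is the one from the earlier sketch.

# Sketch: process() accepts a list of Documents (or dicts) and returns a flat list of splits.
from haystack.schema import Document

docs = [Document(content="First page text.\fSecond page has a few more sentences.")]
splits = preprocessor.process(docs)  # preferred form: always a list
for split in splits:
    print(split.meta.get("_split_id"), split.meta.get("page"), len(split.content))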

184
    def _long_documents(self, documents: List[Document], max_chars_check=10_000):
11✔
185
        """
186
        Tries to detect unusually long documents. When detected, such documents will be
187
        split at the `max_chars_check`-th character, regardless of any other constraint. If the resulting documents
188
        are still too long, they'll be cut again until all fragments are below the maximum allowed length.
189

190
        NOTE: this function is a heuristic that is in place only because a proper fix that prevents such documents from forming
191
        would imply a complete revamp of this class, including better definitions of what the various units (word, sentence, passage) mean exactly.
192
        """
193
        for document in documents:
1✔
194
            if len(document.content) > max_chars_check:
1✔
195
                logger.warning(
1✔
196
                    "Document %s is %s characters long after preprocessing, where the maximum length should be %s. "
197
                    "Something might be wrong with the splitting, check the document affected to prevent issues at "
198
                    "query time. This document will be now hard-split at %s chars recursively.",
199
                    document.id,
200
                    len(document.content),
201
                    max_chars_check,
202
                    max_chars_check,
203
                )
204
                fields = document.to_dict()
1✔
205
                document.content = document.content[:max_chars_check]
1✔
206
                fields.pop("id")
1✔
207
                fields["content"] = fields["content"][max_chars_check:]
1✔
208
                # recursively check if tail_document is still too long
209
                tail_documents = self._long_documents(
1✔
210
                    documents=[Document.from_dict(fields)], max_chars_check=max_chars_check
211
                )
212
                documents += tail_documents
1✔
213
        return documents
1✔
214
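The heuristic described in the docstring above can be pictured with a plain-string sketch; this is a standalone illustration (not Haystack code) of the recursive hard cut at the character limit:

# Standalone illustration of the hard-split idea used by _long_documents().
def hard_split(text: str, max_chars: int = 10_000) -> list:
    """Cut text at max_chars repeatedly until every fragment fits."""
    if len(text) <= max_chars:
        return [text]
    return [text[:max_chars]] + hard_split(text[max_chars:], max_chars)

print([len(piece) for piece in hard_split("x" * 25_000)])  # [10000, 10000, 5000]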

215
    def _process_single(
11✔
216
        self,
217
        document: Union[dict, Document],
218
        clean_whitespace: Optional[bool] = None,
219
        clean_header_footer: Optional[bool] = None,
220
        clean_empty_lines: Optional[bool] = None,
221
        remove_substrings: Optional[List[str]] = None,
222
        split_by: Optional[Literal["word", "sentence", "passage"]] = None,
223
        split_length: Optional[int] = None,
224
        split_overlap: Optional[int] = None,
225
        split_respect_sentence_boundary: Optional[bool] = None,
226
        id_hash_keys: Optional[List[str]] = None,
227
    ) -> List[Document]:
228
        if remove_substrings is None:
1✔
229
            remove_substrings = []
×
230
        if clean_whitespace is None:
1✔
231
            clean_whitespace = self.clean_whitespace
1✔
232
        if clean_header_footer is None:
1✔
233
            clean_header_footer = self.clean_header_footer
1✔
234
        if clean_empty_lines is None:
1✔
235
            clean_empty_lines = self.clean_empty_lines
1✔
236
        if not remove_substrings:
1✔
237
            remove_substrings = self.remove_substrings
1✔
238
        if split_by is None:
1✔
239
            split_by = self.split_by
1✔
240
        if split_length is None:
1✔
241
            split_length = self.split_length
1✔
242
        if split_overlap is None:
1✔
243
            split_overlap = self.split_overlap
1✔
244
        if split_respect_sentence_boundary is None:
1✔
245
            split_respect_sentence_boundary = self.split_respect_sentence_boundary
1✔
246

247
        cleaned_document = self.clean(
1✔
248
            document=document,
249
            clean_whitespace=clean_whitespace,
250
            clean_header_footer=clean_header_footer,
251
            clean_empty_lines=clean_empty_lines,
252
            remove_substrings=remove_substrings,
253
            id_hash_keys=id_hash_keys,
254
        )
255
        split_documents = self.split(
1✔
256
            document=cleaned_document,
257
            split_by=split_by,
258
            split_length=split_length,
259
            split_overlap=split_overlap,
260
            split_respect_sentence_boundary=split_respect_sentence_boundary,
261
            id_hash_keys=id_hash_keys,
262
        )
263

264
        split_documents = self._long_documents(split_documents, max_chars_check=self.max_chars_check)
1✔
265

266
        return split_documents
1✔
267

268
    def _process_batch(
11✔
269
        self, documents: List[Union[dict, Document]], id_hash_keys: Optional[List[str]] = None, **kwargs
270
    ) -> List[Document]:
271
        nested_docs = [
1✔
272
            self._process_single(d, id_hash_keys=id_hash_keys, **kwargs)
273
            for d in tqdm(documents, disable=not self.progress_bar, desc="Preprocessing", unit="docs")
274
        ]
275
        return [d for x in nested_docs for d in x]
1✔
276

277
    def clean(
11✔
278
        self,
279
        document: Union[dict, Document],
280
        clean_whitespace: bool,
281
        clean_header_footer: bool,
282
        clean_empty_lines: bool,
283
        remove_substrings: Optional[List[str]] = None,
284
        id_hash_keys: Optional[List[str]] = None,
285
    ) -> Document:
286
        """
287
        Perform document cleaning on a single document and return a single document. This method will deal with whitespaces, headers, footers
288
        and empty lines. Its exact functionality is defined by the parameters passed into PreProcessor.__init__().
289
        """
290
        if remove_substrings is None:
1✔
291
            remove_substrings = []
×
292
        if id_hash_keys is None:
1✔
293
            id_hash_keys = self.id_hash_keys
1✔
294

295
        if isinstance(document, dict):
1✔
296
            document["id_hash_keys"] = id_hash_keys
×
297
            document = Document.from_dict(document)
×
298

299
        # Mainly needed for type checking
300
        if not isinstance(document, Document):
1✔
301
            raise HaystackError("Document must not be of type 'dict' but of type 'Document'.")
×
302

303
        if type(document.content) is not str:
1✔
304
            logger.error("Document content is not of type str. Nothing to clean.")
×
305
            return document
×
306

307
        text = document.content
1✔
308
        if clean_header_footer:
1✔
309
            text = self._find_and_remove_header_footer(
×
310
                text, n_chars=300, n_first_pages_to_ignore=1, n_last_pages_to_ignore=1
311
            )
312

313
        headlines = document.meta["headlines"] if "headlines" in document.meta else []
1✔
314

315
        if clean_whitespace:
1✔
316
            text, headlines = self._clean_whitespace(text=text, headlines=headlines)
1✔
317

318
        if clean_empty_lines:
1✔
319
            text, headlines = self._clean_empty_lines(text=text, headlines=headlines)
1✔
320

321
        for substring in remove_substrings:
1✔
322
            text, _ = self._remove_substring(text=text, substring=substring, headlines=headlines)
1✔
323

324
        if text != document.content:
1✔
325
            document = deepcopy(document)
1✔
326
            document.content = text
1✔
327
        if headlines:
1✔
328
            document.meta["headlines"] = headlines
1✔
329

330
        return document
1✔
331
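To make the individual cleaning steps concrete, a hedged sketch using the static helpers defined further below (the sample string is invented; headlines are passed as an empty list because none are tracked here):

# Sketch: both helpers operate on (text, headlines) pairs and return the adjusted pair.
from haystack.nodes import PreProcessor

raw = "  Title  \n\n\n\nSome body text.  \nMore text. "
text, _ = PreProcessor._clean_whitespace(text=raw, headlines=[])
text, _ = PreProcessor._clean_empty_lines(text=text, headlines=[])
print(repr(text))  # per-line whitespace stripped, the blank-line run collapsed to one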

332
    def split(
11✔
333
        self,
334
        document: Union[dict, Document],
335
        split_by: Optional[Literal["word", "sentence", "passage"]],
336
        split_length: int,
337
        split_overlap: int,
338
        split_respect_sentence_boundary: bool,
339
        id_hash_keys: Optional[List[str]] = None,
340
    ) -> List[Document]:
341
        """Perform document splitting on a single document. This method can split on different units, at different lengths,
342
        with different strides. It can also respect sentence boundaries. Its exact functionality is defined by
343
        the parameters passed into PreProcessor.__init__(). Takes a single document as input and returns a list of documents.
344
        """
345
        if id_hash_keys is None:
1✔
346
            id_hash_keys = self.id_hash_keys
1✔
347

348
        if isinstance(document, dict):
1✔
349
            document["id_hash_keys"] = id_hash_keys
×
350
            document = Document.from_dict(document)
×
351

352
        # Mainly needed for type checking
353
        if not isinstance(document, Document):
1✔
354
            raise HaystackError("Document must not be of type 'dict' but of type 'Document'.")
×
355

356
        if not split_by:
1✔
357
            return [document]
1✔
358

359
        if not split_length:
1✔
360
            raise Exception("split_length needs be set when using split_by.")
×
361

362
        if split_respect_sentence_boundary and split_by != "word":
1✔
363
            raise NotImplementedError("'split_respect_sentence_boundary=True' is only compatible with split_by='word'.")
×
364

365
        if type(document.content) is not str:
1✔
366
            logger.error("Document content is not of type str. Nothing to split.")
×
367
            return [document]
×
368

369
        text = document.content
1✔
370
        headlines = document.meta["headlines"] if "headlines" in document.meta else []
1✔
371

372
        if split_respect_sentence_boundary and split_by == "word":
1✔
373
            text_splits, splits_pages, splits_start_idxs = self._split_by_word_respecting_sent_boundary(
1✔
374
                text=text, split_length=split_length, split_overlap=split_overlap
375
            )
376
        else:
377
            # create individual "elements" of passage, sentence, or word
378
            elements, split_at = self._split_into_units(text=text, split_by=split_by)
1✔
379

380
            # concatenate individual elements based on split_length & split_stride
381
            text_splits, splits_pages, splits_start_idxs = self._concatenate_units(
1✔
382
                elements=elements, split_length=split_length, split_overlap=split_overlap, split_at=split_at
383
            )
384

385
        # create new document dicts for each text split
386
        documents = self._create_docs_from_splits(
1✔
387
            text_splits=text_splits,
388
            splits_pages=splits_pages,
389
            splits_start_idxs=splits_start_idxs,
390
            headlines=headlines,
391
            meta=document.meta or {},
392
            split_overlap=split_overlap,
393
            id_hash_keys=id_hash_keys,
394
        )
395

396
        return documents
1✔
397

398
    @staticmethod
11✔
399
    def _clean_whitespace(text: str, headlines: List[Dict]) -> Tuple[str, List[Dict]]:
11✔
400
        """
401
        Strips whitespace at the beginning and end of each line in the text.
402
        """
403
        pages = text.split("\f")
1✔
404
        cleaned_pages = []
1✔
405
        cur_headline_idx = 0
1✔
406
        num_headlines = len(headlines)
1✔
407
        cur_char_idx = 0
1✔
408
        num_removed_chars_total = 0
1✔
409
        for page in pages:
1✔
410
            lines = page.splitlines()
1✔
411
            cleaned_lines = []
1✔
412
            for line in lines:
1✔
413
                old_line_len = len(line)
1✔
414
                cleaned_line = line.strip()
1✔
415
                cleaned_line_len = len(cleaned_line)
1✔
416
                cur_char_idx += old_line_len + 1  # add 1 for newline char
1✔
417
                if old_line_len != cleaned_line_len:
1✔
418
                    num_removed_chars_current = old_line_len - cleaned_line_len
1✔
419
                    num_removed_chars_total += num_removed_chars_current
1✔
420
                    for headline_idx in range(cur_headline_idx, num_headlines):
1✔
421
                        if cur_char_idx - num_removed_chars_total <= headlines[headline_idx]["start_idx"]:
1✔
422
                            headlines[headline_idx]["start_idx"] -= num_removed_chars_current
×
423
                        else:
424
                            cur_headline_idx += 1
1✔
425

426
                cleaned_lines.append(cleaned_line)
1✔
427
            cleaned_page = "\n".join(cleaned_lines)
1✔
428
            cleaned_pages.append(cleaned_page)
1✔
429

430
        cleaned_text = "\f".join(cleaned_pages)
1✔
431
        return cleaned_text, headlines
1✔
432

433
    @staticmethod
11✔
434
    def _clean_empty_lines(text: str, headlines: List[Dict]) -> Tuple[str, List[Dict]]:
11✔
435
        if headlines:
1✔
436
            num_headlines = len(headlines)
1✔
437
            multiple_new_line_matches = re.finditer(r"\n\n\n+", text)
1✔
438
            cur_headline_idx = 0
1✔
439
            num_removed_chars_accumulated = 0
1✔
440
            for match in multiple_new_line_matches:
1✔
441
                num_removed_chars_current = match.end() - match.start() - 2
×
442
                for headline_idx in range(cur_headline_idx, num_headlines):
×
443
                    if match.end() - num_removed_chars_accumulated <= headlines[headline_idx]["start_idx"]:
×
444
                        headlines[headline_idx]["start_idx"] -= num_removed_chars_current
×
445
                    else:
446
                        cur_headline_idx += 1
×
447
                num_removed_chars_accumulated += num_removed_chars_current
×
448

449
        cleaned_text = re.sub(r"\n\n\n+", "\n\n", text)
1✔
450
        return cleaned_text, headlines
1✔
451

452
    @staticmethod
11✔
453
    def _remove_substring(text: str, substring: str, headlines: List[Dict]) -> Tuple[str, List[Dict]]:
11✔
454
        if headlines:
1✔
455
            num_headlines = len(headlines)
×
456
            multiple_substring_matches = re.finditer(substring, text)
×
457
            cur_headline_idx = 0
×
458
            num_removed_chars_accumulated = 0
×
459
            for match in multiple_substring_matches:
×
460
                for headline_idx in range(cur_headline_idx, num_headlines):
×
461
                    if match.end() - num_removed_chars_accumulated <= headlines[headline_idx]["start_idx"]:
×
462
                        headlines[headline_idx]["start_idx"] -= len(substring)
×
463
                    else:
464
                        cur_headline_idx += 1
×
465
                num_removed_chars_accumulated += len(substring)
×
466

467
        cleaned_text = text.replace(substring, "")
1✔
468
        return cleaned_text, headlines
1✔
469

470
    def _split_by_word_respecting_sent_boundary(
11✔
471
        self, text: str, split_length: int, split_overlap: int
472
    ) -> Tuple[List[str], List[int], List[int]]:
473
        """
474
        Splits the text into parts of split_length words while respecting sentence boundaries.
475
        """
476
        sentences = self._split_sentences(text)
1✔
477

478
        word_count_slice = 0
1✔
479
        cur_page = 1
1✔
480
        cur_start_idx = 0
1✔
481
        splits_pages = []
1✔
482
        list_splits = []
1✔
483
        splits_start_idxs = []
1✔
484
        current_slice: List[str] = []
1✔
485
        for sen in sentences:
1✔
486
            word_count_sen = len(sen.split())
1✔
487

488
            if word_count_sen > split_length:
1✔
489
                long_sentence_message = (
1✔
490
                    "We found one or more sentences whose word count is higher than the split length."
491
                )
492
                if long_sentence_message not in self.print_log:
1✔
493
                    self.print_log.add(long_sentence_message)
1✔
494
                    logger.warning(long_sentence_message)
1✔
495

496
            if word_count_slice + word_count_sen > split_length:
1✔
497
                # Number of words exceeds split_length -> save current slice and start a new one
498
                if current_slice:
1✔
499
                    list_splits.append(current_slice)
1✔
500
                    splits_pages.append(cur_page)
1✔
501
                    splits_start_idxs.append(cur_start_idx)
1✔
502

503
                if split_overlap:
1✔
504
                    processed_sents, current_slice, word_count_slice = self._get_overlap_from_slice(
1✔
505
                        current_slice, split_length, split_overlap
506
                    )
507
                else:
508
                    processed_sents = current_slice
1✔
509
                    current_slice = []
1✔
510
                    word_count_slice = 0
1✔
511

512
                cur_start_idx += len("".join(processed_sents))
1✔
513

514
                # Count number of page breaks in processed sentences
515
                if self.add_page_number:
1✔
516
                    num_page_breaks = self._count_processed_page_breaks(
1✔
517
                        sentences=processed_sents,
518
                        split_overlap=split_overlap,
519
                        overlapping_sents=current_slice,
520
                        current_sent=sen,
521
                    )
522
                    cur_page += num_page_breaks
1✔
523

524
            current_slice.append(sen)
1✔
525
            word_count_slice += word_count_sen
1✔
526

527
        if current_slice:
1✔
528
            list_splits.append(current_slice)
1✔
529
            splits_pages.append(cur_page)
1✔
530
            splits_start_idxs.append(cur_start_idx)
1✔
531

532
        text_splits = []
1✔
533
        for sl in list_splits:
1✔
534
            txt = "".join(sl)
1✔
535
            if len(txt) > 0:
1✔
536
                text_splits.append(txt)
1✔
537

538
        return text_splits, splits_pages, splits_start_idxs
1✔
539

540
    @staticmethod
11✔
541
    def _get_overlap_from_slice(
11✔
542
        current_slice: List[str], split_length: int, split_overlap: int
543
    ) -> Tuple[List[str], List[str], int]:
544
        """
545
        Returns a tuple with the following elements:
546
        - processed_sents: List of sentences that are not overlapping with the next slice (= completely processed sentences)
547
        - next_slice: List of sentences that are overlapping with the next slice
548
        - word_count_slice: Number of words in the next slice
549
        """
550

551
        overlap = []
1✔
552
        word_count_overlap = 0
1✔
553
        current_slice_copy = deepcopy(current_slice)
1✔
554
        # Next overlapping Document should not start exactly the same as the previous one, so we skip the first sentence
555
        for idx, s in reversed(list(enumerate(current_slice))[1:]):
1✔
556
            sen_len = len(s.split())
1✔
557
            if word_count_overlap < split_overlap and sen_len < split_length:
1✔
558
                overlap.append(s)
1✔
559
                word_count_overlap += sen_len
1✔
560
                current_slice_copy.pop(idx)
1✔
561
            else:
562
                break
1✔
563
        processed_sents = current_slice_copy
1✔
564
        next_slice = list(reversed(overlap))
1✔
565
        word_count_slice = word_count_overlap
1✔
566

567
        return processed_sents, next_slice, word_count_slice
1✔
568
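A hedged worked example of the return values described above (the sentences are invented; trailing spaces mimic the whitespace-preserving sentence tokenizer):

# Sketch: with split_overlap=3, only the final three-word sentence is carried into the next slice.
from haystack.nodes import PreProcessor

current_slice = ["Sentence number one. ", "Sentence number two. ", "Final short one. "]
processed, next_slice, carried_words = PreProcessor._get_overlap_from_slice(
    current_slice, split_length=10, split_overlap=3
)
# processed     -> ["Sentence number one. ", "Sentence number two. "]
# next_slice    -> ["Final short one. "]
# carried_words -> 3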

569
    def _split_into_units(self, text: str, split_by: str) -> Tuple[List[str], str]:
11✔
570
        if split_by == "passage":
1✔
571
            elements = text.split("\n\n")
1✔
572
            split_at = "\n\n"
1✔
573
        elif split_by == "sentence":
1✔
574
            elements = self._split_sentences(text)
1✔
575
            split_at = ""  # whitespace will be preserved while splitting text into sentences
1✔
576
        elif split_by == "word":
1✔
577
            elements = text.split(" ")
1✔
578
            split_at = " "
1✔
579
        else:
580
            raise NotImplementedError("PreProcessor only supports 'passage', 'sentence' or 'word' split_by options.")
×
581

582
        return elements, split_at
1✔
583

584
    def _concatenate_units(
11✔
585
        self, elements: List[str], split_length: int, split_overlap: int, split_at: str
586
    ) -> Tuple[List[str], List[int], List[int]]:
587
        """
588
        Concatenates the elements into parts of split_length units.
589
        """
590
        segments = windowed(elements, n=split_length, step=split_length - split_overlap)
1✔
591
        split_at_len = len(split_at)
1✔
592
        text_splits = []
1✔
593
        splits_pages = []
1✔
594
        splits_start_idxs = []
1✔
595
        cur_page = 1
1✔
596
        cur_start_idx = 0
1✔
597
        for seg in segments:
1✔
598
            current_units = [unit for unit in seg if unit is not None]
1✔
599
            txt = split_at.join(current_units)
1✔
600
            if len(txt) > 0:
1✔
601
                text_splits.append(txt)
1✔
602
                splits_pages.append(cur_page)
1✔
603
                splits_start_idxs.append(cur_start_idx)
1✔
604
                processed_units = current_units[: split_length - split_overlap]
1✔
605
                cur_start_idx += len((split_at_len * " ").join(processed_units)) + split_at_len
1✔
606
                if self.add_page_number:
1✔
607
                    num_page_breaks = sum(processed_unit.count("\f") for processed_unit in processed_units)
1✔
608
                    cur_page += num_page_breaks
1✔
609

610
        return text_splits, splits_pages, splits_start_idxs
1✔
611
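The sliding window built here comes from more_itertools.windowed; a small standalone sketch of the same call pattern, matching the split_length=5, split_overlap=2 example in the class docstring (word list invented):

# Sketch: step = split_length - split_overlap = 3; trailing None padding is filtered out.
from more_itertools import windowed

words = ["w1", "w2", "w3", "w4", "w5", "w6", "w7", "w8", "w9", "w10", "w11"]
for window in windowed(words, n=5, step=5 - 2):
    print([w for w in window if w is not None])
# ['w1'...'w5'], then ['w4'...'w8'], then ['w7'...'w11']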

612
    def _create_docs_from_splits(
11✔
613
        self,
614
        text_splits: List[str],
615
        splits_pages: List[int],
616
        splits_start_idxs: List[int],
617
        headlines: List[Dict],
618
        meta: Dict,
619
        split_overlap: int,
620
        id_hash_keys: Optional[List[str]] = None,
621
    ) -> List[Document]:
622
        """
623
        Creates Document objects from text splits, enriching them with page number and headline information if given.
624
        """
625
        documents: List[Document] = []
1✔
626

627
        earliest_rel_hl = 0
1✔
628
        for i, txt in enumerate(text_splits):
1✔
629
            meta = deepcopy(meta)
1✔
630
            doc = Document(content=txt, meta=meta, id_hash_keys=id_hash_keys)
1✔
631
            doc.meta["_split_id"] = i
1✔
632
            if self.add_page_number:
1✔
633
                doc.meta["page"] = splits_pages[i]
1✔
634
            if headlines:
1✔
635
                split_start_idx = splits_start_idxs[i]
1✔
636
                relevant_headlines, earliest_rel_hl = self._extract_relevant_headlines_for_split(
1✔
637
                    headlines=headlines, split_txt=txt, split_start_idx=split_start_idx, earliest_rel_hl=earliest_rel_hl
638
                )
639
                doc.meta["headlines"] = relevant_headlines
1✔
640
            if split_overlap > 0:
1✔
641
                doc.meta["_split_overlap"] = []
1✔
642
                if i != 0:
1✔
643
                    doc_start_idx = splits_start_idxs[i]
1✔
644
                    previous_doc = documents[i - 1]
1✔
645
                    previous_doc_start_idx = splits_start_idxs[i - 1]
1✔
646
                    self._add_split_overlap_information(doc, doc_start_idx, previous_doc, previous_doc_start_idx)
1✔
647

648
            documents.append(doc)
1✔
649

650
        return documents
1✔
651

652
    @staticmethod
11✔
653
    def _add_split_overlap_information(
11✔
654
        current_doc: Document, current_doc_start_idx: int, previous_doc: Document, previous_doc_start_idx: int
655
    ):
656
        """
657
        Adds split overlap information to the current and previous Document's meta.
658
        """
659
        overlapping_range = (current_doc_start_idx - previous_doc_start_idx, len(previous_doc.content) - 1)
1✔
660
        if overlapping_range[0] < overlapping_range[1]:
1✔
661
            overlapping_str = previous_doc.content[overlapping_range[0] : overlapping_range[1]]
1✔
662
            if current_doc.content.startswith(overlapping_str):
1✔
663
                # Add split overlap information to previous Document regarding this Document
664
                previous_doc.meta["_split_overlap"].append({"doc_id": current_doc.id, "range": overlapping_range})
1✔
665
                # Add split overlap information to this Document regarding the previous Document
666
                overlapping_range = (0, overlapping_range[1] - overlapping_range[0])
1✔
667
                current_doc.meta["_split_overlap"].append({"doc_id": previous_doc.id, "range": overlapping_range})
1✔
668
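For illustration, the meta written by this helper ends up shaped like the hedged sketch below (document ids and character ranges are invented):

# Sketch of the "_split_overlap" meta on two adjacent splits (values invented).
previous_meta = {"_split_id": 0, "_split_overlap": [{"doc_id": "id-of-split-1", "range": (180, 239)}]}
current_meta = {"_split_id": 1, "_split_overlap": [{"doc_id": "id-of-split-0", "range": (0, 59)}]}
# i.e. the tail of split 0 reappears at the start of split 1, and each split records the other's id.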

669
    @staticmethod
11✔
670
    def _extract_relevant_headlines_for_split(
11✔
671
        headlines: List[Dict], split_txt: str, split_start_idx: int, earliest_rel_hl: int
672
    ) -> Tuple[List[Dict], int]:
673
        """
674
        Given a list of headlines, a text split, and the start index of the split in the original text, this method
675
        extracts the headlines that are relevant for the split.
676
        """
677
        relevant_headlines = []
1✔
678

679
        for headline_idx in range(earliest_rel_hl, len(headlines)):
1✔
680
            # Headline is part of current split
681
            if split_start_idx <= headlines[headline_idx]["start_idx"] < split_start_idx + len(split_txt):
1✔
682
                headline_copy = deepcopy(headlines[headline_idx])
1✔
683
                headline_copy["start_idx"] = headlines[headline_idx]["start_idx"] - split_start_idx
1✔
684
                relevant_headlines.append(headline_copy)
1✔
685
            # Headline appears before current split, but might be relevant for current split
686
            elif headlines[headline_idx]["start_idx"] < split_start_idx:
1✔
687
                # Check if following headlines are on a higher level
688
                headline_to_check = headline_idx + 1
1✔
689
                headline_is_relevant = True
1✔
690
                while (
1✔
691
                    headline_to_check < len(headlines) and headlines[headline_to_check]["start_idx"] <= split_start_idx
692
                ):
693
                    if headlines[headline_to_check]["level"] <= headlines[headline_idx]["level"]:
1✔
694
                        headline_is_relevant = False
1✔
695
                        break
1✔
696
                    headline_to_check += 1
1✔
697
                if headline_is_relevant:
1✔
698
                    headline_copy = deepcopy(headlines[headline_idx])
1✔
699
                    headline_copy["start_idx"] = None
1✔
700
                    relevant_headlines.append(headline_copy)
1✔
701
                else:
702
                    earliest_rel_hl += 1
1✔
703
            # Headline (and all subsequent ones) only relevant for later splits
704
            elif headlines[headline_idx]["start_idx"] > split_start_idx + len(split_txt):
1✔
705
                break
1✔
706

707
        return relevant_headlines, earliest_rel_hl
1✔
708
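A hedged worked example of the behaviour described above (the headline dicts are invented but use the start_idx/level keys this method relies on):

# Sketch: a split covering characters 30-69 of the original text.
from haystack.nodes import PreProcessor

headlines = [
    {"headline": "Intro", "start_idx": 0, "level": 0},
    {"headline": "Details", "start_idx": 50, "level": 1},
]
relevant, earliest = PreProcessor._extract_relevant_headlines_for_split(
    headlines=headlines, split_txt="x" * 40, split_start_idx=30, earliest_rel_hl=0
)
# relevant -> [{"headline": "Intro", "start_idx": None, "level": 0},
#              {"headline": "Details", "start_idx": 20, "level": 1}]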

709
    def _find_and_remove_header_footer(
11✔
710
        self, text: str, n_chars: int, n_first_pages_to_ignore: int, n_last_pages_to_ignore: int
711
    ) -> str:
712
        """
713
        Heuristic to find footers and headers across different pages by searching for the longest common string.
714
        For headers we only search in the first n_chars characters (for footer: last n_chars).
715
        Note: This heuristic uses exact matches and therefore works well for footers like "Copyright 2019 by XXX",
716
         but won't detect "Page 3 of 4" or similar.
717

718
        :param n_chars: number of first/last characters where the header/footer shall be searched in
719
        :param n_first_pages_to_ignore: number of first pages to ignore (e.g. TOCs often don't contain footer/header)
720
        :param n_last_pages_to_ignore: number of last pages to ignore
721
        :return: str, the text with the detected header and footer removed
722
        """
723

724
        pages = text.split("\f")
×
725

726
        # header
727
        start_of_pages = [p[:n_chars] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
×
728
        found_header = self._find_longest_common_ngram(start_of_pages)
×
729
        if found_header:
×
730
            pages = [page.replace(found_header, "") for page in pages]
×
731

732
        # footer
733
        end_of_pages = [p[-n_chars:] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
×
734
        found_footer = self._find_longest_common_ngram(end_of_pages)
×
735
        if found_footer:
×
736
            pages = [page.replace(found_footer, "") for page in pages]
×
737
        logger.debug("Removed header '%s' and footer '%s' in document", found_header, found_footer)
×
738
        text = "\f".join(pages)
×
739
        return text
×
740

741
    def _ngram(self, seq: str, n: int) -> Generator[str, None, None]:
11✔
742
        """
743
        Return ngrams (of tokens - currently split by whitespace)
744
        :param seq: str, string from which the ngram shall be created
745
        :param n: int, n of ngram
746
        :return: generator that yields the ngrams as strings
747
        """
748

749
        # In order to maintain the original whitespace, but still consider \n and \t for n-gram tokenization,
750
        # we add a space here and remove it after creation of the ngrams again (see below)
751
        seq = seq.replace("\n", " \n")
×
752
        seq = seq.replace("\t", " \t")
×
753

754
        words = seq.split(" ")
×
755
        ngrams = (
×
756
            " ".join(words[i : i + n]).replace(" \n", "\n").replace(" \t", "\t") for i in range(0, len(words) - n + 1)
757
        )
758

759
        return ngrams
×
760

761
    def _allngram(self, seq: str, min_ngram: int, max_ngram: int) -> Set[str]:
11✔
762
        lengths = range(min_ngram, max_ngram) if max_ngram else range(min_ngram, len(seq))
×
763
        ngrams = map(partial(self._ngram, seq), lengths)
×
764
        res = set(chain.from_iterable(ngrams))
×
765
        return res
×
766

767
    def _find_longest_common_ngram(
11✔
768
        self, sequences: List[str], max_ngram: int = 30, min_ngram: int = 3
769
    ) -> Optional[str]:
770
        """
771
        Find the longest common ngram across different text sequences (e.g. start of pages).
772
        Considers all ngrams within the specified length range. Helpful for finding footers, headers, etc.
773

774
        :param sequences: list[str], list of strings that shall be searched for common n_grams
775
        :param max_ngram: int, maximum length of ngram to consider
776
        :param min_ngram: minimum length of ngram to consider
777
        :return: str, the longest common string of all sequences, or None if none is found
778
        """
779
        sequences = [s for s in sequences if s]  # filter empty sequences
×
780
        if not sequences:
×
781
            return None
×
782
        seqs_ngrams = map(partial(self._allngram, min_ngram=min_ngram, max_ngram=max_ngram), sequences)
×
783
        intersection = reduce(set.intersection, seqs_ngrams)
×
784

785
        try:
×
786
            longest = max(intersection, key=len)
×
787
        except ValueError:
×
788
            # no common sequence found
789
            longest = ""
×
790
        return longest if longest.strip() else None
×
791
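The header/footer heuristic above boils down to intersecting token n-grams across pages. A simplified standalone sketch of that idea (it ignores the newline/tab handling the real _ngram method adds):

# Simplified sketch of the longest-common-ngram search (whitespace handling omitted).
def common_ngrams(text: str, min_n: int = 3, max_n: int = 30) -> set:
    words = text.split()
    return {
        " ".join(words[i : i + n])
        for n in range(min_n, max_n)
        for i in range(len(words) - n + 1)
    }

page_starts = [
    "Copyright 2019 by XXX Introduction to the topic",
    "Copyright 2019 by XXX Second chapter continues here",
]
shared = set.intersection(*(common_ngrams(p) for p in page_starts))
print(max(shared, key=len))  # -> Copyright 2019 by XXX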

792
    def _split_sentences(self, text: str) -> List[str]:
11✔
793
        """
794
        Tokenize text into sentences.
795
        :param text: str, text to tokenize
796
        :return: list[str], list of sentences
797
        """
798
        language_name = iso639_to_nltk.get(self.language)
1✔
799

800
        sentence_tokenizer = self._load_sentence_tokenizer(language_name)
1✔
801
        # The following adjustment of PunktSentenceTokenizer is inspired by:
802
        # https://stackoverflow.com/questions/33139531/preserve-empty-lines-with-nltks-punkt-tokenizer
803
        # It is needed for preserving whitespace while splitting text into sentences.
804
        period_context_fmt = r"""
1✔
805
            %(SentEndChars)s             # a potential sentence ending
806
            \s*                          # match potential whitespace (is originally in lookahead assertion)
807
            (?=(?P<after_tok>
808
                %(NonWord)s              # either other punctuation
809
                |
810
                (?P<next_tok>\S+)        # or some other token - original version: \s+(?P<next_tok>\S+)
811
            ))"""
812
        re_period_context = re.compile(
1✔
813
            period_context_fmt
814
            % {
815
                "NonWord": sentence_tokenizer._lang_vars._re_non_word_chars,
816
                # SentEndChars might be followed by closing brackets, so we match them here.
817
                "SentEndChars": sentence_tokenizer._lang_vars._re_sent_end_chars + r"[\)\]}]*",
818
            },
819
            re.UNICODE | re.VERBOSE,
820
        )
821
        sentence_tokenizer._lang_vars._re_period_context = re_period_context
1✔
822

823
        sentences = sentence_tokenizer.tokenize(text)
1✔
824
        return sentences
1✔
825

826
    def _load_sentence_tokenizer(self, language_name: Optional[str]) -> "nltk.tokenize.punkt.PunktSentenceTokenizer":
11✔
827
        # Try to load a custom model from 'tokenizer_model_path'
828
        if self.tokenizer_model_folder is not None:
1✔
829
            tokenizer_model_path = Path(self.tokenizer_model_folder).absolute() / f"{self.language}.pickle"
1✔
830
            try:
1✔
831
                sentence_tokenizer = nltk.data.load(f"file:{str(tokenizer_model_path)}", format="pickle")
1✔
832
            except (LookupError, UnpicklingError, ValueError) as e:
1✔
833
                if isinstance(e, LookupError):
1✔
834
                    logger.exception("PreProcessor couldn't load sentence tokenizer from %s", tokenizer_model_path)
×
835
                else:
836
                    logger.exception(
1✔
837
                        "PreProcessor couldn't determine model format of sentence tokenizer at %s", tokenizer_model_path
838
                    )
839

840
                # NLTK failed to load custom SentenceTokenizer, fallback to the default model or to English
841
                if language_name is not None:
1✔
842
                    logger.error(
1✔
843
                        "PreProcessor couldn't find custom sentence tokenizer model for %s. Using default %s model.",
844
                        self.language,
845
                        self.language,
846
                    )
847
                    sentence_tokenizer = nltk.data.load(f"tokenizers/punkt/{language_name}.pickle")
1✔
848
                else:
849
                    logger.error(
×
850
                        "PreProcessor couldn't find default or custom sentence tokenizer model for %s. "
851
                        "Using English instead.",
852
                        self.language,
853
                    )
854
                    sentence_tokenizer = nltk.data.load("tokenizers/punkt/english.pickle")
×
855

856
        # Use a default NLTK model
857
        elif language_name is not None:
1✔
858
            sentence_tokenizer = nltk.data.load(f"tokenizers/punkt/{language_name}.pickle")
1✔
859
        else:
860
            logger.error(
1✔
861
                "PreProcessor couldn't find the default sentence tokenizer model for %s. "
862
                " Using English instead. You may train your own model and use the 'tokenizer_model_folder' parameter.",
863
                self.language,
864
            )
865
            sentence_tokenizer = nltk.data.load("tokenizers/punkt/english.pickle")
1✔
866

867
        return sentence_tokenizer
1✔
868
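For illustration, the custom-model branch above expects one pickled Punkt tokenizer per language inside tokenizer_model_folder; a hedged sketch of how that would be configured (the path is invented):

# Sketch: a custom Punkt model stored as <tokenizer_model_folder>/<language>.pickle,
# e.g. /models/punkt/de.pickle, is loaded instead of NLTK's bundled model.
from haystack.nodes import PreProcessor

preprocessor = PreProcessor(
    language="de",
    tokenizer_model_folder="/models/punkt",   # invented path; must contain de.pickle
    split_by="sentence",
    split_length=5,
    split_respect_sentence_boundary=False,    # required whenever split_by != "word"
)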

869
    @staticmethod
11✔
870
    def _count_processed_page_breaks(
11✔
871
        sentences: List[str], split_overlap: int, overlapping_sents: List[str], current_sent: str
872
    ) -> int:
873
        """
874
        Counts the number of processed page breaks in a list of processed sentences.
875
        """
876
        num_page_breaks = sum(sent.count("\f") for sent in sentences)
1✔
877
        if sentences and sentences[0].startswith("\f"):
1✔
878
            # Remove already used page break
879
            num_page_breaks -= 1
×
880
        # Increment page counter if new split starts with a page break
881
        if split_overlap and overlapping_sents:
1✔
882
            if overlapping_sents[0].startswith("\f"):
×
883
                num_page_breaks += 1
×
884
        else:
885
            if current_sent.startswith("\f"):
1✔
886
                num_page_breaks += 1
×
887

888
        return num_page_breaks
1✔