deepset-ai / haystack / 10305030806

08 Aug 2024 03:29PM UTC coverage: 90.11% (-0.03%) from 90.143%
Pull Request #8042: feat: Implement apply_filter_policy and FilterPolicy.MERGE for the new filters
Merge a1cc6f942 into ec02817f1
6952 of 7715 relevant lines covered (90.11%), 0.9 hits per line

Source file: haystack/components/preprocessors/document_cleaner.py (98.08% covered)

# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import re
from copy import deepcopy
from functools import partial, reduce
from itertools import chain
from typing import Generator, List, Literal, Optional, Set
from unicodedata import normalize

from haystack import Document, component, logging

logger = logging.getLogger(__name__)


@component
class DocumentCleaner:
    """
    Cleans the text in the documents.

    It removes extra whitespaces,
    empty lines, specified substrings, regexes,
    page headers and footers (in this order).

    ### Usage example:

    ```python
    from haystack import Document
    from haystack.components.preprocessors import DocumentCleaner

    doc = Document(content="This   is  a  document  to  clean\\n\\n\\nsubstring to remove")

    cleaner = DocumentCleaner(remove_substrings=["substring to remove"])
    result = cleaner.run(documents=[doc])

    assert result["documents"][0].content == "This is a document to clean "
    ```
    """

    def __init__(
        self,
        remove_empty_lines: bool = True,
        remove_extra_whitespaces: bool = True,
        remove_repeated_substrings: bool = False,
        keep_id: bool = False,
        remove_substrings: Optional[List[str]] = None,
        remove_regex: Optional[str] = None,
        unicode_normalization: Optional[Literal["NFC", "NFKC", "NFD", "NFKD"]] = None,
        ascii_only: bool = False,
    ):
        """
        Initialize DocumentCleaner.

        :param remove_empty_lines: If `True`, removes empty lines.
        :param remove_extra_whitespaces: If `True`, removes extra whitespaces.
        :param remove_repeated_substrings: If `True`, removes repeated substrings (headers and footers) from pages.
            Pages must be separated by a form feed character "\\f",
            which is supported by `TextFileToDocument` and `AzureOCRDocumentConverter`.
        :param keep_id: If `True`, keeps the IDs of the original documents.
        :param remove_substrings: List of substrings to remove from the text.
        :param remove_regex: Regex to match and replace substrings with "".
        :param unicode_normalization: Unicode normalization form to apply to the text.
            Note: This will run before any other steps.
        :param ascii_only: Whether to convert the text to ASCII only.
            Will remove accents from characters and replace them with ASCII characters.
            Other non-ASCII characters will be removed.
            Note: This will run before any pattern matching or removal.
        """

        self._validate_params(unicode_normalization=unicode_normalization)

        self.remove_empty_lines = remove_empty_lines
        self.remove_extra_whitespaces = remove_extra_whitespaces
        self.remove_repeated_substrings = remove_repeated_substrings
        self.remove_substrings = remove_substrings
        self.remove_regex = remove_regex
        self.keep_id = keep_id
        self.unicode_normalization = unicode_normalization
        self.ascii_only = ascii_only

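    # Hypothetical illustration (not part of the original source): the cleaning
    # steps in run() apply in a fixed order (unicode normalization, ASCII
    # folding, whitespace, empty lines, substrings, regex, repeated
    # substrings), so in a session like the one below the literal substring
    # "fi" only matches after NFKC has expanded the ligature and the extra
    # whitespace has been collapsed:
    #
    #   >>> cleaner = DocumentCleaner(unicode_normalization="NFKC", remove_substrings=["fi"])
    #   >>> doc = Document(content="ﬁle   name")
    #   >>> cleaner.run(documents=[doc])["documents"][0].content
    #   'le name'
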
    def _validate_params(self, unicode_normalization: Optional[str]):
        """
        Validate the parameters of the DocumentCleaner.

        :param unicode_normalization: Unicode normalization form to apply to the text.
        :raises ValueError: if the parameters are not valid.
        """
        if unicode_normalization and unicode_normalization not in ["NFC", "NFKC", "NFD", "NFKD"]:
            raise ValueError("unicode_normalization must be one of 'NFC', 'NFKC', 'NFD', 'NFKD'.")

    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]):
        """
        Cleans up the documents.

        :param documents: List of Documents to clean.

        :returns: A dictionary with the following key:
            - `documents`: List of cleaned Documents.

        :raises TypeError: if documents is not a list of Documents.
        """
        if not isinstance(documents, list) or documents and not isinstance(documents[0], Document):
            raise TypeError("DocumentCleaner expects a List of Documents as input.")

        cleaned_docs = []
        for doc in documents:
            if doc.content is None:
                logger.warning(
                    "DocumentCleaner only cleans text documents but document.content for document ID"
                    " {document_id} is None.",
                    document_id=doc.id,
                )
                cleaned_docs.append(doc)
                continue
            text = doc.content

            if self.unicode_normalization:
                text = self._normalize_unicode(text, self.unicode_normalization)
            if self.ascii_only:
                text = self._ascii_only(text)
            if self.remove_extra_whitespaces:
                text = self._remove_extra_whitespaces(text)
            if self.remove_empty_lines:
                text = self._remove_empty_lines(text)
            if self.remove_substrings:
                text = self._remove_substrings(text, self.remove_substrings)
            if self.remove_regex:
                text = self._remove_regex(text, self.remove_regex)
            if self.remove_repeated_substrings:
                text = self._remove_repeated_substrings(text)

            cleaned_docs.append(Document(content=text, meta=deepcopy(doc.meta), id=doc.id if self.keep_id else ""))

        return {"documents": cleaned_docs}

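    # Hypothetical illustration (not part of the original source): documents
    # whose content is None are logged and passed through unchanged rather
    # than raising, so mixed batches survive a cleaning pass:
    #
    #   >>> cleaner = DocumentCleaner()
    #   >>> docs = [Document(content=None)]  # e.g. a metadata-only document
    #   >>> cleaner.run(documents=docs)["documents"][0].content is None
    #   True
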
    def _normalize_unicode(self, text: str, form: Literal["NFC", "NFKC", "NFD", "NFKD"]) -> str:
        """
        Normalize the unicode of the text.

        :param text: Text to normalize.
        :param form: Unicode normalization form to apply to the text.
            Options: "NFC", "NFKC", "NFD", "NFKD".
        :returns: The normalized text.
        """
        return normalize(form, text)

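    # Hypothetical illustration (not part of the original source): the four
    # forms differ in how aggressively they fold characters. NFKC, for
    # example, maps compatibility characters such as the ligature "ﬁ"
    # (U+FB01) and the fullwidth digit "１" (U+FF11) to their plain
    # equivalents:
    #
    #   >>> normalize("NFKC", "ﬁle １")
    #   'file 1'
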
    def _ascii_only(self, text: str) -> str:
        """
        Convert the text to ASCII only.

        Will remove accents from characters and replace them with ASCII characters.
        Other non-ASCII characters will be removed.

        :param text: Text to convert to ASCII only.
        :returns: The text in ASCII only.
        """

        # First normalize the text to NFKD to separate the characters and their diacritics
        # Then encode it to ASCII and ignore any characters that can't be encoded
        return self._normalize_unicode(text, "NFKD").encode("ascii", "ignore").decode("utf-8")

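    # Hypothetical illustration (not part of the original source): NFKD splits
    # "é" into "e" plus a combining accent, which the ASCII encode then drops;
    # characters with no ASCII decomposition, like "→", vanish entirely and
    # leave their surrounding whitespace behind:
    #
    #   >>> DocumentCleaner()._ascii_only("Café → déjà vu")
    #   'Cafe  deja vu'
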
    def _remove_empty_lines(self, text: str) -> str:
        """
        Remove empty lines and lines that contain nothing but whitespaces from text.

        :param text: Text to clean.
        :returns: The text without empty lines.
        """
        pages = text.split("\f")
        cleaned_pages = ["\n".join(line for line in page.split("\n") if line.strip()) for page in pages]
        return "\f".join(cleaned_pages)

    def _remove_extra_whitespaces(self, text: str) -> str:
        """
        Remove extra whitespaces from text.

        :param text: Text to clean.
        :returns: The text without extra whitespaces.
        """
        texts = text.split("\f")
        cleaned_text = [re.sub(r"\s\s+", " ", text).strip() for text in texts]
        return "\f".join(cleaned_text)

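    # Hypothetical illustration (not part of the original source): the pattern
    # \s\s+ only matches runs of two or more whitespace characters, so single
    # newlines survive while longer runs collapse to a single space:
    #
    #   >>> DocumentCleaner()._remove_extra_whitespaces("a  b\nc\n\nd")
    #   'a b\nc d'
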
    def _remove_regex(self, text: str, regex: str) -> str:
        """
        Remove substrings that match the specified regex from the text.

        :param text: Text to clean.
        :param regex: Regex to match and replace substrings with "".
        :returns: The text without the substrings that match the regex.
        """
        texts = text.split("\f")
        cleaned_text = [re.sub(regex, "", text).strip() for text in texts]
        return "\f".join(cleaned_text)

    def _remove_substrings(self, text: str, substrings: List[str]) -> str:
        """
        Remove all specified substrings from the text.

        :param text: Text to clean.
        :param substrings: Substrings to remove.
        :returns: The text without the specified substrings.
        """
        for substring in substrings:
            text = text.replace(substring, "")
        return text

    def _remove_repeated_substrings(self, text: str) -> str:
        """
        Remove any substrings from the text that occur repeatedly on every page. For example headers or footers.

        Pages in the text need to be separated by a form feed character "\\f".

        :param text: Text to clean.
        :returns: The text without the repeated substrings.
        """
        return self._find_and_remove_header_footer(
            text, n_chars=300, n_first_pages_to_ignore=1, n_last_pages_to_ignore=1
        )

    def _find_and_remove_header_footer(
        self, text: str, n_chars: int, n_first_pages_to_ignore: int, n_last_pages_to_ignore: int
    ) -> str:
        """
        Heuristic to find footers and headers across different pages by searching for the longest common string.

        Pages in the text need to be separated by a form feed character "\\f".
        For headers, we only search in the first n_chars characters (for footers: the last n_chars).
        Note: This heuristic uses exact matches and therefore works well for footers like "Copyright 2019 by XXX",
        but won't detect "Page 3 of 4" or similar.

        :param n_chars: The number of first/last characters where the header/footer shall be searched in.
        :param n_first_pages_to_ignore: The number of first pages to ignore
            (e.g. TOCs often don't contain footer/header).
        :param n_last_pages_to_ignore: The number of last pages to ignore.
        :returns: The text without the found headers and footers.
        """

        pages = text.split("\f")

        # header
        start_of_pages = [p[:n_chars] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
        found_header = self._find_longest_common_ngram(start_of_pages)
        if found_header:
            pages = [page.replace(found_header, "") for page in pages]

        # footer
        end_of_pages = [p[-n_chars:] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
        found_footer = self._find_longest_common_ngram(end_of_pages)
        if found_footer:
            pages = [page.replace(found_footer, "") for page in pages]

        logger.debug(
            "Removed header '{header}' and footer '{footer}' in document", header=found_header, footer=found_footer
        )
        text = "\f".join(pages)
        return text

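    # Hypothetical illustration (not part of the original source): with the
    # default settings the first and last pages are ignored while *searching*
    # for the common string, but the removal is applied to every page. Note
    # that only the header string itself is removed, so the newline that
    # followed it remains, and headers shorter than min_ngram=3 words are not
    # detected at all:
    #
    #   >>> text = "\f".join(
    #   ...     "ACME Corp Confidential\n" + body
    #   ...     for body in ("Intro", "Second page text", "Third page text", "End")
    #   ... )
    #   >>> DocumentCleaner()._remove_repeated_substrings(text).split("\f")
    #   ['\nIntro', '\nSecond page text', '\nThird page text', '\nEnd']
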
    def _ngram(self, seq: str, n: int) -> Generator[str, None, None]:
        """
        Return all ngrams of length n from a text sequence. Each ngram consists of n words split by whitespace.

        :param seq: The sequence to generate ngrams from.
        :param n: The length of the ngrams to generate.
        :returns: A Generator generating all ngrams of length n from the given sequence.
        """

        # In order to maintain the original whitespace, but still consider \n and \t for n-gram tokenization,
        # we add a space here and remove it after creation of the ngrams again (see below)
        seq = seq.replace("\n", " \n")
        seq = seq.replace("\t", " \t")

        words = seq.split(" ")
        ngrams = (
            " ".join(words[i : i + n]).replace(" \n", "\n").replace(" \t", "\t") for i in range(0, len(words) - n + 1)
        )

        return ngrams

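    # Hypothetical illustration (not part of the original source): the
    # space-padding trick splits on "\n" while keeping it attached to the
    # following word, so the original whitespace survives inside the ngrams:
    #
    #   >>> list(DocumentCleaner()._ngram("one two\nthree four", 2))
    #   ['one two', 'two\nthree', '\nthree four']
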
    def _allngram(self, seq: str, min_ngram: int, max_ngram: int) -> Set[str]:
        """
        Generates all possible ngrams from a given sequence of text.

        Considers all ngram lengths from min_ngram up to (but not including) max_ngram;
        if max_ngram is falsy, lengths run up to len(seq) instead.

        :param seq: The sequence to generate ngrams from.
        :param min_ngram: The minimum length of ngram to consider.
        :param max_ngram: The maximum length of ngram to consider.
        :returns: A set of all ngrams from the given sequence.
        """
        lengths = range(min_ngram, max_ngram) if max_ngram else range(min_ngram, len(seq))
        ngrams = map(partial(self._ngram, seq), lengths)
        res = set(chain.from_iterable(ngrams))
        return res

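    # Hypothetical illustration (not part of the original source): lengths are
    # drawn from range(min_ngram, max_ngram), so max_ngram itself is
    # exclusive; with min_ngram=1 and max_ngram=3 only 1- and 2-grams appear:
    #
    #   >>> sorted(DocumentCleaner()._allngram("a b c", min_ngram=1, max_ngram=3))
    #   ['a', 'a b', 'b', 'b c', 'c']
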
    def _find_longest_common_ngram(self, sequences: List[str], min_ngram: int = 3, max_ngram: int = 30) -> str:
        """
        Find the longest common ngram across a list of text sequences (e.g. start of pages).

        Considers all ngram lengths between the minimum and maximum length. Helpful for finding footers, headers, etc.
        Empty sequences are ignored.

        :param sequences: The list of strings that shall be searched for common n_grams.
        :param min_ngram: The minimum length of ngram to consider.
        :param max_ngram: The maximum length of ngram to consider.
        :returns: The longest ngram that all sequences have in common.
        """
        sequences = [s for s in sequences if s]  # filter empty sequences
        if not sequences:
            return ""
        seqs_ngrams = map(partial(self._allngram, min_ngram=min_ngram, max_ngram=max_ngram), sequences)
        intersection = reduce(set.intersection, seqs_ngrams)

        longest = max(intersection, key=len, default="")
        return longest if longest.strip() else ""
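    # Hypothetical illustration (not part of the original source): the longest
    # shared ngram wins, and a result consisting only of whitespace is
    # discarded so the caller never strips meaningful text by accident:
    #
    #   >>> DocumentCleaner()._find_longest_common_ngram(
    #   ...     ["annual report page one", "annual report page two"]
    #   ... )
    #   'annual report page'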