haystack/components/preprocessors/document_cleaner.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import re
from copy import deepcopy
from functools import partial, reduce
from itertools import chain
from typing import Generator, List, Literal, Optional, Set
from unicodedata import normalize

from haystack import Document, component, logging

logger = logging.getLogger(__name__)


@component
class DocumentCleaner:
    """
    Cleans the text in the documents.

    It removes extra whitespaces,
    empty lines, specified substrings, regexes,
    page headers and footers (in this order).

    ### Usage example:

    ```python
    from haystack import Document
    from haystack.components.preprocessors import DocumentCleaner

    doc = Document(content="This   is  a  document  to  clean\\n\\n\\nsubstring to remove")

    cleaner = DocumentCleaner(remove_substrings=["substring to remove"])
    result = cleaner.run(documents=[doc])

    assert result["documents"][0].content == "This is a document to clean "
    ```
    """

    def __init__(  # pylint: disable=too-many-positional-arguments
        self,
        remove_empty_lines: bool = True,
        remove_extra_whitespaces: bool = True,
        remove_repeated_substrings: bool = False,
        keep_id: bool = False,
        remove_substrings: Optional[List[str]] = None,
        remove_regex: Optional[str] = None,
        unicode_normalization: Optional[Literal["NFC", "NFKC", "NFD", "NFKD"]] = None,
        ascii_only: bool = False,
    ):
        """
        Initialize DocumentCleaner.

        :param remove_empty_lines: If `True`, removes empty lines.
        :param remove_extra_whitespaces: If `True`, removes extra whitespaces.
        :param remove_repeated_substrings: If `True`, removes repeated substrings (headers and footers) from pages.
            Pages must be separated by a form feed character "\\f",
            which is supported by `TextFileToDocument` and `AzureOCRDocumentConverter`.
        :param keep_id: If `True`, keeps the IDs of the original documents.
        :param remove_substrings: List of substrings to remove from the text.
        :param remove_regex: Regex to match; matching substrings are replaced with "".
        :param unicode_normalization: Unicode normalization form to apply to the text.
            Note: This will run before any other steps.
        :param ascii_only: Whether to convert the text to ASCII only.
            Will remove accents from characters and replace them with ASCII characters.
            Other non-ASCII characters will be removed.
            Note: This will run before any pattern matching or removal.
        """

        self._validate_params(unicode_normalization=unicode_normalization)

        self.remove_empty_lines = remove_empty_lines
        self.remove_extra_whitespaces = remove_extra_whitespaces
        self.remove_repeated_substrings = remove_repeated_substrings
        self.remove_substrings = remove_substrings
        self.remove_regex = remove_regex
        self.keep_id = keep_id
        self.unicode_normalization = unicode_normalization
        self.ascii_only = ascii_only

    def _validate_params(self, unicode_normalization: Optional[str]):
        """
        Validate the parameters of the DocumentCleaner.

        :param unicode_normalization: Unicode normalization form to apply to the text.
        :raises ValueError: if the parameters are not valid.
        """
        if unicode_normalization and unicode_normalization not in ["NFC", "NFKC", "NFD", "NFKD"]:
            raise ValueError("unicode_normalization must be one of 'NFC', 'NFKC', 'NFD', 'NFKD'.")

    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]):
        """
        Cleans up the documents.

        :param documents: List of Documents to clean.

        :returns: A dictionary with the following key:
            - `documents`: List of cleaned Documents.

        :raises TypeError: if documents is not a list of Documents.
        """
        if not isinstance(documents, list) or documents and not isinstance(documents[0], Document):
            raise TypeError("DocumentCleaner expects a List of Documents as input.")

        cleaned_docs = []
        for doc in documents:
            if doc.content is None:
                logger.warning(
                    "DocumentCleaner only cleans text documents but document.content for document ID"
                    " {document_id} is None.",
                    document_id=doc.id,
                )
                cleaned_docs.append(doc)
                continue
            text = doc.content

            if self.unicode_normalization:
                text = self._normalize_unicode(text, self.unicode_normalization)
            if self.ascii_only:
                text = self._ascii_only(text)
            if self.remove_extra_whitespaces:
                text = self._remove_extra_whitespaces(text)
            if self.remove_empty_lines:
                text = self._remove_empty_lines(text)
            if self.remove_substrings:
                text = self._remove_substrings(text, self.remove_substrings)
            if self.remove_regex:
                text = self._remove_regex(text, self.remove_regex)
            if self.remove_repeated_substrings:
                text = self._remove_repeated_substrings(text)

            clean_doc = Document(
                id=doc.id if self.keep_id else "",
                content=text,
                dataframe=doc.dataframe,
                blob=doc.blob,
                meta=deepcopy(doc.meta),
                score=doc.score,
                embedding=doc.embedding,
                sparse_embedding=doc.sparse_embedding,
            )
            cleaned_docs.append(clean_doc)

        return {"documents": cleaned_docs}
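
    # Example (illustrative): besides cleaning `content`, `run()` copies all
    # other fields onto the new Document, so metadata, score, and embeddings
    # are preserved:
    #
    #   doc = Document(content="text   to clean", meta={"source": "report.pdf"})
    #   out = DocumentCleaner().run(documents=[doc])["documents"][0]
    #   assert out.content == "text to clean"
    #   assert out.meta == {"source": "report.pdf"}
    #
    # The cleaned Document gets a fresh `id` unless `keep_id=True` is set.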

    def _normalize_unicode(self, text: str, form: Literal["NFC", "NFKC", "NFD", "NFKD"]) -> str:
        """
        Normalize the unicode of the text.

        :param text: Text to normalize.
        :param form: Unicode normalization form to apply to the text.
            Options: "NFC", "NFKC", "NFD", "NFKD".
        :returns: The normalized text.
        """
        return normalize(form, text)
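
    # Example (illustrative): NFKC folds compatibility characters into their
    # canonical equivalents, e.g. the "ﬁ" ligature:
    #
    #   self._normalize_unicode("ﬁle", "NFKC") == "file"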

    def _ascii_only(self, text: str) -> str:
        """
        Convert the text to ASCII only.

        Will remove accents from characters and replace them with ASCII characters.
        Other non-ASCII characters will be removed.

        :param text: Text to convert to ASCII only.
        :returns: The text in ASCII only.
        """

        # First normalize the text to NFKD to separate the characters and their diacritics
        # Then encode it to ASCII and ignore any characters that can't be encoded
        return self._normalize_unicode(text, "NFKD").encode("ascii", "ignore").decode("utf-8")
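
    # Example (illustrative): NFKD splits "é" into "e" plus a combining accent,
    # and the ASCII encoding then drops the accent; characters with no ASCII
    # equivalent (like the emoji below) are removed outright:
    #
    #   self._ascii_only("Café déjà vu ☕") == "Cafe deja vu "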

    def _remove_empty_lines(self, text: str) -> str:
        """
        Remove empty lines and lines that contain nothing but whitespace from text.

        :param text: Text to clean.
        :returns: The text without empty lines.
        """
        pages = text.split("\f")
        cleaned_pages = ["\n".join(line for line in page.split("\n") if line.strip()) for page in pages]
        return "\f".join(cleaned_pages)
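
    # Example (illustrative): lines that are empty or whitespace-only are
    # dropped, page by page:
    #
    #   self._remove_empty_lines("line one\n\n   \nline two") == "line one\nline two"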

    def _remove_extra_whitespaces(self, text: str) -> str:
        """
        Remove extra whitespaces from text.

        :param text: Text to clean.
        :returns: The text without extra whitespaces.
        """
        texts = text.split("\f")
        cleaned_text = [re.sub(r"\s\s+", " ", text).strip() for text in texts]
        return "\f".join(cleaned_text)
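
    # Example (illustrative): any run of two or more whitespace characters
    # collapses to a single space and each page is stripped; a lone "\n" or
    # "\t" is left untouched:
    #
    #   self._remove_extra_whitespaces("a  b\t\tc ") == "a b c"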

    def _remove_regex(self, text: str, regex: str) -> str:
        """
        Remove substrings that match the specified regex from the text.

        :param text: Text to clean.
        :param regex: Regex to match; matching substrings are replaced with "".
        :returns: The text without the substrings that match the regex.
        """
        texts = text.split("\f")
        cleaned_text = [re.sub(regex, "", text).strip() for text in texts]
        return "\f".join(cleaned_text)
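
    # Example (illustrative, with a hypothetical pattern for footnote markers):
    #
    #   self._remove_regex("Sales rose 5% [1] in 2024.", r"\s\[\d+\]") == "Sales rose 5% in 2024."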

    def _remove_substrings(self, text: str, substrings: List[str]) -> str:
        """
        Remove all specified substrings from the text.

        :param text: Text to clean.
        :param substrings: Substrings to remove.
        :returns: The text without the specified substrings.
        """
        for substring in substrings:
            text = text.replace(substring, "")
        return text
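
    # Example (illustrative): substrings are removed with plain `str.replace`,
    # one after the other, so no regex escaping is needed:
    #
    #   self._remove_substrings("CONFIDENTIAL report CONFIDENTIAL", ["CONFIDENTIAL"]) == " report "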

    def _remove_repeated_substrings(self, text: str) -> str:
        """
        Remove any substrings from the text that occur repeatedly on every page. For example headers or footers.

        Pages in the text need to be separated by form feed character "\f".

        :param text: Text to clean.
        :returns: The text without the repeated substrings.
        """
        return self._find_and_remove_header_footer(
            text, n_chars=300, n_first_pages_to_ignore=1, n_last_pages_to_ignore=1
        )

    def _find_and_remove_header_footer(
        self, text: str, n_chars: int, n_first_pages_to_ignore: int, n_last_pages_to_ignore: int
    ) -> str:
        """
        Heuristic to find footers and headers across different pages by searching for the longest common string.

        Pages in the text need to be separated by form feed character "\f".
        For headers, we only search in the first n_chars characters (for footers: the last n_chars).
        Note: This heuristic uses exact matches and therefore works well for footers like "Copyright 2019 by XXX",
        but won't detect "Page 3 of 4" or similar.

        :param n_chars: The number of first/last characters in which to search for the header/footer.
        :param n_first_pages_to_ignore: The number of first pages to ignore
            (e.g. TOCs often don't contain footer/header).
        :param n_last_pages_to_ignore: The number of last pages to ignore.
        :returns: The text without the found headers and footers.
        """

        pages = text.split("\f")

        # header
        start_of_pages = [p[:n_chars] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
        found_header = self._find_longest_common_ngram(start_of_pages)
        if found_header:
            pages = [page.replace(found_header, "") for page in pages]

        # footer
        end_of_pages = [p[-n_chars:] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
        found_footer = self._find_longest_common_ngram(end_of_pages)
        if found_footer:
            pages = [page.replace(found_footer, "") for page in pages]

        logger.debug(
            "Removed header '{header}' and footer '{footer}' in document", header=found_header, footer=found_footer
        )
        text = "\f".join(pages)
        return text
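
    # Example (illustrative): the header is detected in the middle pages only
    # (the first and last page are ignored while searching) but removed from
    # every page; a varying footer like "Page 3 of 4" would survive, since
    # only exact matches are found:
    #
    #   text = ("ACME Corp Annual Report\nintro text\f"
    #           "ACME Corp Annual Report\nsecond part\f"
    #           "ACME Corp Annual Report\nthird part\f"
    #           "ACME Corp Annual Report\nend")
    #   self._remove_repeated_substrings(text)
    #   # -> "\nintro text\f\nsecond part\f\nthird part\f\nend"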

    def _ngram(self, seq: str, n: int) -> Generator[str, None, None]:
        """
        Return all ngrams of length n from a text sequence. Each ngram consists of n words split by whitespace.

        :param seq: The sequence to generate ngrams from.
        :param n: The length of the ngrams to generate.
        :returns: A Generator generating all ngrams of length n from the given sequence.
        """

        # In order to maintain the original whitespace, but still consider \n and \t for n-gram tokenization,
        # we add a space here and remove it after creation of the ngrams again (see below)
        seq = seq.replace("\n", " \n")
        seq = seq.replace("\t", " \t")

        words = seq.split(" ")
        ngrams = (
            " ".join(words[i : i + n]).replace(" \n", "\n").replace(" \t", "\t") for i in range(0, len(words) - n + 1)
        )

        return ngrams
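
    # Example (illustrative): n-grams are word-based, and the original
    # whitespace between words is preserved:
    #
    #   list(self._ngram("Hello my name\nis", 2))
    #   # -> ["Hello my", "my name", "name\nis"]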

    def _allngram(self, seq: str, min_ngram: int, max_ngram: int) -> Set[str]:
        """
        Generates all possible ngrams from a given sequence of text.

        Considers all ngram lengths from min_ngram up to, but not including, max_ngram.

        :param seq: The sequence to generate ngrams from.
        :param min_ngram: The minimum length of ngram to consider.
        :param max_ngram: The maximum length of ngram to consider (exclusive).
        :returns: A set of all ngrams from the given sequence.
        """
        lengths = range(min_ngram, max_ngram) if max_ngram else range(min_ngram, len(seq))
        ngrams = map(partial(self._ngram, seq), lengths)
        res = set(chain.from_iterable(ngrams))
        return res
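
    # Example (illustrative): note that `max_ngram` is exclusive because it
    # feeds a `range`:
    #
    #   self._allngram("a b c", min_ngram=1, max_ngram=3)
    #   # -> {"a", "b", "c", "a b", "b c"}  (lengths 1 and 2 only)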

    def _find_longest_common_ngram(self, sequences: List[str], min_ngram: int = 3, max_ngram: int = 30) -> str:
        """
        Find the longest common ngram across a list of text sequences (e.g. start of pages).

        Considers all ngram lengths between the minimum and maximum length. Helpful for finding footers, headers etc.
        Empty sequences are ignored.

        :param sequences: The list of strings that shall be searched for common n_grams.
        :param min_ngram: The minimum length of ngram to consider.
        :param max_ngram: The maximum length of ngram to consider.
        :returns: The longest ngram that all sequences have in common.
        """
        sequences = [s for s in sequences if s]  # filter empty sequences
        if not sequences:
            return ""
        seqs_ngrams = map(partial(self._allngram, min_ngram=min_ngram, max_ngram=max_ngram), sequences)
        intersection = reduce(set.intersection, seqs_ngrams)

        longest = max(intersection, key=len, default="")
        return longest if longest.strip() else ""
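
    # Example (illustrative): the longest n-gram, measured in characters, that
    # every sequence contains wins:
    #
    #   self._find_longest_common_ngram(
    #       ["Copyright 2019 by XXX Chapter 1", "Copyright 2019 by XXX Chapter 2"]
    #   )
    #   # -> "Copyright 2019 by XXX Chapter"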