deepset-ai / haystack / 13634803133

03 Mar 2025 03:47PM UTC coverage: 90.124% (+0.1%) from 89.986%

Pull Request #8906 (github, web-flow): refactor!: remove `dataframe` field from `Document` and `ExtractedTableAnswer`; make `pandas` optional
Merge e48e49114 into 1b2053b35

9536 of 10581 relevant lines covered (90.12%)
0.9 hits per line


Source File: haystack/components/preprocessors/document_cleaner.py (98.1% of lines covered)

# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import re
from copy import deepcopy
from functools import partial, reduce
from itertools import chain
from typing import Generator, List, Literal, Optional, Set
from unicodedata import normalize

from haystack import Document, component, logging

logger = logging.getLogger(__name__)


@component
class DocumentCleaner:
    """
    Cleans the text in the documents.

    It removes extra whitespaces,
    empty lines, specified substrings, regexes,
    page headers and footers (in this order).

    ### Usage example:

    ```python
    from haystack import Document
    from haystack.components.preprocessors import DocumentCleaner

    doc = Document(content="This   is  a  document  to  clean\\n\\n\\nsubstring to remove")

    cleaner = DocumentCleaner(remove_substrings = ["substring to remove"])
    result = cleaner.run(documents=[doc])

    assert result["documents"][0].content == "This is a document to clean "
    ```
    """

    def __init__(  # pylint: disable=too-many-positional-arguments
        self,
        remove_empty_lines: bool = True,
        remove_extra_whitespaces: bool = True,
        remove_repeated_substrings: bool = False,
        keep_id: bool = False,
        remove_substrings: Optional[List[str]] = None,
        remove_regex: Optional[str] = None,
        unicode_normalization: Optional[Literal["NFC", "NFKC", "NFD", "NFKD"]] = None,
        ascii_only: bool = False,
    ):
        """
        Initialize DocumentCleaner.

        :param remove_empty_lines: If `True`, removes empty lines.
        :param remove_extra_whitespaces: If `True`, removes extra whitespaces.
        :param remove_repeated_substrings: If `True`, removes repeated substrings (headers and footers) from pages.
            Pages must be separated by a form feed character "\\f",
            which is supported by `TextFileToDocument` and `AzureOCRDocumentConverter`.
        :param remove_substrings: List of substrings to remove from the text.
        :param remove_regex: Regex to match and replace substrings by "".
        :param keep_id: If `True`, keeps the IDs of the original documents.
        :param unicode_normalization: Unicode normalization form to apply to the text.
            Note: This will run before any other steps.
        :param ascii_only: Whether to convert the text to ASCII only.
            Will remove accents from characters and replace them with ASCII characters.
            Other non-ASCII characters will be removed.
            Note: This will run before any pattern matching or removal.
        """

        self._validate_params(unicode_normalization=unicode_normalization)

        self.remove_empty_lines = remove_empty_lines
        self.remove_extra_whitespaces = remove_extra_whitespaces
        self.remove_repeated_substrings = remove_repeated_substrings
        self.remove_substrings = remove_substrings
        self.remove_regex = remove_regex
        self.keep_id = keep_id
        self.unicode_normalization = unicode_normalization
        self.ascii_only = ascii_only
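
    # Illustrative configuration sketch (hypothetical values, not defaults): unicode normalization
    # and ASCII folding run before the whitespace, substring, regex, and header/footer steps, so a
    # combined setup might look like:
    #
    #     cleaner = DocumentCleaner(
    #         unicode_normalization="NFKC",
    #         ascii_only=True,
    #         remove_regex=r"\d{4}-\d{2}-\d{2}",  # e.g. strip ISO dates
    #         remove_repeated_substrings=True,
    #     )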

    def _validate_params(self, unicode_normalization: Optional[str]):
        """
        Validate the parameters of the DocumentCleaner.

        :param unicode_normalization: Unicode normalization form to apply to the text.
        :raises ValueError: if the parameters are not valid.
        """
        if unicode_normalization and unicode_normalization not in ["NFC", "NFKC", "NFD", "NFKD"]:
            raise ValueError("unicode_normalization must be one of 'NFC', 'NFKC', 'NFD', 'NFKD'.")

    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]):
        """
        Cleans up the documents.

        :param documents: List of Documents to clean.

        :returns: A dictionary with the following key:
            - `documents`: List of cleaned Documents.

        :raises TypeError: if documents is not a list of Documents.
        """
        if not isinstance(documents, list) or documents and not isinstance(documents[0], Document):
            raise TypeError("DocumentCleaner expects a List of Documents as input.")

        cleaned_docs = []
        for doc in documents:
            if doc.content is None:
                logger.warning(
                    "DocumentCleaner only cleans text documents but document.content for document ID"
                    " {document_id} is None.",
                    document_id=doc.id,
                )
                cleaned_docs.append(doc)
                continue
            text = doc.content

            if self.unicode_normalization:
                text = self._normalize_unicode(text, self.unicode_normalization)
            if self.ascii_only:
                text = self._ascii_only(text)
            if self.remove_extra_whitespaces:
                text = self._remove_extra_whitespaces(text)
            if self.remove_empty_lines:
                text = self._remove_empty_lines(text)
            if self.remove_substrings:
                text = self._remove_substrings(text, self.remove_substrings)
            if self.remove_regex:
                text = self._remove_regex(text, self.remove_regex)
            if self.remove_repeated_substrings:
                text = self._remove_repeated_substrings(text)

            clean_doc = Document(
                id=doc.id if self.keep_id else "",
                content=text,
                blob=doc.blob,
                meta=deepcopy(doc.meta),
                score=doc.score,
                embedding=doc.embedding,
                sparse_embedding=doc.sparse_embedding,
            )
            cleaned_docs.append(clean_doc)

        return {"documents": cleaned_docs}

    def _normalize_unicode(self, text: str, form: Literal["NFC", "NFKC", "NFD", "NFKD"]) -> str:
        """
        Normalize the unicode of the text.

        :param text: Text to normalize.
        :param form: Unicode normalization form to apply to the text.
            Options: "NFC", "NFKC", "NFD", "NFKD".
        :returns: The normalized text.
        """
        return normalize(form, text)

    def _ascii_only(self, text: str) -> str:
        """
        Convert the text to ASCII only.

        Will remove accents from characters and replace them with ASCII characters.
        Other non-ASCII characters will be removed.

        :param text: Text to convert to ASCII only.
        :returns: The text in ASCII only.
        """

        # First normalize the text to NFKD to separate the characters and their diacritics
        # Then encode it to ASCII and ignore any characters that can't be encoded
        return self._normalize_unicode(text, "NFKD").encode("ascii", "ignore").decode("utf-8")
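
    # Illustrative effect of the NFKD + ASCII step above (hypothetical example strings):
    # accents are folded to their base letters, compatibility characters such as "²" become "2",
    # and characters with no ASCII equivalent (like "€") are simply dropped, e.g.
    #
    #     "Crème brûlée costs 5 €"  ->  "Creme brulee costs 5 "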

    def _remove_empty_lines(self, text: str) -> str:
        """
        Remove empty lines and lines that contain nothing but whitespaces from text.

        :param text: Text to clean.
        :returns: The text without empty lines.
        """
        pages = text.split("\f")
        cleaned_pages = ["\n".join(line for line in page.split("\n") if line.strip()) for page in pages]
        return "\f".join(cleaned_pages)

    def _remove_extra_whitespaces(self, text: str) -> str:
        """
        Remove extra whitespaces from text.

        :param text: Text to clean.
        :returns: The text without extra whitespaces.
        """
        texts = text.split("\f")
        cleaned_text = [re.sub(r"\s\s+", " ", text).strip() for text in texts]
        return "\f".join(cleaned_text)

    def _remove_regex(self, text: str, regex: str) -> str:
        """
        Remove substrings that match the specified regex from the text.

        :param text: Text to clean.
        :param regex: Regex to match and replace substrings by "".
        :returns: The text without the substrings that match the regex.
        """
        texts = text.split("\f")
        cleaned_text = [re.sub(regex, "", text).strip() for text in texts]
        return "\f".join(cleaned_text)

    def _remove_substrings(self, text: str, substrings: List[str]) -> str:
        """
        Remove all specified substrings from the text.

        :param text: Text to clean.
        :param substrings: Substrings to remove.
        :returns: The text without the specified substrings.
        """
        for substring in substrings:
            text = text.replace(substring, "")
        return text
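
    # Illustrative before/after for the helpers above (hypothetical inputs, not test cases):
    #
    #     _remove_extra_whitespaces("Intro   text \t here")   ->  "Intro text here"
    #     _remove_empty_lines("line one\n   \nline two")      ->  "line one\nline two"
    #     _remove_regex("title (test)", r"\(test\)")          ->  "title"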

    def _remove_repeated_substrings(self, text: str) -> str:
        """
        Remove any substrings from the text that occur repeatedly on every page. For example headers or footers.

        Pages in the text need to be separated by form feed character "\f".
        :param text: Text to clean.
        :returns: The text without the repeated substrings.
        """
        return self._find_and_remove_header_footer(
            text, n_chars=300, n_first_pages_to_ignore=1, n_last_pages_to_ignore=1
        )

    def _find_and_remove_header_footer(
        self, text: str, n_chars: int, n_first_pages_to_ignore: int, n_last_pages_to_ignore: int
    ) -> str:
        """
        Heuristic to find footers and headers across different pages by searching for the longest common string.

        Pages in the text need to be separated by form feed character "\f".
        For headers, we only search in the first n_chars characters (for footer: last n_chars).
        Note: This heuristic uses exact matches and therefore works well for footers like "Copyright 2019 by XXX",
         but won't detect "Page 3 of 4" or similar.

        :param n_chars: The number of first/last characters where the header/footer shall be searched in.
        :param n_first_pages_to_ignore: The number of first pages to ignore
            (e.g. TOCs often don't contain footer/header).
        :param n_last_pages_to_ignore: The number of last pages to ignore.
        :returns: The text without the found headers and footers.
        """

        pages = text.split("\f")

        # header
        start_of_pages = [p[:n_chars] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
        found_header = self._find_longest_common_ngram(start_of_pages)
        if found_header:
            pages = [page.replace(found_header, "") for page in pages]

        # footer
        end_of_pages = [p[-n_chars:] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
        found_footer = self._find_longest_common_ngram(end_of_pages)
        if found_footer:
            pages = [page.replace(found_footer, "") for page in pages]

        logger.debug(
            "Removed header '{header}' and footer '{footer}' in document", header=found_header, footer=found_footer
        )
        text = "\f".join(pages)
        return text

    def _ngram(self, seq: str, n: int) -> Generator[str, None, None]:
        """
        Return all ngrams of length n from a text sequence. Each ngram consists of n words split by whitespace.

        :param seq: The sequence to generate ngrams from.
        :param n: The length of the ngrams to generate.
        :returns: A Generator generating all ngrams of length n from the given sequence.
        """

        # In order to maintain the original whitespace, but still consider \n and \t for n-gram tokenization,
        # we add a space here and remove it after creation of the ngrams again (see below)
        seq = seq.replace("\n", " \n")
        seq = seq.replace("\t", " \t")

        words = seq.split(" ")
        ngrams = (
            " ".join(words[i : i + n]).replace(" \n", "\n").replace(" \t", "\t") for i in range(0, len(words) - n + 1)
        )

        return ngrams
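
    # Illustrative walk-through of the whitespace trick above (hypothetical input, not a test case):
    # for seq = "one two\nthree four" and n = 3, the newline is temporarily padded with a space so
    # that "\nthree" becomes its own token, giving the trigrams
    #
    #     "one two\nthree"  and  "two\nthree four"
    #
    # with the original newline restored inside each ngram.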

    def _allngram(self, seq: str, min_ngram: int, max_ngram: int) -> Set[str]:
        """
        Generates all possible ngrams from a given sequence of text.

        Considering all ngram lengths between the minimum and maximum length.

        :param seq: The sequence to generate ngrams from.
        :param min_ngram: The minimum length of ngram to consider.
        :param max_ngram: The maximum length of ngram to consider.
        :returns: A set of all ngrams from the given sequence.
        """
        lengths = range(min_ngram, max_ngram) if max_ngram else range(min_ngram, len(seq))
        ngrams = map(partial(self._ngram, seq), lengths)
        res = set(chain.from_iterable(ngrams))
        return res

    def _find_longest_common_ngram(self, sequences: List[str], min_ngram: int = 3, max_ngram: int = 30) -> str:
        """
        Find the longest common ngram across a list of text sequences (e.g. start of pages).

        Considering all ngram lengths between the minimum and maximum length. Helpful for finding footers, headers etc.
        Empty sequences are ignored.

        :param sequences: The list of strings that shall be searched for common n_grams.
        :param max_ngram: The maximum length of ngram to consider.
        :param min_ngram: The minimum length of ngram to consider.
        :returns: The longest ngram that all sequences have in common.
        """
        sequences = [s for s in sequences if s]  # filter empty sequences
        if not sequences:
            return ""
        seqs_ngrams = map(partial(self._allngram, min_ngram=min_ngram, max_ngram=max_ngram), sequences)
        intersection = reduce(set.intersection, seqs_ngrams)

        longest = max(intersection, key=len, default="")
        return longest if longest.strip() else ""
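

# Minimal end-to-end sketch of the repeated header/footer removal (illustrative only, not part of
# the original module): five short pages share the same first line, so the longest common ngram
# found across the middle pages should be that header, which is then stripped from every page.
if __name__ == "__main__":
    bodies = [
        "Revenue grew in every region.",
        "Costs were reduced by automation.",
        "Headcount remained flat overall.",
        "The outlook for next year is stable.",
        "Auditors signed off without remarks.",
    ]
    demo_text = "\f".join(f"ACME Annual Report\n{body}" for body in bodies)
    demo_cleaner = DocumentCleaner(remove_repeated_substrings=True)
    cleaned = demo_cleaner.run(documents=[Document(content=demo_text)])["documents"][0]
    print(cleaned.content)  # the shared "ACME Annual Report" line should be gone from every page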