deepset-ai / haystack / build 12429118671

20 Dec 2024 09:41AM UTC · coverage: 90.685% (down 0.009% from 90.694%)

push · github · web-flow

fix: Move potential nltk download to warm_up (#8646)

* Move potential nltk download to warm_up

* Update tests

* Add release notes

* Fix tests

* Uncomment

* Make mypy happy

* Add RuntimeError message

* Update release notes

---------

Co-authored-by: Julian Risch <julian.risch@deepset.ai>
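
As a minimal sketch of the usage pattern this change implies (the example document text is illustrative, not part of the commit): with `split_by="sentence"`, or with `respect_sentence_boundary=True` and `split_by="word"`, the NLTK-backed sentence tokenizer is now prepared in `warm_up()` rather than at construction time, so `warm_up()` must be called before `run()` when the component is used standalone; otherwise `run()` raises a `RuntimeError`.

```python
from haystack import Document
from haystack.components.preprocessors import DocumentSplitter

# Sentence-based splitting relies on the NLTK sentence tokenizer; any
# required NLTK resource download now happens in warm_up(), not in __init__.
splitter = DocumentSplitter(split_by="sentence", split_length=2)
splitter.warm_up()  # without this, run() raises a RuntimeError

doc = Document(content="Moonlight shimmered softly. Wolves howled nearby. Night enveloped everything.")
result = splitter.run(documents=[doc])
print([d.content for d in result["documents"]])
```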

8363 of 9222 relevant lines covered (90.69%)

0.91 hits per line

Source File

haystack/components/preprocessors/document_splitter.py (99.51% covered)
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import warnings
from copy import deepcopy
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple

from more_itertools import windowed

from haystack import Document, component, logging
from haystack.components.preprocessors.sentence_tokenizer import Language, SentenceSplitter, nltk_imports
from haystack.core.serialization import default_from_dict, default_to_dict
from haystack.utils import deserialize_callable, serialize_callable

logger = logging.getLogger(__name__)

# mapping of split_by values to their split character; 'function' and 'sentence' don't split by character
_CHARACTER_SPLIT_BY_MAPPING = {"page": "\f", "passage": "\n\n", "period": ".", "word": " ", "line": "\n"}


@component
class DocumentSplitter:
    """
    Splits long documents into smaller chunks.

    This is a common preprocessing step during indexing. It helps Embedders create meaningful semantic representations
    and prevents exceeding language model context limits.

    The DocumentSplitter is compatible with the following DocumentStores:
    - [Astra](https://docs.haystack.deepset.ai/docs/astradocumentstore)
    - [Chroma](https://docs.haystack.deepset.ai/docs/chromadocumentstore) limited support, overlapping information is
      not stored
    - [Elasticsearch](https://docs.haystack.deepset.ai/docs/elasticsearch-document-store)
    - [OpenSearch](https://docs.haystack.deepset.ai/docs/opensearch-document-store)
    - [Pgvector](https://docs.haystack.deepset.ai/docs/pgvectordocumentstore)
    - [Pinecone](https://docs.haystack.deepset.ai/docs/pinecone-document-store) limited support, overlapping
      information is not stored
    - [Qdrant](https://docs.haystack.deepset.ai/docs/qdrant-document-store)
    - [Weaviate](https://docs.haystack.deepset.ai/docs/weaviatedocumentstore)

    ### Usage example

    ```python
    from haystack import Document
    from haystack.components.preprocessors import DocumentSplitter

    doc = Document(content="Moonlight shimmered softly, wolves howled nearby, night enveloped everything.")

    splitter = DocumentSplitter(split_by="word", split_length=3, split_overlap=0)
    result = splitter.run(documents=[doc])
    ```
    """

    def __init__(  # pylint: disable=too-many-positional-arguments
        self,
        split_by: Literal["function", "page", "passage", "period", "word", "line", "sentence"] = "word",
        split_length: int = 200,
        split_overlap: int = 0,
        split_threshold: int = 0,
        splitting_function: Optional[Callable[[str], List[str]]] = None,
        respect_sentence_boundary: bool = False,
        language: Language = "en",
        use_split_rules: bool = True,
        extend_abbreviations: bool = True,
    ):
        """
        Initialize DocumentSplitter.

        :param split_by: The unit for splitting your documents. Choose from:
            - `word` for splitting by spaces (" ")
            - `period` for splitting by periods (".")
            - `page` for splitting by form feed ("\\f")
            - `passage` for splitting by double line breaks ("\\n\\n")
            - `line` for splitting each line ("\\n")
            - `sentence` for splitting by NLTK sentence tokenizer

        :param split_length: The maximum number of units in each split.
        :param split_overlap: The number of overlapping units for each split.
        :param split_threshold: The minimum number of units per split. If a split has fewer units
            than the threshold, it's attached to the previous split.
        :param splitting_function: Necessary when `split_by` is set to "function".
            This is a function which must accept a single `str` as input and return a `list` of `str` as output,
            representing the chunks after splitting.
        :param respect_sentence_boundary: Choose whether to respect sentence boundaries when splitting by "word".
            If True, uses NLTK to detect sentence boundaries, ensuring splits occur only between sentences.
        :param language: Choose the language for the NLTK tokenizer. The default is English ("en").
        :param use_split_rules: Choose whether to use additional split rules when splitting by `sentence`.
        :param extend_abbreviations: Choose whether to extend NLTK's PunktTokenizer abbreviations with a list
            of curated abbreviations, if available. This is currently supported for English ("en") and German ("de").
        """

        self.split_by = split_by
        self.split_length = split_length
        self.split_overlap = split_overlap
        self.split_threshold = split_threshold
        self.splitting_function = splitting_function
        self.respect_sentence_boundary = respect_sentence_boundary
        self.language = language
        self.use_split_rules = use_split_rules
        self.extend_abbreviations = extend_abbreviations

        self._init_checks(
            split_by=split_by,
            split_length=split_length,
            split_overlap=split_overlap,
            splitting_function=splitting_function,
            respect_sentence_boundary=respect_sentence_boundary,
        )
        self._use_sentence_splitter = split_by == "sentence" or (respect_sentence_boundary and split_by == "word")
        if self._use_sentence_splitter:
            nltk_imports.check()
            self.sentence_splitter = None

        if split_by == "sentence":
            # ToDo: remove this warning in the next major release
            msg = (
                "The `split_by='sentence'` no longer splits by '.' and now relies on custom sentence tokenizer "
                "based on NLTK. To achieve the previous behaviour use `split_by='period'`."
            )
            warnings.warn(msg)

    def _init_checks(
        self,
        *,
        split_by: str,
        split_length: int,
        split_overlap: int,
        splitting_function: Optional[Callable],
        respect_sentence_boundary: bool,
    ) -> None:
        """
        Validates initialization parameters for DocumentSplitter.

        :param split_by: The unit for splitting documents
        :param split_length: The maximum number of units in each split
        :param split_overlap: The number of overlapping units for each split
        :param splitting_function: Custom function for splitting when split_by="function"
        :param respect_sentence_boundary: Whether to respect sentence boundaries when splitting
        :raises ValueError: If any parameter is invalid
        """
        valid_split_by = ["function", "page", "passage", "period", "word", "line", "sentence"]
        if split_by not in valid_split_by:
            raise ValueError(f"split_by must be one of {', '.join(valid_split_by)}.")

        if split_by == "function" and splitting_function is None:
            raise ValueError("When 'split_by' is set to 'function', a valid 'splitting_function' must be provided.")

        if split_length <= 0:
            raise ValueError("split_length must be greater than 0.")

        if split_overlap < 0:
            raise ValueError("split_overlap must be greater than or equal to 0.")

        if respect_sentence_boundary and split_by != "word":
            logger.warning(
                "The 'respect_sentence_boundary' option is only supported for `split_by='word'`. "
                "The option `respect_sentence_boundary` will be set to `False`."
            )
            self.respect_sentence_boundary = False

    def warm_up(self):
        """
        Warm up the DocumentSplitter by loading the sentence tokenizer.
        """
        if self._use_sentence_splitter and self.sentence_splitter is None:
            self.sentence_splitter = SentenceSplitter(
                language=self.language,
                use_split_rules=self.use_split_rules,
                extend_abbreviations=self.extend_abbreviations,
                keep_white_spaces=True,
            )

    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]):
        """
        Split documents into smaller parts.

        Splits documents by the unit expressed in `split_by`, with a length of `split_length`
        and an overlap of `split_overlap`.

        :param documents: The documents to split.
        :returns: A dictionary with the following key:
            - `documents`: List of documents with the split texts. Each document includes:
                - A metadata field `source_id` to track the original document.
                - A metadata field `page_number` to track the original page number.
                - All other metadata copied from the original document.

        :raises TypeError: if the input is not a list of Documents.
        :raises ValueError: if the content of a document is None.
        """
        if self._use_sentence_splitter and self.sentence_splitter is None:
            raise RuntimeError(
                "The component DocumentSplitter wasn't warmed up. Run 'warm_up()' before calling 'run()'."
            )

        if not isinstance(documents, list) or (documents and not isinstance(documents[0], Document)):
            raise TypeError("DocumentSplitter expects a List of Documents as input.")

        split_docs: List[Document] = []
        for doc in documents:
            if doc.content is None:
                raise ValueError(
                    f"DocumentSplitter only works with text documents but content for document ID {doc.id} is None."
                )
            if doc.content == "":
                logger.warning("Document ID {doc_id} has an empty content. Skipping this document.", doc_id=doc.id)
                continue

            split_docs += self._split_document(doc)
        return {"documents": split_docs}

    def _split_document(self, doc: Document) -> List[Document]:
        if self.split_by == "sentence" or self.respect_sentence_boundary:
            return self._split_by_nltk_sentence(doc)

        if self.split_by == "function" and self.splitting_function is not None:
            return self._split_by_function(doc)

        return self._split_by_character(doc)

    def _split_by_nltk_sentence(self, doc: Document) -> List[Document]:
        split_docs = []

        result = self.sentence_splitter.split_sentences(doc.content)  # type: ignore # None check is done in run()
        units = [sentence["sentence"] for sentence in result]

        if self.respect_sentence_boundary:
            text_splits, splits_pages, splits_start_idxs = self._concatenate_sentences_based_on_word_amount(
                sentences=units, split_length=self.split_length, split_overlap=self.split_overlap
            )
        else:
            text_splits, splits_pages, splits_start_idxs = self._concatenate_units(
                elements=units,
                split_length=self.split_length,
                split_overlap=self.split_overlap,
                split_threshold=self.split_threshold,
            )
        metadata = deepcopy(doc.meta)
        metadata["source_id"] = doc.id
        split_docs += self._create_docs_from_splits(
            text_splits=text_splits, splits_pages=splits_pages, splits_start_idxs=splits_start_idxs, meta=metadata
        )

        return split_docs

    def _split_by_character(self, doc) -> List[Document]:
        split_at = _CHARACTER_SPLIT_BY_MAPPING[self.split_by]
        units = doc.content.split(split_at)
        # Add the delimiter back to all units except the last one
        for i in range(len(units) - 1):
            units[i] += split_at
        text_splits, splits_pages, splits_start_idxs = self._concatenate_units(
            units, self.split_length, self.split_overlap, self.split_threshold
        )
        metadata = deepcopy(doc.meta)
        metadata["source_id"] = doc.id
        return self._create_docs_from_splits(
            text_splits=text_splits, splits_pages=splits_pages, splits_start_idxs=splits_start_idxs, meta=metadata
        )

    def _split_by_function(self, doc) -> List[Document]:
        # the check for None is done already in the run method
        splits = self.splitting_function(doc.content)  # type: ignore
        docs: List[Document] = []
        for s in splits:
            meta = deepcopy(doc.meta)
            meta["source_id"] = doc.id
            docs.append(Document(content=s, meta=meta))
        return docs

    def _concatenate_units(
        self, elements: List[str], split_length: int, split_overlap: int, split_threshold: int
    ) -> Tuple[List[str], List[int], List[int]]:
        """
        Concatenates the elements into parts of split_length units.

        Keeps track of the original page number that each element belongs to. If the length of the current units is
        less than the pre-defined `split_threshold`, it does not create a new split. Instead, it concatenates the
        current units with the last split, preventing the creation of excessively small splits.
        """

        text_splits: List[str] = []
        splits_pages: List[int] = []
        splits_start_idxs: List[int] = []
        cur_start_idx = 0
        cur_page = 1
        segments = windowed(elements, n=split_length, step=split_length - split_overlap)

        for seg in segments:
            current_units = [unit for unit in seg if unit is not None]
            txt = "".join(current_units)

            # check if length of current units is below split_threshold
            if len(current_units) < split_threshold and len(text_splits) > 0:
                # concatenate the last split with the current one
                text_splits[-1] += txt

            # NOTE: This line skips documents that have content=""
            elif len(txt) > 0:
                text_splits.append(txt)
                splits_pages.append(cur_page)
                splits_start_idxs.append(cur_start_idx)

            processed_units = current_units[: split_length - split_overlap]
            cur_start_idx += len("".join(processed_units))

            if self.split_by == "page":
                num_page_breaks = len(processed_units)
            else:
                num_page_breaks = sum(processed_unit.count("\f") for processed_unit in processed_units)

            cur_page += num_page_breaks

        return text_splits, splits_pages, splits_start_idxs

    def _create_docs_from_splits(
        self, text_splits: List[str], splits_pages: List[int], splits_start_idxs: List[int], meta: Dict[str, Any]
    ) -> List[Document]:
        """
        Creates Document objects from splits enriching them with page number and the metadata of the original document.
        """
        documents: List[Document] = []

        for i, (txt, split_idx) in enumerate(zip(text_splits, splits_start_idxs)):
            meta = deepcopy(meta)
            doc = Document(content=txt, meta=meta)
            doc.meta["page_number"] = splits_pages[i]
            doc.meta["split_id"] = i
            doc.meta["split_idx_start"] = split_idx
            documents.append(doc)

            if self.split_overlap <= 0:
                continue

            doc.meta["_split_overlap"] = []

            if i == 0:
                continue

            doc_start_idx = splits_start_idxs[i]
            previous_doc = documents[i - 1]
            previous_doc_start_idx = splits_start_idxs[i - 1]
            self._add_split_overlap_information(doc, doc_start_idx, previous_doc, previous_doc_start_idx)

        return documents

    @staticmethod
    def _add_split_overlap_information(
        current_doc: Document, current_doc_start_idx: int, previous_doc: Document, previous_doc_start_idx: int
    ):
        """
        Adds split overlap information to the current and previous Document's meta.

        :param current_doc: The Document that is being split.
        :param current_doc_start_idx: The starting index of the current Document.
        :param previous_doc: The Document that was split before the current Document.
        :param previous_doc_start_idx: The starting index of the previous Document.
        """
        overlapping_range = (current_doc_start_idx - previous_doc_start_idx, len(previous_doc.content))  # type: ignore

        if overlapping_range[0] < overlapping_range[1]:
            overlapping_str = previous_doc.content[overlapping_range[0] : overlapping_range[1]]  # type: ignore

            if current_doc.content.startswith(overlapping_str):  # type: ignore
                # add split overlap information to this Document regarding the previous Document
                current_doc.meta["_split_overlap"].append({"doc_id": previous_doc.id, "range": overlapping_range})

                # add split overlap information to previous Document regarding this Document
                overlapping_range = (0, overlapping_range[1] - overlapping_range[0])
                previous_doc.meta["_split_overlap"].append({"doc_id": current_doc.id, "range": overlapping_range})

    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes the component to a dictionary.
        """
        serialized = default_to_dict(
            self,
            split_by=self.split_by,
            split_length=self.split_length,
            split_overlap=self.split_overlap,
            split_threshold=self.split_threshold,
            respect_sentence_boundary=self.respect_sentence_boundary,
            language=self.language,
            use_split_rules=self.use_split_rules,
            extend_abbreviations=self.extend_abbreviations,
        )
        if self.splitting_function:
            serialized["init_parameters"]["splitting_function"] = serialize_callable(self.splitting_function)
        return serialized

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "DocumentSplitter":
        """
        Deserializes the component from a dictionary.
        """
        init_params = data.get("init_parameters", {})

        splitting_function = init_params.get("splitting_function", None)
        if splitting_function:
            init_params["splitting_function"] = deserialize_callable(splitting_function)

        return default_from_dict(cls, data)

    @staticmethod
    def _concatenate_sentences_based_on_word_amount(
        sentences: List[str], split_length: int, split_overlap: int
    ) -> Tuple[List[str], List[int], List[int]]:
        """
        Groups the sentences into chunks of `split_length` words while respecting sentence boundaries.

        This function is only used when splitting by `word` and `respect_sentence_boundary` is set to `True`, i.e.
        with the NLTK sentence tokenizer.

        :param sentences: The list of sentences to split.
        :param split_length: The maximum number of words in each split.
        :param split_overlap: The number of overlapping words in each split.
        :returns: A tuple containing the concatenated sentences, the start page numbers, and the start indices.
        """
        # chunk information
        chunk_word_count = 0
        chunk_starting_page_number = 1
        chunk_start_idx = 0
        current_chunk: List[str] = []
        # output lists
        split_start_page_numbers = []
        list_of_splits: List[List[str]] = []
        split_start_indices = []

        for sentence_idx, sentence in enumerate(sentences):
            current_chunk.append(sentence)
            chunk_word_count += len(sentence.split())
            next_sentence_word_count = (
                len(sentences[sentence_idx + 1].split()) if sentence_idx < len(sentences) - 1 else 0
            )

            # Number of words in the current chunk plus the next sentence is larger than the split_length,
            # or we reached the last sentence
            if (chunk_word_count + next_sentence_word_count) > split_length or sentence_idx == len(sentences) - 1:
                # Save the current chunk and start a new one
                list_of_splits.append(current_chunk)
                split_start_page_numbers.append(chunk_starting_page_number)
                split_start_indices.append(chunk_start_idx)

                # Get the number of sentences that overlap with the next chunk
                num_sentences_to_keep = DocumentSplitter._number_of_sentences_to_keep(
                    sentences=current_chunk, split_length=split_length, split_overlap=split_overlap
                )
                # Set up information for the new chunk
                if num_sentences_to_keep > 0:
                    # Processed sentences are the ones that are not overlapping with the next chunk
                    processed_sentences = current_chunk[:-num_sentences_to_keep]
                    chunk_starting_page_number += sum(sent.count("\f") for sent in processed_sentences)
                    chunk_start_idx += len("".join(processed_sentences))
                    # Next chunk starts with the sentences that were overlapping with the previous chunk
                    current_chunk = current_chunk[-num_sentences_to_keep:]
                    chunk_word_count = sum(len(s.split()) for s in current_chunk)
                else:
                    # Here processed_sentences is the same as current_chunk since there is no overlap
                    chunk_starting_page_number += sum(sent.count("\f") for sent in current_chunk)
                    chunk_start_idx += len("".join(current_chunk))
                    current_chunk = []
                    chunk_word_count = 0

        # Concatenate the sentences together within each split
        text_splits = []
        for split in list_of_splits:
            text = "".join(split)
            if len(text) > 0:
                text_splits.append(text)

        return text_splits, split_start_page_numbers, split_start_indices

    @staticmethod
    def _number_of_sentences_to_keep(sentences: List[str], split_length: int, split_overlap: int) -> int:
        """
        Returns the number of sentences to keep in the next chunk based on the `split_overlap` and `split_length`.

        :param sentences: The list of sentences to split.
        :param split_length: The maximum number of words in each split.
        :param split_overlap: The number of overlapping words in each split.
        :returns: The number of sentences to keep in the next chunk.
        """
        # If the split_overlap is 0, we don't need to keep any sentences
        if split_overlap == 0:
            return 0

        num_sentences_to_keep = 0
        num_words = 0
        # Next overlapping Document should not start exactly the same as the previous one, so we skip the first sentence
        for sent in reversed(sentences[1:]):
            num_words += len(sent.split())
            # If the number of words is larger than the split_length then don't add any more sentences
            if num_words > split_length:
                break
            num_sentences_to_keep += 1
            if num_words > split_overlap:
                break
        return num_sentences_to_keep
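
To make the metadata written by `_create_docs_from_splits` and `_add_split_overlap_information` above concrete, here is a small sketch; the input text and parameter values are illustrative and not taken from the report.

```python
from haystack import Document
from haystack.components.preprocessors import DocumentSplitter

# Word splitting keeps the trailing space with each unit, so overlapping
# chunks share an exact character range that _add_split_overlap_information
# records in the "_split_overlap" metadata of both neighbouring chunks.
splitter = DocumentSplitter(split_by="word", split_length=5, split_overlap=2)
doc = Document(content="one two three four five six seven eight nine ten")
result = splitter.run(documents=[doc])

for chunk in result["documents"]:
    # Each chunk carries source_id, page_number, split_id and split_idx_start,
    # plus a _split_overlap list because split_overlap > 0.
    print(chunk.meta["split_id"], chunk.meta["split_idx_start"], chunk.meta.get("_split_overlap"), repr(chunk.content))
```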