• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 14064199728

25 Mar 2025 03:52PM UTC coverage: 90.154% (+0.08%) from 90.07%
14064199728

Pull #9055

github

web-flow
Merge eaafb5e56 into e64db6197
Pull Request #9055: Added retries parameters to pipeline.draw()

9898 of 10979 relevant lines covered (90.15%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.26
haystack/components/preprocessors/csv_document_splitter.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
from io import StringIO
1✔
6
from typing import Any, Dict, List, Literal, Optional, Tuple, get_args
1✔
7

8
from haystack import Document, component, logging
1✔
9
from haystack.lazy_imports import LazyImport
1✔
10

11
with LazyImport("Run 'pip install pandas'") as pandas_import:
1✔
12
    import pandas as pd
1✔
13

14
# Module logger (haystack's logging wrapper); used below to report unreadable CSVs and CSVs with no sub-tables.
logger = logging.getLogger(__name__)

# Accepted values for CSVDocumentSplitter's `split_mode` argument; validated in `__init__` via `get_args`.
SplitMode = Literal["threshold", "row-wise"]
1✔
17

18

19
@component
class CSVDocumentSplitter:
    """
    A component for splitting CSV documents into sub-tables based on split arguments.

    The splitter supports two modes of operation:
    - identify runs of consecutive empty rows or columns whose length meets a given threshold
      and use them as delimiters to segment the document into smaller tables.
    - split each row into a separate sub-table, represented as a Document.
    """

    def __init__(
        self,
        row_split_threshold: Optional[int] = 2,
        column_split_threshold: Optional[int] = 2,
        read_csv_kwargs: Optional[Dict[str, Any]] = None,
        split_mode: SplitMode = "threshold",
    ) -> None:
        """
        Initializes the CSVDocumentSplitter component.

        :param row_split_threshold: The minimum number of consecutive empty rows required to trigger a split.
        :param column_split_threshold: The minimum number of consecutive empty columns required to trigger a split.
        :param read_csv_kwargs: Additional keyword arguments to pass to `pandas.read_csv`.
            By default, the component calls `pandas.read_csv` with the following options:
            - `header=None`
            - `skip_blank_lines=False` to preserve blank lines
            - `dtype=object` to prevent type inference (e.g., converting numbers to floats).
            User-supplied keys override these defaults.
            See https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html for more information.
        :param split_mode:
            If `threshold`, the component will split the document based on the number of
            consecutive empty rows or columns that reach the `row_split_threshold` or `column_split_threshold`.
            If `row-wise`, the component will split each row into a separate sub-table.
        :raises ValueError: If `split_mode` is not recognized, if a threshold is smaller than 1,
            or if both thresholds are `None`.
        """
        pandas_import.check()
        if split_mode not in get_args(SplitMode):
            raise ValueError(
                f"Split mode '{split_mode}' not recognized. Choose one among: {', '.join(get_args(SplitMode))}."
            )
        if row_split_threshold is not None and row_split_threshold < 1:
            raise ValueError("row_split_threshold must be greater than 0")

        if column_split_threshold is not None and column_split_threshold < 1:
            raise ValueError("column_split_threshold must be greater than 0")

        if row_split_threshold is None and column_split_threshold is None:
            raise ValueError("At least one of row_split_threshold or column_split_threshold must be specified.")

        self.row_split_threshold = row_split_threshold
        self.column_split_threshold = column_split_threshold
        self.read_csv_kwargs = read_csv_kwargs or {}
        self.split_mode = split_mode

    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]) -> Dict[str, List[Document]]:
        """
        Processes and splits a list of CSV documents into multiple sub-tables.

        **Splitting Process:**
        1. Applies a row-based split if `row_split_threshold` is provided.
        2. Applies a column-based split if `column_split_threshold` is provided.
        3. If both thresholds are specified, performs a recursive split by rows first, then columns, ensuring
           further fragmentation of any sub-tables that still contain empty sections.
        4. Sorts the resulting sub-tables based on their original positions within the document.

        :param documents: A list of Documents containing CSV-formatted content.
            Each document is assumed to contain one or more tables separated by empty rows or columns.

        :return:
            A dictionary with a key `"documents"`, mapping to a list of new `Document` objects,
            each representing an extracted sub-table from the original CSV.
            The metadata of each document includes:
                - A field `source_id` to track the original document.
                - A field `row_idx_start` to indicate the starting row index of the sub-table in the original table.
                - A field `col_idx_start` to indicate the starting column index of the sub-table in the original table.
                - A field `split_id` to indicate the order of the split in the original document.
                - All other metadata copied from the original document.

        - If a document cannot be parsed by `pandas.read_csv`, it is returned unchanged.
        - If a parsed document yields no sub-tables (e.g. it contains only empty cells), it is dropped
          and a warning is logged.
        - The `meta` field from the original document is preserved in the split documents.
        """
        if len(documents) == 0:
            return {"documents": documents}

        resolved_read_csv_kwargs = {"header": None, "skip_blank_lines": False, "dtype": object, **self.read_csv_kwargs}

        split_documents = []
        for document in documents:
            try:
                df = pd.read_csv(StringIO(document.content), **resolved_read_csv_kwargs)  # type: ignore
            except Exception as e:
                # Best-effort: an unparsable document is passed through rather than failing the whole batch.
                logger.error(f"Error processing document {document.id}. Keeping it, but skipping splitting. Error: {e}")
                split_documents.append(document)
                continue

            # Reset per document so results can never leak from a previous iteration.
            split_dfs: List["pd.DataFrame"] = []
            if self.split_mode == "row-wise":
                # each row is a separate sub-table
                split_dfs = self._split_by_row(df=df)

            elif self.split_mode == "threshold":
                if self.row_split_threshold is not None and self.column_split_threshold is None:
                    # split by rows
                    split_dfs = self._split_dataframe(df=df, split_threshold=self.row_split_threshold, axis="row")
                elif self.column_split_threshold is not None and self.row_split_threshold is None:
                    # split by columns
                    split_dfs = self._split_dataframe(df=df, split_threshold=self.column_split_threshold, axis="column")
                else:
                    # recursive split (both thresholds are set; __init__ rejects both being None)
                    split_dfs = self._recursive_split(
                        df=df,
                        row_split_threshold=self.row_split_threshold,  # type: ignore
                        column_split_threshold=self.column_split_threshold,  # type: ignore
                    )

            # check if no sub-tables were found
            if len(split_dfs) == 0:
                logger.warning(
                    "No sub-tables found while splitting CSV Document with id {doc_id}. Skipping document.",
                    doc_id=document.id,
                )
                continue

            # Sort split_dfs first by row index, then by column index. Sub-tables keep the labels of the
            # original DataFrame, so these are their positions in the original table.
            split_dfs.sort(key=lambda dataframe: (dataframe.index[0], dataframe.columns[0]))

            for split_id, split_df in enumerate(split_dfs):
                split_documents.append(
                    Document(
                        content=split_df.to_csv(index=False, header=False, lineterminator="\n"),
                        meta={
                            **document.meta,
                            "source_id": document.id,
                            "row_idx_start": int(split_df.index[0]),
                            "col_idx_start": int(split_df.columns[0]),
                            "split_id": split_id,
                        },
                    )
                )

        return {"documents": split_documents}

    @staticmethod
    def _find_split_indices(
        df: "pd.DataFrame", split_threshold: int, axis: Literal["row", "column"]
    ) -> List[Tuple[int, int]]:
        """
        Finds runs of consecutive empty rows or columns in a DataFrame.

        The returned indices are *positional* (0-based offsets into `df`), not index/column labels.
        Sub-tables produced by earlier splits keep the labels of the original table, so their labels
        no longer start at 0. The caller slices with `iloc`, which expects positions. Mixing the two
        previously made recursive splits slice the wrong rows and recurse without end.

        :param df: DataFrame to inspect.
        :param split_threshold: Minimum number of consecutive empty rows or columns for a run to be reported.
        :param axis: Axis along which to find empty elements. Either "row" or "column".
        :return: List of `(start, end)` position pairs (both inclusive), one per run of at least
            `split_threshold` consecutive empty rows or columns, in ascending order.
        """
        # A row (reduce over axis=1) or a column (reduce over axis=0) is empty when every cell is null.
        is_empty = df.isnull().all(axis=1 if axis == "row" else 0)
        empty_positions = [position for position, empty in enumerate(is_empty.tolist()) if empty]

        split_indices: List[Tuple[int, int]] = []
        # If no empty elements found, return empty list
        if len(empty_positions) == 0:
            return split_indices

        # Identify groups of consecutive empty positions
        run_start = empty_positions[0]
        run_end = run_start
        for position in empty_positions[1:]:
            if position == run_end + 1:
                run_end = position
                continue
            if run_end - run_start + 1 >= split_threshold:
                split_indices.append((run_start, run_end))
            run_start = run_end = position

        # Handle the last group of consecutive positions
        if run_end - run_start + 1 >= split_threshold:
            split_indices.append((run_start, run_end))

        return split_indices

    def _split_dataframe(
        self, df: "pd.DataFrame", split_threshold: int, axis: Literal["row", "column"]
    ) -> List["pd.DataFrame"]:
        """
        Splits a DataFrame into sub-tables at runs of at least `split_threshold` consecutive empty rows or columns.

        Sub-tables are positional slices (`iloc`) of `df`, so they keep `df`'s original index/column labels.

        :param df: DataFrame to split.
        :param split_threshold: Minimum number of consecutive empty rows or columns to trigger a split.
        :param axis: Axis along which to split. Either "row" or "column".
        :return: List of non-empty split DataFrames, or `[df]` if no split point is found.
        """
        # Find positions of runs of consecutive empty rows or columns
        split_indices = self._find_split_indices(df=df, split_threshold=split_threshold, axis=axis)

        # If no split_indices are found, return the original DataFrame
        if len(split_indices) == 0:
            return [df]

        sub_tables = []
        table_start_idx = 0
        df_length = df.shape[0] if axis == "row" else df.shape[1]
        # The sentinel (df_length, df_length) flushes the trailing table after the last empty run.
        for empty_start_idx, empty_end_idx in split_indices + [(df_length, df_length)]:
            # Avoid empty splits (e.g. when the table starts with, or two runs surround, nothing)
            if empty_start_idx > table_start_idx:
                if axis == "row":
                    sub_table = df.iloc[table_start_idx:empty_start_idx]
                else:
                    sub_table = df.iloc[:, table_start_idx:empty_start_idx]
                if not sub_table.empty:
                    sub_tables.append(sub_table)
            table_start_idx = empty_end_idx + 1

        return sub_tables

    def _recursive_split(
        self, df: "pd.DataFrame", row_split_threshold: int, column_split_threshold: int
    ) -> List["pd.DataFrame"]:
        """
        Recursively splits a DataFrame.

        Recursively splits a DataFrame first by empty rows, then by empty columns, and repeats the process
        until no more splits are possible. Returns a list of DataFrames, each representing a fully separated sub-table.
        The recursion terminates because a table is only re-split when it contains a run of empty rows.
        Splitting removes those rows, so every recursive call receives a strictly smaller table.

        :param df: A Pandas DataFrame representing a table (or multiple tables) extracted from a CSV.
        :param row_split_threshold: The minimum number of consecutive empty rows required to trigger a split.
        :param column_split_threshold: The minimum number of consecutive empty columns to trigger a split.
        :return: List of sub-tables, each keeping the original index/column labels of `df`.
        """
        # Step 1: Split by rows
        new_sub_tables = self._split_dataframe(df=df, split_threshold=row_split_threshold, axis="row")

        # Step 2: Split by columns
        final_tables = []
        for table in new_sub_tables:
            final_tables.extend(self._split_dataframe(df=table, split_threshold=column_split_threshold, axis="column"))

        # Step 3: Removing columns can expose new runs of empty rows; re-split any table that has them.
        result = []
        for table in final_tables:
            if len(self._find_split_indices(df=table, split_threshold=row_split_threshold, axis="row")) > 0:
                result.extend(
                    self._recursive_split(
                        df=table, row_split_threshold=row_split_threshold, column_split_threshold=column_split_threshold
                    )
                )
            else:
                result.append(table)

        return result

    def _split_by_row(self, df: "pd.DataFrame") -> List["pd.DataFrame"]:
        """
        Split each CSV row into a separate one-row sub-table.

        :param df: DataFrame to split.
        :return: One DataFrame per row of `df`, indexed by the row's position and with columns numbered from 0.
        """
        split_dfs = []
        for idx, row in enumerate(df.itertuples(index=False)):
            # Build a one-column frame from the row's values, then transpose it into a single row.
            split_df = pd.DataFrame(row).T
            split_df.index = [idx]  # Set the index of the new DataFrame to idx
            split_dfs.append(split_df)
        return split_dfs
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc