• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 13818025108

12 Mar 2025 05:09PM UTC coverage: 89.91% (-0.01%) from 89.922%
13818025108

Pull #9031

github

web-flow
Merge f1a017137 into 363ac504d
Pull Request #9031: feat: add `split_by_row` feature to `CSVDocumentSplitter`

9739 of 10832 relevant lines covered (89.91%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.24
haystack/components/preprocessors/csv_document_splitter.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
from io import StringIO
1✔
6
from typing import Any, Dict, List, Literal, Optional, Tuple
1✔
7

8
from haystack import Document, component, logging
1✔
9
from haystack.lazy_imports import LazyImport
1✔
10

11
with LazyImport("Run 'pip install pandas'") as pandas_import:
1✔
12
    import pandas as pd
1✔
13

14
# Module-level logger (from haystack's `logging` module) used to report documents that fail parsing or splitting.
logger = logging.getLogger(__name__)
1✔
15

16

17
@component
class CSVDocumentSplitter:
    """
    A component for splitting CSV documents into sub-tables based on split arguments.

    The splitter supports two modes of operation:
    - identify consecutive empty rows or columns that exceed a given threshold
      and use them as delimiters to segment the document into smaller tables.
    - split each row into a separate sub-table, represented as a Document.
    """

    def __init__(
        self,
        row_split_threshold: Optional[int] = 2,
        column_split_threshold: Optional[int] = 2,
        read_csv_kwargs: Optional[Dict[str, Any]] = None,
        split_by_row: bool = False,
    ) -> None:
        """
        Initializes the CSVDocumentSplitter component.

        :param row_split_threshold: The minimum number of consecutive empty rows required to trigger a split.
        :param column_split_threshold: The minimum number of consecutive empty columns required to trigger a split.
        :param read_csv_kwargs: Additional keyword arguments to pass to `pandas.read_csv`.
            By default, the component uses the options:
            - `header=None`
            - `skip_blank_lines=False` to preserve blank lines
            - `dtype=object` to prevent type inference (e.g., converting numbers to floats).
            See https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html for more information.
        :param split_by_row:
            If `True`, each row is treated as an individual sub-table.
            Overrides `row_split_threshold` and `column_split_threshold`, if enabled.
        :raises ValueError: If a threshold is smaller than 1, or if both thresholds are `None` while
            `split_by_row` is `False`.
        """
        pandas_import.check()
        if row_split_threshold is not None and row_split_threshold < 1:
            raise ValueError("row_split_threshold must be greater than 0")

        if column_split_threshold is not None and column_split_threshold < 1:
            raise ValueError("column_split_threshold must be greater than 0")

        # The thresholds are only consulted in threshold mode, so row-wise splitting does not need either of them.
        if not split_by_row and row_split_threshold is None and column_split_threshold is None:
            raise ValueError("At least one of row_split_threshold or column_split_threshold must be specified.")

        if split_by_row:
            logger.warning("split_by_row is set to True. The other split arguments will be ignored.")

        self.row_split_threshold = row_split_threshold
        self.column_split_threshold = column_split_threshold
        self.read_csv_kwargs = read_csv_kwargs or {}
        self.split_by_row = split_by_row

    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]) -> Dict[str, List[Document]]:
        """
        Processes and splits a list of CSV documents into multiple sub-tables.

        **Splitting Process:**
        1. If `split_by_row` is `True`, every row becomes its own sub-table and the thresholds are ignored.
        2. Otherwise, applies a row-based split if only `row_split_threshold` is provided.
        3. Applies a column-based split if only `column_split_threshold` is provided.
        4. If both thresholds are specified, performs a recursive split by rows first, then columns, ensuring
           further fragmentation of any sub-tables that still contain empty sections.
        5. Sorts the resulting sub-tables based on their original positions within the document.

        :param documents: A list of Documents containing CSV-formatted content.
            Each document is assumed to contain one or more tables separated by empty rows or columns.

        :return:
            A dictionary with a key `"documents"`, mapping to a list of new `Document` objects,
            each representing an extracted sub-table from the original CSV.
            The metadata of each document includes:
                - A field `source_id` to track the original document.
                - A field `row_idx_start`: the 0-based position of the sub-table's first row in the parsed table.
                - A field `col_idx_start`: the 0-based position of the sub-table's first column in the parsed table.
                - A field `split_id` to indicate the order of the split in the original document.
                - All other metadata copied from the original document.

        - If a document cannot be parsed or split, it is returned unchanged.
        - The `meta` field from the original document is preserved in the split documents.
        """
        if not documents:
            return {"documents": documents}

        resolved_read_csv_kwargs = {"header": None, "skip_blank_lines": False, "dtype": object, **self.read_csv_kwargs}

        split_documents: List[Document] = []
        for document in documents:
            try:
                df = pd.read_csv(StringIO(document.content), **resolved_read_csv_kwargs)  # type: ignore
            except Exception as e:
                logger.error(
                    "Error processing document {document_id}. Keeping it, but skipping splitting. Error: {error}",
                    document_id=document.id,
                    error=e,
                )
                split_documents.append(document)
                continue

            try:
                # Relabel rows and columns with their 0-based positions. With the default kwargs this is a no-op,
                # but user kwargs such as `header=0` or `index_col` produce non-integer labels, which would break
                # the sort key and the `int(...)` metadata below. Output content is unaffected because `to_csv`
                # is called with `index=False, header=False`.
                df.index = pd.RangeIndex(len(df))
                df.columns = pd.RangeIndex(df.shape[1])
                split_dfs = self._split_by_mode(df=df)
            except Exception as e:
                logger.error(
                    "Error splitting document {document_id}. Keeping it, but skipping splitting. Error: {error}",
                    document_id=document.id,
                    error=e,
                )
                split_documents.append(document)
                continue

            # Empty fragments have no first row/column to sort on or report, so drop them; then order the
            # sub-tables top-to-bottom, then left-to-right, by their position in the parsed table.
            split_dfs = [split_df for split_df in split_dfs if not split_df.empty]
            split_dfs.sort(key=lambda split_df: (split_df.index[0], split_df.columns[0]))

            for split_id, split_df in enumerate(split_dfs):
                split_documents.append(
                    Document(
                        content=split_df.to_csv(index=False, header=False, lineterminator="\n"),
                        meta={
                            **document.meta.copy(),
                            "source_id": document.id,
                            "row_idx_start": int(split_df.index[0]),
                            "col_idx_start": int(split_df.columns[0]),
                            "split_id": split_id,
                        },
                    )
                )

        return {"documents": split_documents}

    def _split_by_mode(self, df: "pd.DataFrame") -> List["pd.DataFrame"]:
        """
        Splits a parsed table according to the configured mode (row-wise, single-axis, or recursive).

        :param df: DataFrame to split, labelled with positional row and column indices.
        :return: List of sub-table DataFrames, in no particular order.
        """
        if self.split_by_row:
            # Each row is a separate sub-table.
            return self._split_by_row_mode(df=df)
        if self.row_split_threshold is not None and self.column_split_threshold is None:
            return self._split_dataframe(df=df, split_threshold=self.row_split_threshold, axis="row")
        if self.column_split_threshold is not None and self.row_split_threshold is None:
            return self._split_dataframe(df=df, split_threshold=self.column_split_threshold, axis="column")
        return self._recursive_split(
            df=df,
            row_split_threshold=self.row_split_threshold,  # type: ignore
            column_split_threshold=self.column_split_threshold,  # type: ignore
        )

    @staticmethod
    def _find_split_indices(
        df: "pd.DataFrame", split_threshold: int, axis: Literal["row", "column"]
    ) -> List[Tuple[int, int]]:
        """
        Finds runs of consecutive empty rows or columns in a DataFrame.

        Indices are positional (0-based offsets into `df`), not index/column labels. Sub-tables produced by an
        earlier split keep the labels of the table they came from, so labels would not line up with the
        `iloc`-based slicing in `_split_dataframe`.

        :param df: DataFrame to inspect.
        :param split_threshold: Minimum number of consecutive empty rows or columns for a run to be reported.
        :param axis: Axis along which to find empty elements. Either "row" or "column".
        :return: List of `(start, end)` position tuples (both inclusive), one per run of at least
            `split_threshold` consecutive empty rows or columns, in ascending order.
        """
        # A row (column) counts as empty when every one of its cells is null.
        is_empty = df.isnull().all(axis=1 if axis == "row" else 0)
        empty_positions = [position for position, empty in enumerate(is_empty.tolist()) if empty]

        split_indices: List[Tuple[int, int]] = []
        if not empty_positions:
            return split_indices

        run_start = run_end = empty_positions[0]
        for position in empty_positions[1:]:
            if position == run_end + 1:
                run_end = position
                continue
            if run_end - run_start + 1 >= split_threshold:
                split_indices.append((run_start, run_end))
            run_start = run_end = position

        # Flush the final run.
        if run_end - run_start + 1 >= split_threshold:
            split_indices.append((run_start, run_end))

        return split_indices

    def _split_dataframe(
        self, df: "pd.DataFrame", split_threshold: int, axis: Literal["row", "column"]
    ) -> List["pd.DataFrame"]:
        """
        Splits a DataFrame into sub-tables based on consecutive empty rows or columns exceeding `split_threshold`.

        :param df: DataFrame to split.
        :param split_threshold: Minimum number of consecutive empty rows or columns to trigger a split.
        :param axis: Axis along which to split. Either "row" or "column".
        :return: List of non-empty split DataFrames (the original DataFrame alone if no split point is found).
            Sub-tables keep the row/column labels of `df`.
        """
        split_indices = self._find_split_indices(df=df, split_threshold=split_threshold, axis=axis)
        if not split_indices:
            return [df]

        sub_tables = []
        table_start_idx = 0
        df_length = df.shape[0] if axis == "row" else df.shape[1]
        # A sentinel run just past the end flushes the trailing sub-table.
        for empty_start_idx, empty_end_idx in split_indices + [(df_length, df_length)]:
            # Skip zero-width slices (e.g. when the table starts with an empty run).
            if empty_start_idx > table_start_idx:
                if axis == "row":
                    sub_table = df.iloc[table_start_idx:empty_start_idx]
                else:
                    sub_table = df.iloc[:, table_start_idx:empty_start_idx]
                if not sub_table.empty:
                    sub_tables.append(sub_table)
            table_start_idx = empty_end_idx + 1

        return sub_tables

    def _recursive_split(
        self, df: "pd.DataFrame", row_split_threshold: int, column_split_threshold: int
    ) -> List["pd.DataFrame"]:
        """
        Recursively splits a DataFrame.

        Recursively splits a DataFrame first by empty rows, then by empty columns, and repeats the process
        until no more splits are possible. Each recursive call works on a table with strictly fewer rows,
        so the recursion terminates.

        :param df: A Pandas DataFrame representing a table (or multiple tables) extracted from a CSV.
        :param row_split_threshold: The minimum number of consecutive empty rows required to trigger a split.
        :param column_split_threshold: The minimum number of consecutive empty columns to trigger a split.
        :return: List of DataFrames, each representing a fully separated sub-table.
        """
        # Step 1: split by rows.
        row_tables = self._split_dataframe(df=df, split_threshold=row_split_threshold, axis="row")

        # Step 2: split every row-slice by columns.
        column_tables = []
        for table in row_tables:
            column_tables.extend(
                self._split_dataframe(df=table, split_threshold=column_split_threshold, axis="column")
            )

        # Step 3: a column split can expose new runs of empty rows, so those tables are split again.
        result = []
        for table in column_tables:
            if self._find_split_indices(df=table, split_threshold=row_split_threshold, axis="row"):
                result.extend(
                    self._recursive_split(
                        df=table, row_split_threshold=row_split_threshold, column_split_threshold=column_split_threshold
                    )
                )
            else:
                result.append(table)

        return result

    def _split_by_row_mode(self, df: "pd.DataFrame") -> List["pd.DataFrame"]:
        """
        Splits each CSV row into a separate sub-table.

        Errors propagate to the caller; `run` catches them and keeps the original document instead of
        dropping it.

        :param df: DataFrame to split.
        :return: List of one-row DataFrames in the original row order. Each keeps its row label and the
            column labels of `df`.
        """
        return [df.iloc[[position]] for position in range(len(df))]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc