deepset-ai / haystack, build 18592817487
17 Oct 2025 12:33PM UTC, coverage: 92.2% (+0.1% from 92.062%)

Pull Request #9859: feat: Add FallbackChatGenerator (merge f20ff2b98 into a43c47b63)

13346 of 14475 relevant lines covered (92.2%), 0.92 hits per line

Source file: haystack/components/converters/csv.py (95.45% covered)
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import csv
import io
import os
from pathlib import Path
from typing import Any, Literal, Optional, Union

from haystack import Document, component, logging
from haystack.components.converters.utils import get_bytestream_from_source, normalize_metadata
from haystack.dataclasses import ByteStream

logger = logging.getLogger(__name__)

_ROW_MODE_SIZE_WARN_BYTES = 5 * 1024 * 1024  # ~5MB; warn when parsing rows might be memory-heavy


@component
class CSVToDocument:
    """
    Converts CSV files to Documents.

    By default, it uses UTF-8 encoding when converting files, but
    you can also set a custom encoding.
    It can attach metadata to the resulting documents.

    ### Usage example

    ```python
    from datetime import datetime

    from haystack.components.converters.csv import CSVToDocument

    converter = CSVToDocument()
    results = converter.run(sources=["sample.csv"], meta={"date_added": datetime.now().isoformat()})
    documents = results["documents"]
    print(documents[0].content)
    # 'col1,col2\\nrow1,row1\\nrow2,row2\\n'
    ```
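
    In row mode, each CSV row becomes its own Document. A minimal sketch
    (the file name and column names are illustrative):

    ```python
    from haystack.components.converters.csv import CSVToDocument

    converter = CSVToDocument(conversion_mode="row")
    results = converter.run(sources=["sample.csv"], content_column="text")
    documents = results["documents"]
    # documents[0].content holds the "text" cell of the first row;
    # the other columns and "row_number" end up in documents[0].meta
    ```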
    """

    def __init__(
        self,
        encoding: str = "utf-8",
        store_full_path: bool = False,
        *,
        conversion_mode: Literal["file", "row"] = "file",
        delimiter: str = ",",
        quotechar: str = '"',
    ):
        """
        Creates a CSVToDocument component.

        :param encoding:
            The encoding of the CSV files to convert.
            If the encoding is specified in the metadata of a source ByteStream,
            it overrides this value.
        :param store_full_path:
            If True, the full path of the file is stored in the metadata of the document.
            If False, only the file name is stored.
        :param conversion_mode:
            - "file" (default): one Document per CSV file whose content is the raw CSV text.
            - "row": convert each CSV row to its own Document (requires `content_column` in `run()`).
        :param delimiter:
            CSV delimiter used when parsing in row mode (passed to ``csv.DictReader``).
        :param quotechar:
            CSV quote character used when parsing in row mode (passed to ``csv.DictReader``).
        """
        self.encoding = encoding
        self.store_full_path = store_full_path
        self.conversion_mode = conversion_mode
        self.delimiter = delimiter
        self.quotechar = quotechar

        # Basic validation
        if len(self.delimiter) != 1:
            raise ValueError("CSVToDocument: delimiter must be a single character.")
        if len(self.quotechar) != 1:
            raise ValueError("CSVToDocument: quotechar must be a single character.")

    @component.output_types(documents=list[Document])
    def run(
        self,
        sources: list[Union[str, Path, ByteStream]],
        *,
        content_column: Optional[str] = None,
        meta: Optional[Union[dict[str, Any], list[dict[str, Any]]]] = None,
    ):
        """
        Converts CSV files to Documents: one Document per file (file mode) or one Document per row (row mode).

        :param sources:
            List of file paths or ByteStream objects.
        :param content_column:
            **Required when** ``conversion_mode="row"``.
            The column name whose values become ``Document.content`` for each row.
            The column must exist in the CSV header.
        :param meta:
            Optional metadata to attach to the documents.
            This value can be either a list of dictionaries or a single dictionary.
            If it's a single dictionary, its content is added to the metadata of all produced documents.
            If it's a list, the length of the list must match the number of sources, because the two lists
            are zipped.
            If `sources` contains ByteStream objects, their `meta` is added to the output documents.
        :returns:
            A dictionary with the following keys:
            - `documents`: Created documents
        """
        documents: list[Document] = []

        meta_list = normalize_metadata(meta, sources_count=len(sources))

        for source, metadata in zip(sources, meta_list):
            try:
                bytestream = get_bytestream_from_source(source)
            except Exception as e:
                logger.warning("Could not read {source}. Skipping it. Error: {error}", source=source, error=e)
                continue

            try:
                encoding = bytestream.meta.get("encoding", self.encoding)
                raw = bytestream.data
                data = raw.decode(encoding=encoding)
            except Exception as e:
                logger.warning(
                    "Could not convert file {source}. Skipping it. Error message: {error}", source=source, error=e
                )
                continue

            merged_metadata = {**bytestream.meta, **metadata}

            if not self.store_full_path and "file_path" in bytestream.meta:
                file_path = bytestream.meta.get("file_path")
                if file_path:  # Ensure the value is not None for pylint
                    merged_metadata["file_path"] = os.path.basename(file_path)

            # Mode: file (backward-compatible default) -> one Document per file
            if self.conversion_mode == "file":
                documents.append(Document(content=data, meta=merged_metadata))
                continue

            # --- ROW MODE (strict) ---
            # Require content_column in run(); no fallback
            if not content_column:
                raise ValueError(
                    "CSVToDocument(row): 'content_column' is required in run() when conversion_mode='row'."
                )

            # Warn for large CSVs in row mode (memory consideration)
            try:
                size_bytes = len(raw)
                if size_bytes > _ROW_MODE_SIZE_WARN_BYTES:
                    logger.warning(
                        "CSVToDocument(row): parsing a large CSV (~{mb:.1f} MB). "
                        "Consider chunking/streaming if you hit memory issues.",
                        mb=size_bytes / (1024 * 1024),
                    )
            except Exception:
                pass

            # Create DictReader; if this fails, raise (no fallback)
            try:
                reader = csv.DictReader(io.StringIO(data), delimiter=self.delimiter, quotechar=self.quotechar)
            except Exception as e:
                raise RuntimeError(f"CSVToDocument(row): could not parse CSV rows for {source}: {e}") from e

            # Validate header contains content_column; strict error if missing
            header = reader.fieldnames or []
            if content_column not in header:
                raise ValueError(
                    f"CSVToDocument(row): content_column='{content_column}' not found in header "
                    f"for {source}. Available columns: {header}"
                )

            # Build documents; if row processing fails, raise immediately (no skip)
            for i, row in enumerate(reader):
                try:
                    doc = self._build_document_from_row(
                        row=row, base_meta=merged_metadata, row_index=i, content_column=content_column
                    )
                except Exception as e:
                    raise RuntimeError(f"CSVToDocument(row): failed to process row {i} for {source}: {e}") from e
                documents.append(doc)

        return {"documents": documents}

    # ----- helpers -----
    def _safe_value(self, value: Any) -> str:
        """Normalize CSV cell values: None -> '', everything else -> str."""
        return "" if value is None else str(value)

    def _build_document_from_row(
        self, row: dict[str, Any], base_meta: dict[str, Any], row_index: int, content_column: str
    ) -> Document:
        """
        Build a ``Document`` from one parsed CSV row.

        :param row: Mapping of column name to cell value for the current row
            (as produced by ``csv.DictReader``).
        :param base_meta: File-level and user-provided metadata to start from
            (for example: ``file_path``, ``encoding``).
        :param row_index: Zero-based row index in the CSV; stored as
            ``row_number`` in the output document's metadata.
        :param content_column: Column name to use for ``Document.content``.
        :returns: A ``Document`` with the chosen content and merged metadata.
            Remaining row columns are added to ``meta`` with collision-safe
            keys (prefixed with ``csv_`` if needed).
        """
        row_meta = dict(base_meta)

        # content (strict: content_column must exist; validated by caller)
        content = self._safe_value(row.get(content_column))

        # merge remaining columns into meta with collision handling
        for k, v in row.items():
            if k == content_column:
                continue
            key_to_use = k
            if key_to_use in row_meta:
                # Avoid clobbering existing meta like file_path/encoding; prefix and de-dupe
                base_key = f"csv_{key_to_use}"
                key_to_use = base_key
                suffix = 1
                while key_to_use in row_meta:
                    key_to_use = f"{base_key}_{suffix}"
                    suffix += 1
            row_meta[key_to_use] = self._safe_value(v)

        row_meta["row_number"] = row_index
        return Document(content=content, meta=row_meta)
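
A quick sketch of the row-mode metadata behavior implemented by _build_document_from_row; the column names and values below are illustrative, and the in-memory ByteStream stands in for a file on disk:

```python
from haystack.components.converters.csv import CSVToDocument
from haystack.dataclasses import ByteStream

csv_bytes = b"text,author,file_path\nhello,alice,orig.csv\nworld,bob,other.csv\n"
source = ByteStream(data=csv_bytes, meta={"file_path": "inline.csv"})

converter = CSVToDocument(conversion_mode="row")
docs = converter.run(sources=[source], content_column="text")["documents"]

print(docs[0].content)                # 'hello'
print(docs[0].meta["author"])         # 'alice'
print(docs[0].meta["file_path"])      # 'inline.csv' (file-level meta is preserved)
print(docs[0].meta["csv_file_path"])  # 'orig.csv' (row column, prefixed to avoid the clash)
print(docs[0].meta["row_number"])     # 0
```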