• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

petbox-dev / tafra / 23517095938

24 Mar 2026 11:25PM UTC coverage: 92.425% (+0.1%) from 92.318%
23517095938

Pull #25

github

web-flow
Merge 4e69d6e88 into 5bc0bd561
Pull Request #25: fix: StringDType / <U interop and left join null handling (v2.2.1)

52 of 53 new or added lines in 2 files covered. (98.11%)

56 existing lines in 3 files now uncovered.

1391 of 1505 relevant lines covered (92.43%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.72
/tafra/csvreader.py
1
"""
2
Tafra: a minimalist dataframe
3

4
Copyright (c) 2020 Derrick W. Turk and David S. Fulford
5

6
Author
7
------
8
Derrick W. Turk
9
David S. Fulford
10

11
Notes
12
-----
13
Created on April 25, 2020
14
"""
15

16
from __future__ import annotations
1✔
17

18
import csv
1✔
19
import dataclasses as dc
1✔
20
from enum import Enum, auto
1✔
21
from io import TextIOWrapper
1✔
22
from pathlib import Path
1✔
23
from typing import Any, Callable, Sequence, cast
1✔
24

25
import numpy as np
1✔
26

27

28
# this doesn't type well in Python
29
@dc.dataclass(frozen=True)
class ReadableType:
    """Pair a column dtype with the function that parses one cell into it.

    Instances are immutable and compare by value (both fields).
    """

    # dtype handed to ``np.array`` when the finished column is built
    dtype: Any
    # converts the text of a single cell; expected to raise when the text
    # does not fit this type
    parse: Callable[[str], Any]
1✔
33

34

35
def _parse_bool(val: str) -> bool:
1✔
36
    folded = val.casefold()
1✔
37
    if folded in ("false", "no", "f"):
1✔
38
        return False
1✔
39
    if folded in ("true", "yes", "t"):
1✔
40
        return True
1✔
41
    raise ValueError("not a boolean")
1✔
42

43

44
# numpy-stubs is unreliable about these scalar constructors, hence the casts
# Types are tried in this order when guessing a column; a column that fails to
# parse is only ever promoted to a later entry.
_TYPE_PRECEDENCE: list[ReadableType] = [
    *(
        ReadableType(scalar, cast(Callable[[str], Any], scalar))
        # np.float32 is deliberately absent: nervous about ever inferring this
        for scalar in (np.int32, np.int64, np.float64)
    ),
    ReadableType(bool, _parse_bool),
    # TODO: date,
    # TODO: datetime,
]

# Fallback used when no entry of _TYPE_PRECEDENCE parses a column: the cell
# text is kept unchanged.
_TYPE_OBJECT: ReadableType = ReadableType(np.dtypes.StringDType(), lambda x: x)
1✔
56

57

58
class ReaderState(Enum):
    """States of the ``CSVReader`` state machine."""

    # buffering rows until enough have been collected to guess column types
    AWAIT_GUESSABLE = auto()
    # the input ended while rows were still being buffered
    EARLY_EOF = auto()
    # enough rows are buffered; column types are about to be guessed
    GUESS = auto()
    # types are known; remaining rows are parsed one at a time
    READ = auto()
    # the input ended while in READ
    EOF = auto()
    # terminal state: nothing left to do
    DONE = auto()
1✔
65

66

67
class CSVReader:
    """Read a CSV source into NumPy columns, inferring a type per column.

    The first row is the header; duplicate names are made unique. The first
    ``guess_rows`` data rows are buffered and used to pick each column's type
    from ``_TYPE_PRECEDENCE``. Later rows are parsed with that type, and a
    column is promoted to a later type in the precedence list (or finally to
    strings) as soon as one of its values does not parse.

    Parameters
    ----------
    source
        Path of a file to open, or an already-open ``TextIOWrapper``. A file
        opened here is closed by the reader; a stream passed in is
        reconfigured with ``newline=""`` and left open.
    guess_rows
        Number of data rows buffered before column types are guessed. If the
        source has fewer rows, types are guessed from the rows that exist.
    missing
        Cell text that marks a missing value; matching cells are read as
        ``None``. Pass ``None`` to disable missing-value detection.
    **csvkw
        Extra keyword arguments forwarded to ``csv.reader``.

    Raises
    ------
    TypeError
        If `source` is not a ``str``, ``Path`` or ``TextIOWrapper``.
    ValueError
        If the source has no header row.
    """

    def __init__(
        self,
        source: str | Path | TextIOWrapper,
        guess_rows: int = 5,
        missing: str | None = "",
        **csvkw: Any,
    ) -> None:
        if isinstance(source, (str, Path)):
            self._stream = open(source, newline="")
            self._should_close = True
        elif isinstance(source, TextIOWrapper):
            source.reconfigure(newline="")
            self._stream = source
            self._should_close = False
        else:
            raise TypeError(
                f"`source` must be `str`, `Path`, or `TextIOWrapper`, got `{type(source)}`"
            )

        try:
            reader = csv.reader(self._stream, dialect="excel", **csvkw)
            self._header = _unique_header(next(reader))
        except StopIteration:
            self._close()
            raise ValueError("CSV source is empty: no header row found") from None
        except Exception:
            # e.g. bad csv keyword arguments or a decoding error: do not leak
            # a file that was opened above
            self._close()
            raise

        # lazy: rows are only decoded as the state machine pulls them
        self._reader = (self._decode_missing(t) for t in reader)
        self._guess_types = {col: _TYPE_PRECEDENCE[0] for col in self._header}
        self._guess_data: dict[str, list[Any]] = {col: list() for col in self._header}
        self._data: dict[str, list[Any]] = dict()
        self._guess_rows = guess_rows
        self._missing = missing
        self._rows = 0
        self._state = ReaderState.AWAIT_GUESSABLE

    def read(self) -> dict[str, np.ndarray[Any, Any]]:
        """Consume the whole source and return one array per header column.

        Raises
        ------
        ValueError
            If a data row's length differs from the header's length.
        """
        try:
            while self._state != ReaderState.DONE:
                self._step()
        finally:
            # On success the stream was already closed by the EOF states and
            # closing again is a no-op; on failure this prevents a leak.
            self._close()
        return self._finalize()

    def _step(self) -> None:
        """Run the handler for the current state once."""
        if self._state == ReaderState.AWAIT_GUESSABLE:
            self.state_await_guessable()
            return

        if self._state == ReaderState.GUESS:
            self.state_guess()
            return

        if self._state == ReaderState.READ:
            self.state_read()
            return

        if self._state == ReaderState.EARLY_EOF:
            self.state_early_eof()
            return

        if self._state == ReaderState.EOF:
            self.state_eof()
            return

        if self._state == ReaderState.DONE:  # pragma: no cover
            return

    def state_await_guessable(self) -> None:
        """Buffer one row; switch to GUESS once ``guess_rows`` rows are held."""
        row = self._next_row(ReaderState.EARLY_EOF)
        if row is None:
            return

        for col, val in zip(self._header, row):
            self._guess_data[col].append(val)

        if self._rows == self._guess_rows:
            self._state = ReaderState.GUESS

    def state_guess(self) -> None:
        """Guess every column's type from the buffered rows, then start READ."""
        self._guess_buffered()
        self._state = ReaderState.READ

    def state_read(self) -> None:
        """Parse one row with the guessed types, promoting columns as needed."""
        row = self._next_row(ReaderState.EOF)
        if row is None:
            return

        for col, val in zip(self._header, row):
            try:
                parsed = self._guess_types[col].parse(val)
            except Exception:
                # Also reached for a missing cell (None) in a column whose type
                # cannot represent it (integers, booleans): promote the column
                # instead of storing a None that the final array conversion
                # would reject or silently coerce.
                self._promote(col, val)
                continue
            # Missing cells stay None so that _encode_missing can restore the
            # missing marker if the column is promoted later.
            self._data[col].append(None if val is None else parsed)

    def state_early_eof(self) -> None:
        """Handle input that ended before ``guess_rows`` rows were buffered."""
        self._close()
        self._guess_buffered()
        self._state = ReaderState.DONE

    def state_eof(self) -> None:
        """Handle end of input after the READ state."""
        self._close()
        self._state = ReaderState.DONE

    def _close(self) -> None:
        """Close the stream if this reader opened it."""
        if self._should_close:
            self._stream.close()

    def _next_row(self, at_eof: ReaderState) -> Sequence[str | None] | None:
        """Fetch and validate the next row; at end of input enter `at_eof`.

        Returns ``None`` when the input is exhausted. Raises ``ValueError``
        when the row's length differs from the header's.
        """
        try:
            row = next(self._reader)
        except StopIteration:
            self._state = at_eof
            return None

        self._rows += 1
        if len(row) != len(self._header):
            raise ValueError(f"length of row #{self._rows} does not match header length")
        return row

    def _guess_buffered(self) -> None:
        """Guess a type for every column from the buffered rows and parse them."""
        for col in self._header:
            ty, parsed = _guess_column(_TYPE_PRECEDENCE, self._guess_data[col])
            self._guess_types[col] = ty
            self._data[col] = parsed

    def _promote(self, col: str, val: str | None) -> None:
        """Re-guess column `col` after `val` failed to parse with its type.

        The values parsed so far are turned back into text, `val` is added,
        and only the types after the current one in ``_TYPE_PRECEDENCE`` are
        tried.
        """
        ty_ix = _TYPE_PRECEDENCE.index(self._guess_types[col])
        try_next = _TYPE_PRECEDENCE[ty_ix + 1 :]
        stringized = self._encode_missing(self._data[col])
        stringized.append(val)
        ty, parsed = _guess_column(try_next, stringized)
        self._guess_types[col] = ty
        self._data[col] = parsed

    def _finalize(self) -> dict[str, np.ndarray[Any, Any]]:
        """Convert each column's parsed values to an array of its guessed dtype."""
        assert self._state == ReaderState.DONE, "CSVReader is not in DONE state."
        return {
            col: np.array(self._data[col], dtype=self._guess_types[col].dtype)
            for col in self._header
        }

    def _decode_missing(self, row: list[str]) -> Sequence[str | None]:
        """Replace cells equal to the missing marker with ``None``."""
        if self._missing is None:
            return row
        return [v if v != self._missing else None for v in row]

    def _encode_missing(self, row: Sequence[Any | None]) -> list[str | None]:
        """Turn parsed values back into text, ``None`` becoming the missing marker."""
        return [str(v) if v is not None else self._missing for v in row]
1✔
214

215

216
def _unique_header(header: list[str]) -> list[str]:
1✔
217
    uniq: list[str] = list()
1✔
218
    for col in header:
1✔
219
        col_unique = col
1✔
220
        i = 2
1✔
221
        while col_unique in uniq:
1✔
222
            col_unique = f"{col} ({i})"
1✔
223
            i += 1
1✔
224
        uniq.append(col_unique)
1✔
225
    return uniq
1✔
226

227

228
# the "real" return type is a dependent pair (t: ReadableType ** List[t.dtype])
229
def _guess_column(
1✔
230
    precedence: list[ReadableType], vals: list[str | None]
231
) -> tuple[ReadableType, list[Any]]:
232
    for ty in precedence:
1✔
233
        try:
1✔
234
            # mypy doesn't really get that the thing we're mapping is not a method
235
            #   on `ty` but a data member
236
            typed = list(map(ty.parse, vals))  # type: ignore
1✔
237
            return ty, typed
1✔
238
        except Exception:
1✔
239
            continue
1✔
240
    return _TYPE_OBJECT, vals
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc