mscaudill / tabbed / 18353692528

08 Oct 2025 06:01PM UTC coverage: 88.766% (-0.2%) from 89.015%

push · github · mscaudill

[bugfix]: per issue #21, tabbed now reduces the poll amount to the last row of the data section if the poll amount exceeds the number of sampled

14 of 14 new or added lines in 1 file covered. (100.0%)

45 existing lines in 2 files now uncovered.

561 of 632 relevant lines covered (88.77%)

1.78 hits per line

Source File
/src/tabbed/sniffing.py: 90.74%
"""Tools for determining the dialect and structure of a csv file that may
contain metadata, a header, and a data section.
"""

import warnings
from collections import Counter
from dataclasses import dataclass
from datetime import date, datetime, time
from itertools import chain
from types import SimpleNamespace
from typing import IO

import clevercsv
from clevercsv.dialect import SimpleDialect

from tabbed.utils import parsing
from tabbed.utils.mixins import ReprMixin
from tabbed.utils.parsing import CellTypes


@dataclass(frozen=True)
class Header:
    """An immutable dataclass representation of a text file's header.

    Attributes:
        line:
            The integer line number of this Header. If None, the header was not
            derived from a file.
        names:
            The string names of each of the columns comprising the header. If
            these names contain spaces or repeat, this representation
            automatically amends them.
        string:
            The original string that was split to create header names. If None,
            the names were not derived from a file.
    """

    line: int | None
    names: list[str]
    string: str | None

    def __post_init__(self) -> None:
        """Amend the names during initialization."""

        # relabel the names to replace spaces, repeats etc.
        names = self._amend()
        super().__setattr__('names', names)

    def _amend(self) -> list[str]:
        """Ensures header names have no spaces and are unique.

        Header names may not contain spaces, so this method replaces spaces
        with underscores. Header names must also be unique, so names that
        repeat are suffixed with an underscore and an integer.
        """

        # replace any blank chars with underscores
        names = [name.strip().replace(' ', '_') for name in self.names]

        # replace repeating names with name_i variants for i in [0, inf)
        counted = Counter(names)
        mapping = {
            name: (
                [name] if cnt < 2 else [name + '_' + str(v) for v in range(cnt)]
            )
            for name, cnt in counted.items()
        }

        result = [mapping[name].pop(0) for name in names]
        return result


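# Illustrative sketch (not part of the original module): how Header amends
# names on construction. The example values below are hypothetical.
#
#   Header(line=None, names=['group id', 'count', 'count'], string=None)
#
# would expose names == ['group_id', 'count_0', 'count_1']: spaces become
# underscores and repeated names gain integer suffixes.
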
@dataclass(frozen=True)
class MetaData:
    """An immutable dataclass representing a text file's metadata section.

    Attributes:
        lines:
            A 2-tuple of the start and stop file lines containing metadata. If
            the stop value is None, the file does not contain a metadata
            section.
        string:
            The metadata string read from the file instance with no conversion.
            If None, the file does not contain a metadata section.
    """

    lines: tuple[int, int | None]
    string: str | None


class Sniffer(ReprMixin):
    r"""A tool for inferring the dialect and structure of a CSV file.

    The formatting of CSV files can vary widely. Python's built-in Sniffer is
    capable of handling different dialects (separators, line terminators,
    quotes etc.) but assumes the first line within the file is a header or a
    row of unheaded data. In practice, many CSV files contain metadata prior
    to the header or data section. While these files are not compliant with
    CSV standards (RFC-4180), their broad use necessitates file sniffing that
    infers both dialect and structure. To date, some csv readers such as
    Pandas' read_csv allow metadata rows to be skipped, but no formal
    mechanism for sniffing dialect, metadata and header information exists.
    This Sniffer supports these operations.

    Attributes:
        infile:
            An open file, an IO instance.
        line_count:
            The number of lines in infile.
        start:
            The start line of infile for collecting a sample of 'amount' number
            of lines.
        amount:
            The number of infile lines to sample for dialect, header and
            metadata detection. The initial value defaults to the smaller of
            line_count or 100 lines. The amount should be large enough to
            include some of the data section of the file.
        skips:
            Line numbers to ignore during sample collection.

    Examples:
        >>> import tempfile
        >>> delimiter = ';'
        >>> # make a metadata section and add to text written to the tempfile
        >>> metadata = {'exp': '3', 'name': 'Paul Dirac', 'date': '11/09/1942'}
        >>> text = [delimiter.join([key, val]) for key, val in metadata.items()]
        >>> # make a header and row to skip and add to text
        >>> header = delimiter.join('group count color'.split())
        >>> to_skip = delimiter.join('please ignore this line'.split())
        >>> text.extend([header, to_skip])
        >>> # make some data rows and add to text
        >>> group = 'a c b b c a c b c a a c'.split()
        >>> count = '22 2 13 15 4 19 4 21 5 24 18 1'.split()
        >>> color = 'r g b b r r r g g  b b g'.split()
        >>> data = [delimiter.join(row) for row in zip(group, count, color)]
        >>> text.extend(data)
        >>> # create a temp file and dump our text
        >>> outfile = tempfile.TemporaryFile(mode='w+')
        >>> _ = outfile.write('\n'.join(text))
        >>> # create a sniffer
        >>> sniffer = Sniffer(outfile)
        >>> # change the sample amount to 10 lines and skip line 4
        >>> # you would know to do this by inspecting the sample property
        >>> # and seeing the problematic line 4
        >>> sniffer.amount = 10
        >>> sniffer.skips = [4]
        >>> sniffer.sniff()
        >>> print(sniffer.dialect)
        SimpleDialect(';', '"', None)
        >>> # ask the sniffer to return a Header
        >>> header = sniffer.header(poll=4)
        >>> print(header)
        ... #doctest: +NORMALIZE_WHITESPACE
        Header(line=3,
        names=['group', 'count', 'color'],
        string='group;count;color')
        >>> # ask sniffer for the metadata given the header
        >>> sniffer.metadata(header)
        ... #doctest: +NORMALIZE_WHITESPACE
        MetaData(lines=(0, 3),
        string='exp;3\nname;Paul Dirac\ndate;11/09/1942')
        >>> # ask for the column types and consistency of types
        >>> # by polling the last 4 rows
        >>> types, consistent = sniffer.types(poll=4)
        >>> print(types)
        [<class 'str'>, <class 'int'>, <class 'str'>]
        >>> print(consistent)
        True
        >>> # close the temp outfile resource
        >>> outfile.close()
    """

    # help users set sane values for the sniffer
    # pylint: disable-next=R0917, dangerous-default-value
    def __init__(
        self,
        infile: IO[str],
        start: int = 0,
        amount: int = 100,
        skips: list[int] | None = None,
        delimiters: list[str] = [',', ';', '|', '\t'],
        decimal: str = '.',
    ) -> None:
        """Initialize this sniffer.

        Args:
            infile:
                An I/O stream instance such as returned by open.
            start:
                The start line of infile for collecting a sample of lines.
            amount:
                The number of infile lines to sample for dialect detection and
                locating header and metadata positions. The initial value
                defaults to the smaller of infile's length or 100 lines.
            skips:
                Line numbers to ignore during sample collection.
            delimiters:
                A restricted list of delimiter strings for improving dialect
                detection. If None, any character will be considered a valid
                delimiter.
            decimal:
                The format of the decimal notation. Defaults to '.'.

        Raises:
            StopIteration: raised if start is greater than infile's size.

        Notes:
            Sniffer deviates from Python's Sniffer in that infile is strictly
            an IO stream, not a list, because detecting the metadata and header
            structures requires movement within the file via 'seek'.
        """

        self.infile = infile
        self.infile.seek(0)
        self._start = start
        self._amount = amount
        self._skips = skips if skips else []
        # remove decimal from delimiter consideration
        delims = [d for d in delimiters if d != decimal]
        self.decimal = decimal
        # get sample from infile and sniff
        self._resample()
        self.sniff(delims)

    @property
    def start(self) -> int:
        """Returns the start line of this Sniffer's sample."""

        return self._start

    @start.setter
    def start(self, value: int) -> None:
        """Sets the start line & updates this Sniffer's sample.

        Args:
            value:
                A new sample start line.
        """

        self._start = value
        self._resample()

    @property
    def amount(self) -> int:
        """Returns the number of lines in Sniffer's sample."""

        return self._amount

    @amount.setter
    def amount(self, value: int) -> None:
        """Sets the number of lines & updates this Sniffer's sample.

        Args:
            value:
                The new number of joined lines in the sample.
        """

        self._amount = value
        self._resample()

    @property
    def skips(self) -> list[int]:
        """Returns the skipped lines excluded from this Sniffer's sample."""

        return self._skips

    @skips.setter
    def skips(self, other: list[int]) -> None:
        """Sets the lines to exclude from this Sniffer's sample."""

        self._skips = other
        self._resample()

    @property
    def sample(self) -> str:
        """Returns this Sniffer's sample string."""

        return self._sample

    @property
    def lines(self) -> list[int]:
        """Returns a list of integer line numbers comprising the sample."""

        return self._lines

    @property
    def dialect(self) -> SimpleDialect | None:
        """Returns this Sniffer's dialect."""

        return self._dialect

    @dialect.setter
    def dialect(self, value: SimpleDialect | None) -> None:
        """Sets this Sniffer's dialect.

        Args:
            value:
                A clevercsv SimpleDialect instance containing a delimiter,
                escape character and quote character.

        Returns:
            None
        """

        if value:
            # python 3.11 deprecated '' for delimiter, escape & quotechars
            delimiter = '\r' if value.delimiter == '' else value.delimiter
            escapechar = None if value.escapechar == '' else value.escapechar
            quotechar = '"' if not value.quotechar else value.quotechar
            value.delimiter = delimiter
            value.escapechar = escapechar
            value.quotechar = quotechar

        self._dialect = value

    @property
    def rows(self) -> list[list[str]]:
        """Returns a list of sample rows from this Sniffer's sample string.

        This method splits the sample string on new line chars, strips
        trailing delimiters and removes all double-quote characters.

        Returns:
            A list of lists of strings from the sample string.
        """

        if self.dialect is None:
            msg = "Dialect is unknown, please call sniff method or set dialect."
            raise TypeError(msg)

        result = []
        delimiter = self.dialect.delimiter

        # single column data uses carriage return delimiter
        if delimiter == '\r':
            return [
                [astr.replace('"', '')] for astr in self.sample.splitlines()
            ]

        # split sample_str on terminators, strip & split each line on delimiter
        for line in self.sample.splitlines():
            # lines may end in delimiter leading to empty trailing cells
            stripped = line.rstrip(delimiter)
            row = stripped.split(self.dialect.delimiter)
            # remove any double quotes
            row = [astring.replace('"', '') for astring in row]
            result.append(row)

        return result
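
    # Illustrative note (not from the original module): with a ';' delimiter,
    # a sample of 'a;1\nb;2' would yield rows == [['a', '1'], ['b', '2']]. The
    # sample values here are hypothetical.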

    def _move(self, line: int) -> None:
        """Moves the line pointer in this file to the given line number.

        Args:
            line:
                A line number to move to within this Sniffer's infile.

        Returns:
            None but advances the line pointer to line.

        Raises:
            A StopIteration is issued if line is greater than Sniffer's infile
            size.
        """

        self.infile.seek(0)
        for _ in range(line):
            # NamedTemporaryFiles are not iterators like file instances
            next(iter(self.infile))

    def _resample(self) -> None:
        """Sample from infile using the start, amount and skips properties."""

        self._move(self.start)
        result = SimpleNamespace(indices=[], linestrs=[])
        amount = self.amount + len(self.skips)
        for current in range(self.start, amount + self.start):

            line = self.infile.readline()
            # only store non-blank lines
            if current not in self.skips and line:
                result.linestrs.append(line)
                result.indices.append(current)

        # move line pointer back to start of the file
        self._move(0)
        sampled = ''.join(result.linestrs)
        self._sample: str = sampled
        self._lines: list[int] = result.indices

    def sniff(self, delimiters: list[str] | None = None) -> None:
        """Detects a clevercsv SimpleDialect from this instance's sample.

        Dialect is detected using clevercsv's sniffer as it has shown improved
        dialect detection accuracy over Python's built-in csv sniffer.

        Args:
            delimiters:
                A list of possibly valid delimiter strings; see
                csv.Sniffer.sniff.

        Returns:
            None. The detected SimpleDialect (see clevercsv.dialect) is stored
            to this Sniffer's dialect attribute, which is set to None if
            sniffing is inconclusive.

        References:
            van den Burg, G.J.J., Nazábal, A. & Sutton, C. Wrangling messy CSV
            files by detecting row and type patterns. Data Min Knowl Disc 33,
            1799–1820 (2019). https://doi.org/10.1007/s10618-019-00646-y
        """

        # result is None if clevercsv's sniff is indeterminate
        result = clevercsv.Sniffer().detect(self.sample, delimiters=delimiters)
        if result is None:
            msg1 = "Dialect could not be determined from Sniffer's sample.  "
            msg2 = "Please set this Sniffer's dialect attribute."
            warnings.warn(msg1 + msg2)
            self._dialect = None
        else:
            self.dialect = result
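
    # Illustrative note (not from the original module): when sniffing is
    # inconclusive, the dialect can be assigned directly, e.g.
    # sniffer.dialect = SimpleDialect(';', '"', None); the argument values
    # here are hypothetical.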

    # no mutation of exclude list here
    # pylint: disable-next=dangerous-default-value
    def types(
        self,
        poll: int,
        exclude: list[str] = ['', ' ', '-', 'nan', 'NaN', 'NAN'],
    ) -> tuple[CellTypes, bool]:
        """Infer the column types from the last poll count rows.

        Args:
            poll:
                The number of last sample rows to poll for type.
            exclude:
                A sequence of strings that indicate missing values. Rows
                containing these strings will be ignored for type
                determination.

        Returns:
            A list of types and a boolean indicating if types are consistent
            across polled rows. Ints, floats and complex within the same column
            are defined as consistent.
        """

        rows = self.rows[-poll:]
        rows = [row for row in rows if not bool(set(exclude).intersection(row))]
        if not rows:
            msg = (
                f'Types could not be determined as last {poll} polling '
                f'rows all contained at least one exclusion {exclude}. Try '
                'increasing the number of polling rows.'
            )
            raise RuntimeError(msg)

        cols = list(zip(*rows))
        type_cnts = [
            Counter([type(parsing.convert(el, self.decimal)) for el in col])
            for col in cols
        ]

        consistent = True
        for s in [set(cnts) for cnts in type_cnts]:
            # inconsistent if > 1 type per column & any non-numerics
            if len(s) > 1 and not s.issubset({float, int, complex}):
                consistent = False
                break

        common_types = [cnt.most_common(1)[0][0] for cnt in type_cnts]

        return common_types, consistent
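
    # Illustrative note (not from the original module): for a sample whose
    # last polled rows look like ['a', '22', 'r'] and ['b', '13', 'g'], the
    # result would be ([str, int, str], True); a column mixing ints and floats
    # still counts as consistent. Row values here are hypothetical.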

    # no mutation of exclude list here
    # pylint: disable-next=dangerous-default-value
    def datetime_formats(
        self,
        poll: int,
        exclude: list[str] = ['', ' ', '-', 'nan', 'NaN', 'NAN'],
    ) -> tuple[list[str | None], bool]:
        """Infer time, date or datetime formats from the last poll count rows.

        Args:
            poll:
                The number of last sample rows to poll for type and format
                consistency.
            exclude:
                A sequence of strings that indicate missing values. Rows
                containing these strings will be ignored.

        Returns:
            A tuple containing a list of formats the same length as the last
            polled row and a boolean indicating if the formats are consistent
            across the polled rows. Columns that are not time, date or datetime
            type have a format of None.
        """

        fmts = {
            time: parsing.time_formats(),
            date: parsing.date_formats(),
            datetime: parsing.datetime_formats(),
        }
        polled = []
        for row in self.rows[-poll:]:
            row_fmts = []
            for astring, tp in zip(row, self.types(poll, exclude)[0]):
                fmt = (
                    parsing.find_format(astring, fmts[tp])
                    if tp in fmts
                    else None
                )
                row_fmts.append(fmt)
            polled.append(row_fmts)

        # consistency within each column of polled
        consistent = all(len(set(col)) == 1 for col in list(zip(*polled)))

        return polled[-1], consistent
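
    # Illustrative note (not from the original module): a date column holding
    # strings like '11/09/1942' would map to a strptime-style format such as
    # '%m/%d/%Y' (the exact formats come from tabbed.utils.parsing and are
    # hypothetical here), while non-datetime columns map to None.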

    def _length_diff(
        self,
        poll: int,
        exclude: list[str],
    ) -> tuple[int | None, list[str] | None]:
        """Locates metadata by identifying the first row from the end of the
        sample whose length does not match the length of the last poll rows.

        This method assumes that the metadata row lengths do not match the data
        row lengths. This can obviously be untrue, but distinguishing a header
        row, whose length must match the number of data columns, from a
        metadata row with the same number of columns is challenging.

        Args:
            poll:
                The number of last sample rows to poll for common types.
            exclude:
                A sequence of strings that indicate missing values. Rows
                containing these strings will be ignored.

        Returns:
            A 2-tuple of integer line number and the metadata row if found and
            a 2-tuple of Nones otherwise.
        """

        types, _ = self.types(poll, exclude)
        for idx, row in reversed(list(zip(self.lines, self.rows))):
            if len(row) != len(types):
                return idx, row

        return None, None

    def _type_diff(
        self,
        poll: int,
        exclude: list[str],
    ) -> tuple[int | None, list[str] | None]:
        """Locates a header row by looking for the first row from the last of
        this Sniffer's rows whose types do not match the last polled row types.

        This heuristic assumes a consistent type within a column of data. If
        this is found to be untrue it returns a 2-tuple of Nones. Ints, floats
        and complex are treated as consistent by _type_diff.

        Args:
            poll:
                The number of last sample rows to poll for common types.
            exclude:
                A sequence of strings that indicate missing values. Rows
                containing these strings will be ignored.

        Returns:
            A 2-tuple of integer line number and header row or a 2-tuple of
            Nones.
        """

        types, consistent = self.types(poll, exclude)

        if not consistent:
            msg = 'Detection failure due to inconsistent column data types'
            warnings.warn(msg)
            return None, None

        # int, float and complex mismatches are not type mismatches
        numerics = {int, float, complex}
        for idx, row in reversed(list(zip(self.lines, self.rows))):

            # ignore blank rows
            if set(row) == {''}:
                continue

            # ignore rows that have missing values
            if bool(set(exclude).intersection(row)):
                continue

            if len(row) != len(types):
                # we've encountered a metadata row without hitting a header
                return None, None

            row_types = [type(parsing.convert(el, self.decimal)) for el in row]
            # check types
            for typ, expect in zip(row_types, types):
                if typ != expect and not {typ, expect}.issubset(numerics):
                    return idx, row

        return None, None

    def _string_diff(
        self,
        poll: int,
        exclude: list[str],
        len_requirement: bool = True,
    ) -> tuple[int | None, list[str] | None]:
        """Locates the first row from the last whose strings have no overlap
        with strings in the last poll rows.

        Args:
            poll:
                The number of last sample rows to poll for string values.
            exclude:
                A sequence of strings that indicate missing values. Rows
                containing these strings will be ignored.
            len_requirement:
                A boolean indicating if the first row from the last with no
                string overlap must have the same length as the last row of the
                sample. This will be True for headers and False for metadata.

        Returns:
            A 2-tuple of integer line number and header row or a 2-tuple of
            Nones.
        """

        observed = set(chain.from_iterable(self.rows[-poll:]))
        for idx, row in reversed(list(zip(self.lines, self.rows))):

            items = set(row)
            # ignore rows with missing values
            if bool(set(exclude).intersection(items)):
                continue

            # check disjoint with observed and completeness
            disjoint = items.isdisjoint(observed)
            complete = len(row) == len(self.rows[-1])

            if not len_requirement:
                # complete is always True if no length requirement
                complete = True

            if disjoint and complete:
                return idx, row

            # add unseen items to observed
            observed.update(items)

        return None, None

    # no mutation of exclude list here
    # pylint: disable-next=dangerous-default-value
    def header(
        self,
        poll: int,
        exclude: list[str] = ['', ' ', '-', 'nan', 'NaN', 'NAN'],
    ) -> Header:
        """Detects the header row (if any) from this Sniffer's sample rows.

        Headers are located using one of two possible methods.
            1. If the last row contains mixed types and the last poll rows have
               consistent types, then the first row from the last whose types
               differ from the last row types and whose length matches the last
               row is taken as the header.
            2. If the last poll rows are all string type, the first row from
               the last with string values that have never been seen in the
               previous rows and whose length matches the last row is taken to
               be the header. Caution: the poll amount should be sufficiently
               large to sample the possible string values expected in the data
               section. If the header is not correct, consider increasing the
               poll rows parameter.

        Args:
            poll:
                The number of last sample rows to poll for locating the header
                using string or type differences. Poll should be large enough
                to capture many of the string values that appear in the data
                section.
            exclude:
                A sequence of strings that indicate missing values. Rows
                containing these strings will be ignored.

        Notes:
            If no header is detected this method constructs a header. The names
            in this header are of the form 'Column_0', ... 'Column_n-1' where
            n is the expected number of columns from the last row of the sample
            rows. Just like all other file sniffers, this heuristic will make
            mistakes. A judicious sample choice that ignores problematic rows
            via the skips parameter may aid detection.

        Returns:
            A Header dataclass instance.
        """

        types, _ = self.types(poll, exclude)
        if all(typ == str for typ in types):
            line, row = self._string_diff(poll, exclude)

        else:
            line, row = self._type_diff(poll, exclude)

        if line is None:
            row = [f'Column_{i}' for i in range(len(self.rows[-1]))]

        # type-narrow for mypy check-- row can no longer be None
        assert isinstance(row, list)
        # get original string if line
        if line is not None:
            # string should include the rows we skipped so use sample not rows
            s = self.sample.splitlines()[self.lines.index(line)]
        else:
            s = None

        return Header(line=line, names=row, string=s)
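
    # Illustrative note (not from the original module): when neither heuristic
    # locates a header, the returned Header has line=None, string=None and
    # placeholder names such as ['Column_0', 'Column_1', 'Column_2'] for a
    # three-column sample.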

    # no mutation of exclude list here
    # pylint: disable-next=dangerous-default-value
    def metadata(
        self,
        header: Header | None,
        poll: int | None = None,
        exclude: list[str] = ['', ' ', '-', 'nan', 'NaN', 'NAN'],
    ) -> MetaData:
        """Detects the metadata section (if any) in this Sniffer's sample.

        Args:
            header:
                A Header dataclass instance.
            poll:
                The number of last sample rows to poll for locating metadata by
                length differences if the header arg is None.
            exclude:
                A sequence of strings that indicate missing values. Rows
                containing these strings will be ignored during metadata
                detection. This is ignored if a header is given.

        Returns:
            A MetaData dataclass instance.
        """

        # if header provided get lines up to header line
        if header and header.line:
            idx = self.lines.index(header.line)
            s = '\n'.join(self.sample.splitlines()[0:idx])
            return MetaData((0, header.line), s)

        if not header and poll is None:
            msg = 'Arguments header and poll cannot both be None type'
            raise ValueError(msg)

        # type narrow poll to int type for mypy
        assert isinstance(poll, int)
        line, _ = self._length_diff(poll, exclude)
        if line is not None:
            metarows = self.sample.splitlines()[: line + 1]
            string = '\n'.join(metarows)
            return MetaData((0, line + 1), string)

        return MetaData((0, None), None)
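
    # Illustrative note (not from the original module): if no Header is
    # available, metadata can still be located by row-length differences, e.g.
    # sniffer.metadata(None, poll=4); the poll value here is hypothetical.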


if __name__ == '__main__':

    import doctest

    doctest.testmod()