• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

C2FO / fast-csv / 13550863665

26 Feb 2025 06:30PM CUT coverage: 95.907%. Remained the same
13550863665

Pull #1101

github

web-flow
Merge 78434839f into 947bf442a
Pull Request #1101: chore(deps): update node.js to v22

325 of 353 branches covered (92.07%)

Branch coverage included in aggregate %.

753 of 771 relevant lines covered (97.67%)

2176197.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.07
/packages/parse/src/CsvParserStream.ts
1
import { StringDecoder } from 'string_decoder';
120✔
2
import { Transform, TransformCallback } from 'stream';
120✔
3
import { ParserOptions } from './ParserOptions';
4
import { HeaderTransformer, RowTransformerValidator } from './transforms';
120✔
5
import { Parser } from './parser';
120✔
6
import { Row, RowArray, RowTransformFunction, RowValidate, RowValidatorCallback } from './types';
7

8
export class CsvParserStream<I extends Row, O extends Row> extends Transform {
120✔
9
    private readonly parserOptions: ParserOptions;
10

11
    private readonly decoder: StringDecoder;
12

13
    private readonly parser: Parser;
14

15
    private readonly headerTransformer: HeaderTransformer<I>;
16

17
    private readonly rowTransformerValidator: RowTransformerValidator<I, O>;
18

19
    private lines = '';
468✔
20

21
    private rowCount = 0;
468✔
22

23
    private parsedRowCount = 0;
468✔
24

25
    private parsedLineCount = 0;
468✔
26

27
    private endEmitted = false;
468✔
28

29
    private headersEmitted = false;
468✔
30

31
    public constructor(parserOptions: ParserOptions) {
32
        super({ objectMode: parserOptions.objectMode });
468✔
33
        this.parserOptions = parserOptions;
468✔
34
        this.parser = new Parser(parserOptions);
468✔
35
        this.headerTransformer = new HeaderTransformer(parserOptions);
468✔
36
        this.decoder = new StringDecoder(parserOptions.encoding);
468✔
37
        this.rowTransformerValidator = new RowTransformerValidator();
468✔
38
    }
39

40
    private get hasHitRowLimit(): boolean {
41
        return this.parserOptions.limitRows && this.rowCount >= this.parserOptions.maxRows;
730,986✔
42
    }
43

44
    private get shouldEmitRows(): boolean {
45
        return this.parsedRowCount > this.parserOptions.skipRows;
728,376✔
46
    }
47

48
    private get shouldSkipLine(): boolean {
49
        return this.parsedLineCount <= this.parserOptions.skipLines;
728,808✔
50
    }
51

52
    public transform(transformFunction: RowTransformFunction<I, O>): CsvParserStream<I, O> {
53
        this.rowTransformerValidator.rowTransform = transformFunction;
60✔
54
        return this;
54✔
55
    }
56

57
    public validate(validateFunction: RowValidate<O>): CsvParserStream<I, O> {
58
        this.rowTransformerValidator.rowValidator = validateFunction;
54✔
59
        return this;
48✔
60
    }
61

62
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
63
    public emit(event: string | symbol, ...rest: any[]): boolean {
64
        if (event === 'end') {
731,478✔
65
            if (!this.endEmitted) {
360✔
66
                this.endEmitted = true;
360✔
67
                super.emit('end', this.rowCount);
360✔
68
            }
69
            return false;
342✔
70
        }
71
        return super.emit(event, ...rest);
731,118✔
72
    }
73

74
    public _transform(data: Buffer, encoding: string, done: TransformCallback): void {
75
        // if we have hit our maxRows parsing limit then skip parsing
76
        if (this.hasHitRowLimit) {
1,782!
77
            return done();
×
78
        }
79
        const wrappedCallback = CsvParserStream.wrapDoneCallback(done);
1,782✔
80
        try {
1,782✔
81
            const { lines } = this;
1,782✔
82
            const newLine = lines + this.decoder.write(data);
1,782✔
83
            const rows = this.parse(newLine, true);
1,782✔
84
            return this.processRows(rows, wrappedCallback);
1,770✔
85
        } catch (e) {
86
            return wrappedCallback(e);
12✔
87
        }
88
    }
89

90
    public _flush(done: TransformCallback): void {
91
        const wrappedCallback = CsvParserStream.wrapDoneCallback(done);
390✔
92
        // if we have hit our maxRows parsing limit then skip parsing
93
        if (this.hasHitRowLimit) {
390✔
94
            return wrappedCallback();
6✔
95
        }
96
        try {
384✔
97
            const newLine = this.lines + this.decoder.end();
384✔
98
            const rows = this.parse(newLine, false);
384✔
99
            return this.processRows(rows, wrappedCallback);
384✔
100
        } catch (e) {
101
            return wrappedCallback(e);
×
102
        }
103
    }
104

105
    private parse(data: string, hasMoreData: boolean): string[][] {
106
        if (!data) {
2,166✔
107
            return [];
6✔
108
        }
109
        const { line, rows } = this.parser.parse(data, hasMoreData);
2,160✔
110
        this.lines = line;
2,148✔
111
        return rows;
2,148✔
112
    }
113

114
    private processRows(rows: string[][], cb: TransformCallback): void {
115
        const rowsLength = rows.length;
2,154✔
116
        const iterate = (i: number): void => {
2,154✔
117
            const callNext = (err?: Error): void => {
730,884✔
118
                if (err) {
728,808✔
119
                    return cb(err);
78✔
120
                }
121
                if (i % 100 === 0) {
728,730✔
122
                    // incase the transform are sync insert a next tick to prevent stack overflow
123
                    setImmediate((): void => {
8,304✔
124
                        return iterate(i + 1);
8,304✔
125
                    });
126
                    return undefined;
8,304✔
127
                }
128
                return iterate(i + 1);
720,426✔
129
            };
130
            this.checkAndEmitHeaders();
730,884✔
131
            // if we have emitted all rows or we have hit the maxRows limit option
132
            // then end
133
            if (i >= rowsLength || this.hasHitRowLimit) {
730,884✔
134
                return cb();
2,076✔
135
            }
136
            this.parsedLineCount += 1;
728,808✔
137
            if (this.shouldSkipLine) {
728,808✔
138
                return callNext();
48✔
139
            }
140
            const row = rows[i];
728,760✔
141
            this.rowCount += 1;
728,760✔
142
            this.parsedRowCount += 1;
728,760✔
143
            const nextRowCount = this.rowCount;
728,760✔
144
            return this.transformRow(row, (err, transformResult): void => {
728,760✔
145
                if (err) {
728,760✔
146
                    this.rowCount -= 1;
72✔
147
                    return callNext(err);
72✔
148
                }
149
                if (!transformResult) {
728,688!
150
                    return callNext(new Error('expected transform result'));
×
151
                }
152
                if (!transformResult.isValid) {
728,688✔
153
                    this.emit('data-invalid', transformResult.row, nextRowCount, transformResult.reason);
102✔
154
                } else if (transformResult.row) {
728,586✔
155
                    return this.pushRow(transformResult.row, callNext);
728,094✔
156
                }
157
                return callNext();
594✔
158
            });
159
        };
160
        iterate(0);
2,154✔
161
    }
162

163
    private transformRow(parsedRow: RowArray, cb: RowValidatorCallback<O>): void {
164
        try {
728,760✔
165
            this.headerTransformer.transform(parsedRow, (err, withHeaders): void => {
728,760✔
166
                if (err) {
728,736!
167
                    return cb(err);
×
168
                }
169
                if (!withHeaders) {
728,736!
170
                    return cb(new Error('Expected result from header transform'));
×
171
                }
172
                if (!withHeaders.isValid) {
728,736✔
173
                    if (this.shouldEmitRows) {
24✔
174
                        return cb(null, { isValid: false, row: parsedRow as never as O });
12✔
175
                    }
176
                    // skipped because of skipRows option remove from total row count
177
                    return this.skipRow(cb);
12✔
178
                }
179
                if (withHeaders.row) {
728,712✔
180
                    if (this.shouldEmitRows) {
728,352✔
181
                        return this.rowTransformerValidator.transformAndValidate(withHeaders.row, cb);
728,232✔
182
                    }
183
                    // skipped because of skipRows option remove from total row count
184
                    return this.skipRow(cb);
120✔
185
                }
186
                // this is a header row dont include in the rowCount or parsedRowCount
187
                this.rowCount -= 1;
360✔
188
                this.parsedRowCount -= 1;
360✔
189
                return cb(null, { row: null, isValid: true });
360✔
190
            });
191
        } catch (e) {
192
            cb(e);
48✔
193
        }
194
    }
195

196
    private checkAndEmitHeaders(): void {
197
        if (!this.headersEmitted && this.headerTransformer.headers) {
730,884✔
198
            this.headersEmitted = true;
384✔
199
            this.emit('headers', this.headerTransformer.headers);
384✔
200
        }
201
    }
202

203
    private skipRow(cb: RowValidatorCallback<O>): void {
204
        // skipped because of skipRows option remove from total row count
205
        this.rowCount -= 1;
132✔
206
        return cb(null, { row: null, isValid: true });
132✔
207
    }
208

209
    private pushRow(row: Row, cb: (err?: Error) => void): void {
210
        try {
728,094✔
211
            if (!this.parserOptions.objectMode) {
728,094✔
212
                this.push(JSON.stringify(row));
54✔
213
            } else {
214
                this.push(row);
728,040✔
215
            }
216
            cb();
728,088✔
217
        } catch (e) {
218
            cb(e);
6✔
219
        }
220
    }
221

222
    private static wrapDoneCallback(done: TransformCallback): TransformCallback {
223
        let errorCalled = false;
2,172✔
224
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
225
        return (err: Error | null | undefined, ...args: any[]): void => {
2,172✔
226
            if (err) {
2,172✔
227
                if (errorCalled) {
90!
228
                    throw err;
×
229
                }
230
                errorCalled = true;
90✔
231
                done(err);
90✔
232
                return;
90✔
233
            }
234
            done(...args);
2,082✔
235
        };
236
    }
237
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc