• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

palcarazm / batchjs / 17580344153

09 Sep 2025 10:55AM UTC coverage: 98.387% (-1.4%) from 99.825%
17580344153

Pull #58

github

web-flow
Merge 5a02e8636 into a2981500d
Pull Request #58: fix: prevent write callback call before flushing

89 of 90 branches covered (98.89%)

Branch coverage included in aggregate %.

31 of 39 new or added lines in 8 files covered. (79.49%)

460 of 468 relevant lines covered (98.29%)

75.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.86
/src/streams/classes/FilterStream.ts
1
import { TransformCallback } from "stream";
2
import { DiscardingInternalBufferDuplex, ObjectDuplexOptions } from "../interfaces/_index";
120✔
3

4
/**
5
 * @interface
6
 * Options for the FilterStream.
7
 * @extends ObjectDuplexOptions
8
 * @template T
9
 */
10
export interface FilterStreamOptions<T> extends ObjectDuplexOptions {
11
    filter: (chunk: T) => boolean;
12
}
13

14
/**
15
 * @class
16
 * Class that allows you to filter data in a stream.
17
 * @extends DiscardingInternalBufferDuplex
18
 * @template T
19
 * @example
20
 * ```typescript
21
 * const stream:FilterStream<string> = new FilterStream({
22
 *     objectMode: true,
23
 *     filter: (chunk: string) => chunk === "data1" || chunk === "data2",
24
 * });
25
 * 
26
 * stream.write("data1");
27
 * stream.write("data2");
28
 * stream.write("data3");// Discarded
29
 * stream.end();
30
 * 
31
 * stream.on("data", (chunk: string) => {
32
 *     console.log(``Pushed chunk: ${chunk}```);
33
 * });
34
 * stream.on("discard", (chunk: string) => {
35
 *     console.log(``Discarded chunk: ${chunk}```);
36
 * });
37
 * ```
38
 * ```shell
39
 * >> Pushed chunk: data1
40
 * >> Pushed chunk: data2
41
 * >> Discarded chunk: data3
42
 * ```
43
 */
44
export class FilterStream<T> extends DiscardingInternalBufferDuplex<T,T> {
120✔
45
    private readonly _filter: (chunk: T) => boolean;
46

47
    /**
48
     * @constructor
49
     * @param {FilterStreamOptions} options - The options for the FilterStream.
50
     * @param [options.filter] {Function} - The filter function for pushing data to the stream or discarding it.
51
     */
52
    constructor(options: FilterStreamOptions<T>) {
53
        super(options);
12✔
54
        this._filter = options.filter;
12✔
55
    }
56

57
    /**
58
     * A method to write data to the stream, filter the chunk and push it to the buffer or discard it, and execute the callback.
59
     *
60
     * @param {T} chunk - The data chunk to write to the stream.
61
     * @param {BufferEncoding} encoding - The encoding of the data.
62
     * @param {TransformCallback} callback - The callback function to be executed after writing the data.
63
     * @return {void} This function does not return anything.
64
     */
65
    _write(chunk: T, encoding: BufferEncoding, callback: TransformCallback): void {
66
        if(this._filter(chunk)){
36✔
67
            this.buffer.push(chunk);
12✔
68
            this._flush()
12✔
69
                .then(()=>callback())
12✔
NEW
70
                .catch((e)=>callback(e));
×
71
        }else{
72
            this.emit("discard", chunk);
24✔
73
            callback();
24✔
74
        }
75
    }
76
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc