• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

palcarazm / batchjs / 17580371047

09 Sep 2025 10:57AM UTC coverage: 98.387% (-1.4%) from 99.825%
17580371047

push

github

web-flow
Merge pull request #58 from palcarazm/fix/1.2.2

fix: prevent write callback call before flushing

89 of 90 branches covered (98.89%)

Branch coverage included in aggregate %.

31 of 39 new or added lines in 8 files covered. (79.49%)

460 of 468 relevant lines covered (98.29%)

75.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.12
/src/streams/classes/DistinctStream.ts
1
import { TransformCallback } from "stream";
2
import { DiscardingInternalBufferDuplex, ObjectDuplexOptions } from "../interfaces/_index";
120✔
3

4
/**
5
 * @interface
6
 * Options for the FilterStream.
7
 * @extends DuplexOptions
8
 * @template TInput
9
 * @template TKey
10
 */
11
export interface DistinctStreamOptions<TInput,TKey> extends ObjectDuplexOptions {
12
    keyExtractor: (chunk: TInput) => TKey;
13
}
14

15
/**
16
 * @class
17
 * Class that allows you to discard repeated data in a stream in base on a key.
18
 * Data with duplicated key will be emitted through the discard event.
19
 * @extends DiscardingInternalBufferDuplex
20
 * @template TInput
21
 * @template TKey
22
 * @example
23
 * ```typescript
24
 * const stream:DistinctStream<string,string> = new DistinctStream({
25
 *     objectMode: true,
26
 *     keyExtractor: (chunk: string) => chunk,
27
 * });
28
 * 
29
 * stream.write("data1");
30
 * stream.write("data2");
31
 * stream.write("data1"); //Duplicated
32
 * stream.end();
33
 * 
34
 * stream.on("data", (chunk: string) => {
35
 *     console.log(``Pushed chunk: ${chunk}```);
36
 * });
37
 * stream.on("discard", (chunk: string) => {
38
 *     console.log(``Duplicated chunk: ${chunk}```);
39
 * });
40
 * ```
41
 * ```shell
42
 * >> Pushed chunk: data1
43
 * >> Pushed chunk: data2
44
 * >> Duplicated chunk: data1
45
 * ```
46
 */
47
export class DistinctStream<TInput,TKey> extends DiscardingInternalBufferDuplex<TInput,TInput> {
120✔
48
    private readonly keySet: Set<TKey> = new Set();
12✔
49
    private readonly _keyExtractor: (chunk: TInput) => TKey;
50

51
    /**
52
     * @constructor
53
     * @param {DistinctStreamOptions} options - The options for the FilterStream.
54
     * @param [options.keyExtractor] {Function} - The key extractor function for determining the key of the data to be filtered.
55
     */
56
    constructor(options: DistinctStreamOptions<TInput,TKey>) {
57
        super(options);
12✔
58
        this._keyExtractor = options.keyExtractor;
12✔
59
    }
60

61
    /**
62
     * A method to write data to the stream, get the key of the data, and if the key is not in the set, push the data to the buffer, otherwise discard it.
63
     *
64
     * @param {TInput} chunk - The data chunk to write to the stream.
65
     * @param {BufferEncoding} encoding - The encoding of the data.
66
     * @param {TransformCallback} callback - The callback function to be executed after writing the data.
67
     * @return {void} This function does not return anything.
68
     */
69
    _write(chunk: TInput, encoding: BufferEncoding, callback: TransformCallback): void {
70
        const key = this._keyExtractor(chunk);
42✔
71
        if(!this.keySet.has(key)){
42✔
72
            this.push(chunk);
24✔
73
            this._flush()
24✔
74
                .then(()=>callback())
24✔
NEW
75
                .catch((e)=>callback(e));
×
76
            this.keySet.add(key);
24✔
77
        }else{
78
            this.emit("discard", chunk);
18✔
79
            callback();
18✔
80
        }
81
    }
82
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc