• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

palcarazm / batchjs / 14982653942

12 May 2025 09:07PM UTC coverage: 87.809% (-11.6%) from 99.401%
14982653942

push

github

web-flow
fix: remove duplicates push on back-pressure (#38)

* fix: remove duplicates push on back-pressure

* test: remove drain event test

100 of 130 branches covered (76.92%)

Branch coverage included in aggregate %.

50 of 51 new or added lines in 11 files covered. (98.04%)

39 existing lines in 8 files now uncovered.

433 of 477 relevant lines covered (90.78%)

40.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.36
/src/streams/classes/DistinctStream.ts
1
import { TransformCallback } from "stream";
2
import { DiscardingStream, ObjectDuplexOptions } from "../interfaces/_index";
108✔
3

4
/**
5
 * @interface
6
 * Options for the FilterStream.
7
 * @extends DuplexOptions
8
 * @template TInput
9
 * @template TKey
10
 */
11
export interface DistinctStreamOptions<TInput,TKey> extends ObjectDuplexOptions {
12
    keyExtractor: (chunk: TInput) => TKey;
13
}
14

15
/**
16
 * @class
17
 * Class that allows you to discard repeated data in a stream in base on a key.
18
 * Data with duplicated key will be emitted through the discard event.
19
 * @extends DiscardingStream
20
 * @template TInput
21
 * @template TKey
22
 * @example
23
 * ```typescript
24
 * const stream:DistinctStream<string,string> = new DistinctStream({
25
 *     objectMode: true,
26
 *     keyExtractor: (chunk: string) => chunk,
27
 * });
28
 * 
29
 * stream.write("data1");
30
 * stream.write("data2");
31
 * stream.write("data1"); //Duplicated
32
 * stream.end();
33
 * 
34
 * stream.on("data", (chunk: string) => {
35
 *     console.log(``Pushed chunk: ${chunk}```);
36
 * });
37
 * stream.on("discard", (chunk: string) => {
38
 *     console.log(``Duplicated chunk: ${chunk}```);
39
 * });
40
 * ```
41
 * ```shell
42
 * >> Pushed chunk: data1
43
 * >> Pushed chunk: data2
44
 * >> Duplicated chunk: data1
45
 * ```
46
 */
47
export class DistinctStream<TInput,TKey> extends DiscardingStream<TInput> {
108✔
48
    protected buffer: Array<TInput> = [];
18✔
49
    private readonly keySet: Set<TKey> = new Set();
18✔
50
    private readonly _keyExtractor: (chunk: TInput) => TKey;
51

52
    /**
53
     * @constructor
54
     * @param {DistinctStreamOptions} options - The options for the FilterStream.
55
     * @param [options.keyExtractor] {Function} - The key extractor function for determining the key of the data to be filtered.
56
     */
57
    constructor(options: DistinctStreamOptions<TInput,TKey>) {
58
        super(options);
18✔
59
        this._keyExtractor = options.keyExtractor;
18✔
60
    }
61

62
    /**
63
     * A method to write data to the stream, get the key of the data, and if the key is not in the set, push the data to the buffer, otherwise discard it.
64
     *
65
     * @param {TInput} chunk - The data chunk to write to the stream.
66
     * @param {BufferEncoding} encoding - The encoding of the data.
67
     * @param {TransformCallback} callback - The callback function to be executed after writing the data.
68
     * @return {void} This function does not return anything.
69
     */
70
    _write(chunk: TInput, encoding: BufferEncoding, callback: TransformCallback): void {
71
        const key = this._keyExtractor(chunk);
54✔
72
        if(!this.keySet.has(key)){
54✔
73
            this.buffer.push(chunk);
36✔
74
            this.keySet.add(key);
36✔
75
            this._read(1);
36✔
76
        }else{
77
            this.emit("discard", chunk);
18✔
78
        }
79
        callback();
54✔
80
    }
81

82

83
    /**
84
     * Finalizes the stream by pushing remaining data, handling errors,
85
     * and executing the final callback.
86
     *
87
     * @param {TransformCallback} callback - The callback function to be executed after finalizing the stream.
88
     * @return {void} This function does not return anything.
89
     */
90
    _final(callback: TransformCallback): void {
91
        const pushData = ()=>{
18✔
92
            while (this.buffer.length > 0) {
18✔
UNCOV
93
                const chunk = this.buffer.shift() as TInput;
×
UNCOV
94
                if (!this.push(chunk)) {
×
UNCOV
95
                    this.once("drain", pushData);
×
UNCOV
96
                    return;
×
97
                }
98
            }
99
            this.push(null);
18✔
100
            callback();
18✔
101
        };
102

103
        pushData();
18✔
104
    }
105

106
    /**
107
     * Pushes the ready chunks to the consumer stream since the buffer is empty or the size limit is reached.
108
     *
109
     * @param {number} size - The size parameter for controlling the read operation.
110
     * @return {void} This function does not return anything.
111
     */
112
    _read(size: number): void {
113
        const handleDrain = () => this._read(size);
36✔
114

115
        while (this.buffer.length > 0 && size > 0) {
36✔
116
            const chunk = this.buffer.shift() as TInput;
36✔
117
            if (!this.push(chunk)) {
36!
UNCOV
118
                this.once("drain", handleDrain);
×
UNCOV
119
                return;
×
120
            }
121
            size--;
36✔
122
        }
123
    }
124
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc