• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

palcarazm / batchjs / 14983613232

12 May 2025 10:09PM UTC coverage: 90.019% (+2.2%) from 87.809%
14983613232

push

github

palcarazm
1.1.3

87 of 111 branches covered (78.38%)

Branch coverage included in aggregate %.

391 of 420 relevant lines covered (93.1%)

43.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.67
/src/streams/classes/DistinctStream.ts
1
import { TransformCallback } from "stream";
2
import { DiscardingStream, ObjectDuplexOptions } from "../interfaces/_index";
108✔
3

4
/**
5
 * @interface
6
 * Options for the DistinctStream.
7
 * @extends ObjectDuplexOptions
8
 * @template TInput
9
 * @template TKey
10
 */
11
export interface DistinctStreamOptions<TInput, TKey> extends ObjectDuplexOptions {
    /**
     * Derives the key of a chunk; DistinctStream uses it to decide
     * whether an incoming chunk repeats one already seen.
     */
    keyExtractor: (chunk: TInput) => TKey;
}
14

15
/**
16
 * @class
17
 * Class that discards repeated data in a stream based on a key extracted from each chunk.
18
 * Data with duplicated key will be emitted through the discard event.
19
 * @extends DiscardingStream
20
 * @template TInput
21
 * @template TKey
22
 * @example
23
 * ```typescript
24
 * const stream:DistinctStream<string,string> = new DistinctStream({
25
 *     objectMode: true,
26
 *     keyExtractor: (chunk: string) => chunk,
27
 * });
28
 * 
29
 * stream.write("data1");
30
 * stream.write("data2");
31
 * stream.write("data1"); //Duplicated
32
 * stream.end();
33
 * 
34
 * stream.on("data", (chunk: string) => {
35
 *     console.log(`Pushed chunk: ${chunk}`);
36
 * });
37
 * stream.on("discard", (chunk: string) => {
38
 *     console.log(`Duplicated chunk: ${chunk}`);
39
 * });
40
 * ```
41
 * ```shell
42
 * >> Pushed chunk: data1
43
 * >> Pushed chunk: data2
44
 * >> Duplicated chunk: data1
45
 * ```
46
 */
47
export class DistinctStream<TInput,TKey> extends DiscardingStream<TInput> {
    /** Chunks with a not-yet-seen key that are waiting to be pushed to the readable side. */
    protected buffer: Array<TInput> = [];
    /** Keys of every chunk accepted so far; a chunk whose key is already here is discarded. */
    private readonly keySet: Set<TKey> = new Set();
    private readonly _keyExtractor: (chunk: TInput) => TKey;
    /**
     * True once `_read` has been called and no later `push` has reported backpressure.
     * Node does not call `_read` again until something is pushed, so `_write` uses this
     * flag to deliver new chunks to a consumer that is already waiting for them.
     */
    private _readRequested = false;

    /**
     * @constructor
     * @param {DistinctStreamOptions} options - The options for the DistinctStream.
     * @param {Function} options.keyExtractor - The key extractor function for determining the key used to detect duplicated data.
     */
    constructor(options: DistinctStreamOptions<TInput,TKey>) {
        super(options);
        this._keyExtractor = options.keyExtractor;
    }

    /**
     * Extracts the key of the chunk. If the key has not been seen, the chunk is buffered
     * (and pushed right away when the readable side is waiting for data); otherwise the
     * chunk is emitted through the "discard" event.
     * An exception thrown by the key extractor is reported through the callback.
     *
     * @param {TInput} chunk - The data chunk to write to the stream.
     * @param {BufferEncoding} encoding - The encoding of the data (unused).
     * @param {TransformCallback} callback - The callback function to be executed after writing the data.
     * @return {void} This function does not return anything.
     */
    _write(chunk: TInput, encoding: BufferEncoding, callback: TransformCallback): void {
        let key: TKey;
        try {
            key = this._keyExtractor(chunk);
        } catch (error) {
            // Report the failure as a stream error instead of throwing out of write().
            callback(error instanceof Error ? error : new Error(String(error)));
            return;
        }

        if (this.keySet.has(key)) {
            this.emit("discard", chunk);
        } else {
            this.keySet.add(key);
            this.buffer.push(chunk);
            // A pending _read() is not repeated by Node until data is pushed,
            // so serve it now; this is a no-op when nobody is waiting.
            this._pushBuffered();
        }
        callback();
    }

    /**
     * Finalizes the stream by pushing every chunk still in the buffer,
     * signalling the end of the readable side and executing the final callback.
     *
     * @param {TransformCallback} callback - The callback function to be executed after finalizing the stream.
     * @return {void} This function does not return anything.
     */
    _final(callback: TransformCallback): void {
        while (this.buffer.length > 0) {
            this.push(this.buffer.shift() as TInput);
        }
        this.push(null);
        callback();
    }

    /**
     * Records that the consumer wants data and pushes buffered chunks until the buffer
     * is empty, the size limit is reached or the readable side reports backpressure.
     *
     * @param {number} size - The maximum number of chunks to push in this call.
     * @return {void} This function does not return anything.
     */
    _read(size: number): void {
        this._readRequested = true;
        this._pushBuffered(size);
    }

    /**
     * Pushes buffered chunks while a read is outstanding. Stops, and clears the
     * outstanding-read flag, as soon as `push` returns false.
     *
     * @param {number} limit - The maximum number of chunks to push (unbounded by default).
     * @return {void} This function does not return anything.
     */
    private _pushBuffered(limit: number = Infinity): void {
        while (this._readRequested && this.buffer.length > 0 && limit > 0) {
            const chunk = this.buffer.shift() as TInput;
            this._readRequested = this.push(chunk);
            limit--;
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc