• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

palcarazm / batchjs / 14982653942

12 May 2025 09:07PM UTC coverage: 87.809% (-11.6%) from 99.401%
14982653942

push

github

web-flow
fix: remove duplicates push on back-pressure (#38)

* fix: remove duplicates push on back-pressure

* test: remove drain event test

100 of 130 branches covered (76.92%)

Branch coverage included in aggregate %.

50 of 51 new or added lines in 11 files covered. (98.04%)

39 existing lines in 8 files now uncovered.

433 of 477 relevant lines covered (90.78%)

40.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.39
/src/streams/classes/ParallelStream.ts
1
import { TransformCallback  } from "stream";
2
import { ObjectDuplex, ObjectDuplexOptions } from "../interfaces/_index";
108✔
3

4
/**
5
 * @interface
6
 * Options for the ParallelStream.
7
 * @extends ObjectDuplexOptions
8
 * @template TInput The type of the input data.
9
 * @template TOutput The type of the output data.
10
 */
11
export interface ParallelStreamOptions<TInput, TOutput> extends ObjectDuplexOptions {
12
    maxConcurrent: number;
13
    transform: (chunk: TInput) => Promise<TOutput>;
14
}
15

16
/**
17
 * @class
18
 * Class that allows you to transform and stream data in parallel.
19
 * @extends ObjectDuplex
20
 * @template TInput The type of the input data.
21
 * @template TOutput The type of the output data.
22
 * @example
23
 * ```typescript
24
 * const stream:ParallelStream<string,string> = new ParallelStream({
25
 *     objectMode: true,
26
 *     maxConcurrent: 2,
27
 *     transform(chunk: string) {
28
 *         return Promise.resolve(chunk.toUpperCase());
29
 *     },
30
 * });
31
 * 
32
 * stream.write("data1");
33
 * stream.write("data2");
34
 * stream.write("data3");
35
 * stream.end();
36
 * 
37
 * stream.on("data", (chunk: string) => {
38
 *     console.log(``Pushed chunk: ${chunk}```);
39
 * });
40
 * ```
41
 * ```shell
42
 * >> Pushed chunk: DATA1
43
 * >> Pushed chunk: DATA2
44
 * >> Pushed chunk: DATA3
45
 * ```
46
 */
47
export class ParallelStream<TInput, TOutput> extends ObjectDuplex {
108✔
48
    private queue: Array<TInput> = [];
24✔
49
    private buffer: Array<TOutput> = [];
24✔
50
    private pool: Set<Promise<void>> = new Set();
24✔
51
    private readonly maxConcurrent: number;
52
    private readonly transform: (chunk: TInput) => Promise<TOutput>;
53

54
    /**
55
     * @constructor
56
     * @param {ParallelStreamOptions<TInput, TOutput>} options - The options for the ParallelStream.
57
     * @param [options.maxConcurrent] {number} - The maximum number of concurrent promises.
58
     * @param [options.transform] {Function} - The function to transform the data returning a promise.
59
     */
60
    constructor(options: ParallelStreamOptions<TInput, TOutput>) {
61
        super(options);
24✔
62
        this.maxConcurrent = options.maxConcurrent;
24✔
63
        this.transform = options.transform;
24✔
64
    }
65

66
    /**
67
     * A method to write data to the stream, push the chunk to the queue, transform it, and then execute the callback.
68
     *
69
     * @param {TInput} chunk - The data chunk to write to the stream.
70
     * @param {BufferEncoding} encoding - The encoding of the data.
71
     * @param {TransformCallback} callback - The callback function to be executed after writing the data.
72
     * @return {void} This function does not return anything.
73
     */
74
    _write(chunk: TInput, encoding: BufferEncoding, callback: TransformCallback): void {
75
        this.queue.push(chunk);
66✔
76
        this._transform();
66✔
77
        callback();
66✔
78
    }
79

80
    /**
81
     * Asynchronously finalizes the stream by draining the queue and buffer, pushing any remaining chunks to the stream,
82
     * and calling the provided callback when complete. If the stream is unable to push a chunk, the chunk is placed back
83
     * into the buffer and a PushError is passed to the callback.
84
     *
85
     * @param {TransformCallback} callback - The callback to be called when the stream is finalized.
86
     * @return {Promise<void>} A promise that resolves when the stream is finalized.
87
     */
88
    async _final(callback: TransformCallback): Promise<void> {
89
        const pushData = ()=>{
18✔
90
            while (this.buffer.length > 0) {
12✔
UNCOV
91
                const chunk = this.buffer.shift() as TOutput;
×
UNCOV
92
                if (!this.push(chunk)) {
×
UNCOV
93
                    this.once("drain", pushData);
×
UNCOV
94
                    return;
×
95
                }
96
            }
97
            this.push(null);
12✔
98
            callback();
12✔
99
        };
100

101
        while (this.queue.length > 0 || this.pool.size > 0) {
18✔
102
            await new Promise(resolve => setImmediate(resolve));
18✔
103
        }
104

105
        pushData();
12✔
106
    }
107

108
    /**
109
     * Pushes the ready chunks to the consumer stream since the buffer is empty or the size limit is reached.
110
     *
111
     * @param {number} size - The size parameter for controlling the read operation.
112
     * @return {void} This function does not return anything.
113
     */
114
    _read(size: number): void {
115
        const handleDrain = () => this._read(size);
66✔
116

117
        while (this.buffer.length > 0 && size > 0) {
66✔
118
            const chunk = this.buffer.shift() as TOutput;
54✔
119
            if (!this.push(chunk)) {
54!
UNCOV
120
                this.once("drain", handleDrain);
×
UNCOV
121
                return;
×
122
            }
123
            size--;
54✔
124
        }
125
    }
126

127
    /**
128
     * Loop through the pool and queue to process chunks, adding promises to the pool.
129
     */
130
    private _transform(): void {
131
        while (this.pool.size < this.maxConcurrent && this.queue.length > 0) {
132✔
132
            const chunk = this.queue.shift() as TInput; // Get the next chunk
66✔
133
            const promise = this.transform(chunk)
66✔
134
                .then((result: TOutput) => {
135
                    this.buffer.push(result);
54✔
136
                    this._read(1);
54✔
137
                })
138
                .catch((err: Error) => {
139
                    this.emit("error", err);
12✔
140
                })
141
                .finally(() => {
142
                    this.pool.delete(promise); // Remove promise from pool
66✔
143
                    this._transform(); // Continue processing remaining chunks
66✔
144
                });
145

146
            this.pool.add(promise); // Add promise to pool
66✔
147
        }
148
    }
149
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc