• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

palcarazm / batchjs / 14983613232

12 May 2025 10:09PM UTC coverage: 90.019% (+2.2%) from 87.809%
14983613232

push

github

palcarazm
1.1.3

87 of 111 branches covered (78.38%)

Branch coverage included in aggregate %.

391 of 420 relevant lines covered (93.1%)

43.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.1
/src/streams/classes/ParallelStream.ts
1
import { TransformCallback  } from "stream";
2
import { ObjectDuplex, ObjectDuplexOptions } from "../interfaces/_index";
108✔
3

4
/**
 * @interface
 * Configuration accepted by the ParallelStream constructor.
 * @extends ObjectDuplexOptions
 * @template TInput Type of the chunks written to the stream.
 * @template TOutput Type of the chunks produced by the stream.
 */
export interface ParallelStreamOptions<TInput, TOutput>
    extends ObjectDuplexOptions {
    /** Maximum number of transform promises allowed to be pending at the same time. */
    maxConcurrent: number;

    /** Asynchronous mapping applied to each written chunk; resolves with the value to emit. */
    transform: (chunk: TInput) => Promise<TOutput>;
}
15

16
/**
 * @class
 * Duplex stream that applies an asynchronous transform to every written chunk while keeping
 * at most `maxConcurrent` transforms pending at the same time.
 *
 * Results become readable in the order in which their promises settle, which is not
 * necessarily the order in which the chunks were written. A failing transform is reported
 * through an "error" event; this class keeps scheduling the remaining chunks afterwards.
 *
 * A `maxConcurrent` below 1 (or `NaN`) never starts a transform, so queued chunks are never
 * processed and the stream never finishes.
 *
 * NOTE(review): results are handed to `push()` unchanged, so a transform resolving to `null`
 * would presumably be interpreted as end-of-stream — confirm against ObjectDuplex.
 * @extends ObjectDuplex
 * @template TInput The type of the input data.
 * @template TOutput The type of the output data.
 * @example
 * ```typescript
 * const stream:ParallelStream<string,string> = new ParallelStream({
 *     objectMode: true,
 *     maxConcurrent: 2,
 *     transform(chunk: string) {
 *         return Promise.resolve(chunk.toUpperCase());
 *     },
 * });
 *
 * stream.write("data1");
 * stream.write("data2");
 * stream.write("data3");
 * stream.end();
 *
 * stream.on("data", (chunk: string) => {
 *     console.log(`Pushed chunk: ${chunk}`);
 * });
 * ```
 * ```shell
 * >> Pushed chunk: DATA1
 * >> Pushed chunk: DATA2
 * >> Pushed chunk: DATA3
 * ```
 */
export class ParallelStream<TInput, TOutput> extends ObjectDuplex {
    /** Written chunks that have not been handed to `transform` yet. */
    private queue: Array<TInput> = [];
    /** Transform results waiting to be pushed to the readable side. */
    private buffer: Array<TOutput> = [];
    /** Bookkeeping promises of the transforms that are currently pending. */
    private pool: Set<Promise<void>> = new Set();
    private readonly maxConcurrent: number;
    private readonly transform: (chunk: TInput) => Promise<TOutput>;
    /** True once `_read` was called and no later `push()` has reported backpressure. */
    private readRequested = false;
    /** Resolvers of `_final` calls waiting for the queue and the pool to become empty. */
    private idleListeners: Array<() => void> = [];

    /**
     * @constructor
     * @param {ParallelStreamOptions<TInput, TOutput>} options - The options for the ParallelStream.
     * @param options.maxConcurrent {number} - The maximum number of concurrent promises.
     * @param options.transform {Function} - The function to transform the data returning a promise.
     */
    constructor(options: ParallelStreamOptions<TInput, TOutput>) {
        super(options);
        this.maxConcurrent = options.maxConcurrent;
        this.transform = options.transform;
    }

    /**
     * Queues the chunk, starts as many transforms as the concurrency limit allows and
     * acknowledges the write immediately. The callback is invoked before the transform of
     * the chunk settles, so this method does not bound the length of the internal queue.
     *
     * @param {TInput} chunk - The data chunk to write to the stream.
     * @param {BufferEncoding} encoding - The encoding of the data (unused).
     * @param {TransformCallback} callback - The callback function to be executed after queuing the data.
     * @return {void} This function does not return anything.
     */
    _write(chunk: TInput, encoding: BufferEncoding, callback: TransformCallback): void {
        this.queue.push(chunk);
        this._transform();
        callback();
    }

    /**
     * Finalizes the stream: waits until every queued chunk has been transformed, pushes the
     * results that are still buffered (regardless of the return value of `push()`), signals
     * the end of the readable side with `push(null)` and then invokes the callback.
     *
     * @param {TransformCallback} callback - The callback to be called when the stream is finalized.
     * @return {Promise<void>} A promise that resolves when the stream is finalized.
     */
    async _final(callback: TransformCallback): Promise<void> {
        if (!this.isIdle()) {
            // Wait for the completion handler of the last pending transform instead of
            // polling the event loop while work is still in flight.
            await new Promise<void>((resolve) => {
                this.idleListeners.push(resolve);
            });
        }

        // Detach the pending results first so a re-entrant `_read` cannot push them twice.
        for (const chunk of this.buffer.splice(0)) {
            this.push(chunk);
        }
        this.push(null);
        callback();
    }

    /**
     * Pushes buffered results to the consumer until the buffer is empty, `size` chunks have
     * been pushed or `push()` reports backpressure. The request is remembered so results
     * that settle later are pushed as soon as they arrive; otherwise they would stay in the
     * buffer until another `_read` call or `_final`.
     *
     * @param {number} size - The maximum number of chunks to push during this call.
     * @return {void} This function does not return anything.
     */
    _read(size: number): void {
        this.readRequested = true;
        this.flushBuffer(size);
    }

    /**
     * Starts transforms for queued chunks while the pool has free slots. Each transform
     * stores its result in the buffer, reports a failure through an "error" event and, once
     * settled, frees its slot and schedules the next queued chunk.
     */
    private _transform(): void {
        while (this.pool.size < this.maxConcurrent && this.queue.length > 0) {
            const chunk = this.queue.shift() as TInput; // Get the next chunk
            const task: Promise<void> = this.runTransform(chunk)
                .then((result: TOutput) => {
                    this.buffer.push(result);
                })
                .catch((err: unknown) => {
                    this.emit("error", err);
                })
                .finally(() => {
                    this.pool.delete(task); // Remove promise from pool
                    this._transform(); // Continue processing remaining chunks
                    this.notifyIfIdle(); // Wake up a pending _final once all work is done
                    if (this.readRequested) {
                        // The consumer is waiting for data: deliver what has settled so far.
                        this.flushBuffer(Number.POSITIVE_INFINITY);
                    }
                });

            this.pool.add(task); // Add promise to pool
        }
    }

    /**
     * Invokes the user transform and converts a synchronous throw (or a non-promise return
     * value) into a promise, so every failure goes through the same rejection handling.
     */
    private runTransform(chunk: TInput): Promise<TOutput> {
        try {
            return Promise.resolve(this.transform(chunk));
        } catch (err) {
            return Promise.reject(err);
        }
    }

    /**
     * Pushes up to `limit` buffered results while the consumer is still requesting data.
     * A `push()` returning false clears the request until `_read` is called again.
     */
    private flushBuffer(limit: number): void {
        let remaining = limit;
        while (this.readRequested && this.buffer.length > 0 && remaining > 0) {
            const chunk = this.buffer.shift() as TOutput;
            remaining--;
            if (!this.push(chunk)) {
                this.readRequested = false;
            }
        }
    }

    /** Tells whether there is neither a queued chunk nor a pending transform. */
    private isIdle(): boolean {
        return this.queue.length === 0 && this.pool.size === 0;
    }

    /** Resolves every pending `_final` wait once the stream has no work left. */
    private notifyIfIdle(): void {
        if (!this.isIdle()) {
            return;
        }
        for (const resolve of this.idleListeners.splice(0)) {
            resolve();
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc