• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

palcarazm / batchjs / 17580371047

09 Sep 2025 10:57AM UTC coverage: 98.387% (-1.4%) from 99.825%
17580371047

push

github

web-flow
Merge pull request #58 from palcarazm/fix/1.2.2

fix: prevent write callback call before flushing

89 of 90 branches covered (98.89%)

Branch coverage included in aggregate %.

31 of 39 new or added lines in 8 files covered. (79.49%)

460 of 468 relevant lines covered (98.29%)

75.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.44
/src/streams/classes/ReplayStream.ts
1
import { TransformCallback, Readable } from "stream";
120✔
2
import { NotClosedError } from "../errors/NotClosedError";
120✔
3
import { InternalBufferDuplex, ObjectDuplexOptions } from "../interfaces/_index";
120✔
4

5
/**
6
 * @class
7
 * Class that allows you to remit chunks from a stream when the source is finished.
8
 * @extends ObjectDuplex
9
 * @template T
10
 * @example
11
 * ```typescript
12
 * const stream:ReplayStream<string> = new ReplayStream({
13
 *     objectMode: true,
14
 * });
15
 * 
16
 * stream.write("data1");
17
 * stream.write("data2");
18
 * stream.write("data3");
19
 * stream.end();
20
 * 
21
 * stream.on("data", (chunk: string) => {
22
 *     console.log(``Pushed chunk: ${chunk}```);
23
 * }).on("close", () => {
24
 *     stream.replay().on("data", (chunk: string) => {
25
 *         console.log(`Replayed chunk: ${chunk}`);
26
 *     });
27
 * });
28
 * ```
29
 * ```shell
30
 * >> Pushed chunk: data1
31
 * >> Pushed chunk: data2
32
 * >> Pushed chunk: data3
33
 * >> Replayed chunk: data1
34
 * >> Replayed chunk: data2
35
 * >> Replayed chunk: data3
36
 * ```
37
 */
38
export class ReplayStream<T> extends InternalBufferDuplex<T,T> {
120✔
39
    private memory: T[] = [];
18✔
40
    private index:number = 0;
18✔
41

42
    /**
43
     * @constructor
44
     * @param {ObjectDuplexOptions} options - The options for the ReplayStream.
45
     * @param [options.objectMode=true] {true} - Whether the stream should operate in object mode.
46
     */
47
    constructor(options: ObjectDuplexOptions) {
48
        super(options);
18✔
49
    }
50

51
    /**
52
     * A method to write data to the stream, push the chunk to the buffer, and execute the callback.
53
     *
54
     * @param {T} chunk - The data chunk to write to the stream.
55
     * @param {BufferEncoding} encoding - The encoding of the data.
56
     * @param {TransformCallback} callback - The callback function to be executed after writing the data.
57
     * @return {void} This function does not return anything.
58
     */
59
    _write(chunk: T, encoding: BufferEncoding, callback: TransformCallback): void {
60
        this.memory.push(chunk);
54✔
61
        this.buffer.push(chunk);
54✔
62
        this._flush()
54✔
63
            .then(()=>callback())
54✔
NEW
64
            .catch((e)=>callback(e));
×
65
    }
66

67
    /**
68
     * Creates a readable stream from the buffer to replay the data that have been pushed.
69
     * @returns {Readable} The replay stream.
70
     * @throws {NotClosedError} If the stream is not closed, so the buffer is not already completed to be replayed.
71
     */
72
    replay(): Readable {
73
        if (this.closed) {
12✔
74
            return Readable.from(this.memory,{objectMode: true});
6✔
75
        }else{
76
            throw new NotClosedError();
6✔
77
        }
78
    }
79
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc