• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 26133545023

20 May 2026 12:24AM UTC coverage: 97.956% (-0.2%) from 98.106%
26133545023

Pull #316

github

web-flow
Merge 515a2b27f into 03ae7bb7f
Pull Request #316: Add `event-storage-http` REST API layer with NDJSON reads, consumer endpoints, and raw buffer streaming

1078 of 1131 branches covered (95.31%)

Branch coverage included in aggregate %.

258 of 270 new or added lines in 10 files covered. (95.56%)

22 existing lines in 5 files now uncovered.

5536 of 5621 relevant lines covered (98.49%)

823.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.19
/src/JoinEventStream.js
1
import EventStream from './EventStream.js';
4✔
2

4✔
3
/** Reusable sentinel used for missing or empty per-stream iterators. */
4✔
4
// Frozen because this one instance is shared by every stream slot that has nothing to read.
const emptyIterator = Object.freeze({
    next: () => ({ done: true }),
});
4✔
5

4✔
6
/**
 * Calculate the actual version number from a possibly relative (negative) version number.
 *
 * Negative versions count back from the end: -1 resolves to `length`, -2 to `length - 1`,
 * and so on. Zero and positive versions are returned unchanged.
 *
 * @param {number} version The version to normalize.
 * @param {number} length The maximum version number, i.e. the value that -1 resolves to.
 * @returns {number} The absolute version number.
 */
function normalizeVersion(version, length) {
    if (version < 0) {
        // +1 because versions are 1-based: -1 must land on `length`, not `length - 1`.
        return version + length + 1;
    }
    return version;
}
168✔
16

4✔
17
/**
 * An event stream that joins several named streams of an event store into one stream and
 * yields their events in a single merged order (see {@link JoinEventStream#follows}).
 *
 * Extends {@link EventStream}; only the per-stream fetching and the merge logic are defined here.
 */
class JoinEventStream extends EventStream {

    /**
     * @param {string} name The name of the stream.
     * @param {Array<string>} streams The name of the streams to join together.
     * @param {EventStore} eventStore The event store to get the stream from.
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
     * @param {function(object, object): boolean|true|null} [predicate] An optional filter function, or
     *   `true` to activate raw-buffer mode (see {@link EventStream}).
     * @throws {Error} If `streams` is not an array or is an empty array.
     */
    constructor(name, streams, eventStore, minRevision = 1, maxRevision = -1, predicate = null) {
        super(name, eventStore, minRevision, maxRevision, predicate);
        // Array.isArray also accepts genuine arrays created in another realm (e.g. a vm context),
        // which an `instanceof Array` check would reject.
        if (!Array.isArray(streams) || streams.length === 0) {
            throw new Error(`Invalid list of streams supplied to JoinStream ${name}.`);
        }

        this.streamIndex = eventStore.storage.index;
        // Translate revisions to index numbers (1-based) and wrap around negatives
        this.minRevision = normalizeVersion(minRevision, eventStore.length);
        this.maxRevision = normalizeVersion(maxRevision, eventStore.length);

        /**
         * Creates one iterator per joined stream and resets the per-stream look-ahead buffer.
         * Must be invoked as a method (`this.fetch()`), since it reads the revisions from `this`.
         * @returns {Array<object>} One iterator per entry of `streams`, in the same order.
         */
        this.fetch = function() {
            // One look-ahead slot per stream; `undefined` means "nothing buffered yet".
            this._next = new Array(streams.length).fill(undefined);
            return streams.map(streamName => {
                const streamIndex = eventStore.streams[streamName]?.index;
                if (!streamIndex || streamIndex.length === 0) {
                    return emptyIterator;
                }
                const from = streamIndex.find(this.minRevision, this.minRevision <= this.maxRevision);
                const until = streamIndex.find(this.maxRevision, this.minRevision > this.maxRevision);
                if (from === 0 || until === 0) {
                    // find() returns 0 when the requested revision is outside the stream's range
                    // (e.g. minRevision > all entries, or maxRevision < all entries).
                    return emptyIterator;
                }
                // Raw mode: get { buffer, time64, sequenceNumber } for binary-header ordering.
                // Object mode: storage deserializes for us and we order by metadata.commitId.
                return eventStore.storage.readRange(from, until, streamIndex, this.raw);
            });
        };
        // Created lazily by the first call to next().
        this._iterator = null;
    }

    /**
     * Returns the value of the iterator at position `index`
     * @param {number} index The iterator position for which to return the next value
     * @returns {*} The next value of that iterator, or `false` once the iterator is done.
     */
    getValue(index) {
        const next = this._iterator[index].next();
        return next.done ? false : next.value;
    }

    /**
     * Returns true if `first` follows `second` in the read direction, meaning `second` should be yielded first.
     * The read direction is descending when `minRevision` is greater than `maxRevision`.
     *
     * In raw mode: compares by `time64` (epoch-denormalized, from binary header) with `sequenceNumber`
     * as a globally-unique tiebreaker.
     * In object mode: compares by `metadata.commitId` (the global sequence number from the JSON body).
     * @private
     * @param {object} first
     * @param {object} second
     * @returns {boolean}
     */
    follows(first, second) {
        const descending = this.minRevision > this.maxRevision;
        const isAfter = (a, b) => descending ? a < b : a > b;
        // NOTE(review): `this.raw` is not assigned in this class; presumably set by EventStream - confirm.
        if (this.raw) {
            if (first.time64 !== second.time64) {
                return isAfter(first.time64, second.time64);
            }
            return isAfter(first.sequenceNumber, second.sequenceNumber);
        }
        return isAfter(first.metadata.commitId, second.metadata.commitId);
    }

    /**
     * Returns the next event in merge order.
     *
     * In raw mode: returns `{ buffer, time64, sequenceNumber }` from the binary header — no JSON
     * deserialization. In object mode: returns a deserialized `{ stream, payload, metadata }` document
     * produced by the storage layer. Outside raw mode, events rejected by the predicate are skipped.
     * @returns {object|false} The next event, or `false` once every joined stream is exhausted.
     */
    next() {
        if (!this._iterator) {
            this._iterator = this.fetch();
        }
        while (true) {
            // Index of the stream whose buffered event comes first in the read direction.
            let nextIndex = -1;
            for (let index = 0; index < this._next.length; index++) {
                let value = this._next[index];
                if (typeof value === 'undefined') {
                    // Refill the look-ahead slot of a stream whose event was consumed.
                    value = this._next[index] = this.getValue(index);
                }
                if (value === false) {
                    // This stream is exhausted.
                    continue;
                }
                if (nextIndex === -1 || this.follows(this._next[nextIndex], value)) {
                    nextIndex = index;
                }
            }

            if (nextIndex === -1) {
                return false;
            }
            const next = this._next[nextIndex];
            // Mark the slot as consumed so it is refilled on the next pass.
            this._next[nextIndex] = undefined;

            if (this.raw || !this.predicate || this.predicate(next.payload, next.metadata)) {
                return next;
            }
        }
    }

}
4✔
134

4✔
135
export default JoinEventStream;
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc