• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 25625230942

10 May 2026 09:29AM UTC coverage: 97.684% (-0.4%) from 98.106%
25625230942

Pull #303

github

web-flow
Merge 76b11b498 into b9161664a
Pull Request #303: feat(Storage): async partition + index scan in open() with 'opened' event and callback hook

1036 of 1086 branches covered (95.4%)

Branch coverage included in aggregate %.

100 of 102 new or added lines in 4 files covered. (98.04%)

39 existing lines in 5 files now uncovered.

5416 of 5519 relevant lines covered (98.13%)

808.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.08
/src/JoinEventStream.js
1
import EventStream from './EventStream.js';
4✔
2
import { wrapAndCheck } from './util.js';
4✔
3

4✔
4
/** Reusable sentinel used for missing or empty per-stream iterators. */
4✔
5
const emptyIterator = Object.freeze({ next() { return { done: true }; } });
4✔
6

4✔
7
/**
4✔
8
 * Calculate the actual version number from a possibly relative (negative) version number.
4✔
9
 *
4✔
10
 * @param {number} version The version to normalize.
4✔
11
 * @param {number} length The maximum version number
4✔
12
 * @returns {number} The absolute version number.
4✔
13
 */
4✔
14
function normalizeVersion(version, length) {
168✔
15
    return version < 0 ? version + length + 1 : version;
168✔
16
}
168✔
17

4✔
18
/**
4✔
19
 * An event stream is a simple wrapper around an iterator over storage documents.
4✔
20
 * It implements a node readable stream interface.
4✔
21
 */
4✔
22
class JoinEventStream extends EventStream {
4✔
23

4✔
24
    /**
4✔
25
     * @param {string} name The name of the stream.
4✔
26
     * @param {Array<string>} streams The name of the streams to join together.
4✔
27
     * @param {EventStore} eventStore The event store to get the stream from.
4✔
28
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
4✔
29
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
4✔
30
     * @param {function(object, object): boolean|null} [predicate] An optional filter function
4✔
31
     *   `(payload, metadata) => boolean`.  Only events for which this returns truthy are yielded.
4✔
32
     */
4✔
33
    constructor(name, streams, eventStore, minRevision = 1, maxRevision = -1, predicate = null) {
4✔
34
        super(name, eventStore, minRevision, maxRevision, predicate);
100✔
35
        if (!(streams instanceof Array) || streams.length === 0) {
100✔
36
            throw new Error(`Invalid list of streams supplied to JoinStream ${name}.`);
8✔
37
        }
8✔
38

84✔
39
        this.streamIndex = eventStore.storage.index;
84✔
40
        // Translate revisions to index numbers (1-based) and wrap around negatives
84✔
41
        this.minRevision = normalizeVersion(minRevision, eventStore.length);
84✔
42
        this.maxRevision = normalizeVersion(maxRevision, eventStore.length);
84✔
43
        this.fetch = function() {
84✔
44
            this._next = new Array(streams.length).fill(undefined);
100✔
45
            return streams.map(streamName => {
100✔
46
                const streamIndex = eventStore.streams[streamName]?.index;
308✔
47
                if (!streamIndex || streamIndex.length === 0) {
308✔
48
                    return emptyIterator;
4✔
49
                }
4✔
50
                const from = streamIndex.find(this.minRevision, this.minRevision <= this.maxRevision);
304✔
51
                const until = streamIndex.find(this.maxRevision, this.minRevision > this.maxRevision);
304✔
52
                if (from === 0 || until === 0) {
308!
UNCOV
53
                    // find() returns 0 when the requested revision is outside the stream's range
×
UNCOV
54
                    // (e.g. minRevision > all entries, or maxRevision < all entries).
×
UNCOV
55
                    return emptyIterator;
×
UNCOV
56
                }
×
57
                return eventStore.storage.readRange(from, until, streamIndex);
304✔
58
            });
100✔
59
        }
100✔
60
        this._iterator = null;
84✔
61
    }
100✔
62

4✔
63
    /**
4✔
64
     * Returns the value of the iterator at position `index`
4✔
65
     * @param {number} index The iterator position for which to return the next value
4✔
66
     * @returns {*}
4✔
67
     */
4✔
68
    getValue(index) {
4✔
69
        const next = this._iterator[index].next();
728✔
70
        return next.done ? false : next.value;
728✔
71
    }
728✔
72

4✔
73
    /**
4✔
74
     * @private
4✔
75
     * @param {number} first
4✔
76
     * @param {number} second
4✔
77
     * @returns {boolean} If the first item follows after the second in the given read order determined by this.minRevision and this.maxRevision.
4✔
78
     */
4✔
79
    follows(first, second) {
4✔
80
        return (this.minRevision > this.maxRevision ? first < second : first > second);
1,164✔
81
    }
1,164✔
82

4✔
83
    /**
4✔
84
     * @private
4✔
85
     * @returns {object|boolean} The next event or false if no more events in the stream.
4✔
86
     */
4✔
87
    next() {
4✔
88
        if (!this._iterator) {
520✔
89
            this._iterator = this.fetch();
100✔
90
        }
100✔
91
        while (true) {
520✔
92
            let nextIndex = -1;
520✔
93
            this._next.forEach((value, index) => {
520✔
94
                if (typeof value === 'undefined') {
2,940✔
95
                    value = this._next[index] = this.getValue(index);
728✔
96
                }
728✔
97
                if (value === false) {
2,940✔
98
                    return;
1,356✔
99
                }
1,356✔
100
                if (nextIndex === -1 || this.follows(this._next[nextIndex].metadata.commitId, value.metadata.commitId)) {
2,940✔
101
                    nextIndex = index;
512✔
102
                }
512✔
103
            });
520✔
104

520✔
105
            if (nextIndex === -1) {
520✔
106
                return false;
100✔
107
            }
100✔
108
            const next = this._next[nextIndex];
420✔
109
            this._next[nextIndex] = undefined;
420✔
110
            if (!this.predicate || this.predicate(next.payload, next.metadata)) {
520!
111
                return next;
420✔
112
            }
420✔
113
        }
520✔
114
    }
520✔
115

4✔
116
}
4✔
117

4✔
118
export default JoinEventStream;
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc