• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 28643799422

03 Jul 2026 06:51AM UTC coverage: 98.257% (-0.06%) from 98.321%
28643799422

Pull #327

github

web-flow
Merge 14f2c2b30 into 4ded1f51c
Pull Request #327: Add support for algebraic stream joins

1438 of 1496 branches covered (96.12%)

Branch coverage included in aggregate %.

327 of 332 new or added lines in 3 files covered. (98.49%)

1 existing line in 1 file now uncovered.

7073 of 7166 relevant lines covered (98.7%)

842.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.13
/src/JoinEventStream.js
1
import EventStream from './EventStream.js';
4✔
2
import { assert, iterate } from './utils/util.js';
4✔
3
import { union, intersect } from './utils/indexUtil.js';
4✔
4
import { normalizeRevision } from './utils/apiHelpers.js';
4✔
5

4✔
6
/**
4✔
7
 * JoinEventStream is a virtual stream over one or multiple physical stream indexes.
4✔
8
 *
4✔
9
 * It is the canonical implementation behind `EventStore.fromStreams(...)` and supports
4✔
10
 * nested selector algebra with alternating operators by depth:
4✔
11
 *
4✔
12
 * - depth 0 (top-level array): OR
4✔
13
 * - depth 1: AND
4✔
14
 * - depth 2: OR
4✔
15
 * - ... alternating by depth parity
4✔
16
 *
4✔
17
 * Flat arrays (e.g. `['a', 'b']`) therefore keep the legacy join semantics (OR).
4✔
18
 *
4✔
19
 * @extends EventStream
4✔
20
 */
4✔
21
class JoinEventStream extends EventStream {
4✔
22

4✔
23
    /**
4✔
24
     * @param {string} name The name of this virtual stream.
4✔
25
     * @param {Array<string|Array>} selector Stream selector as flat or nested arrays.
4✔
26
     * @param {EventStore} eventStore The event store instance.
4✔
27
     * @param {number} [minRevision=1] Global minimum revision (inclusive).
4✔
28
     * @param {number} [maxRevision=-1] Global maximum revision (inclusive).
4✔
29
     * @param {function|object|null} [predicate] Optional matcher (same semantics as EventStream).
4✔
30
     * @param {boolean} [raw=false] If true, emit NDJSON buffers.
4✔
31
     */
4✔
32
    constructor(name, selector, eventStore, minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
4✔
33
        super(name, eventStore, minRevision, maxRevision, predicate, raw);
208✔
34
        assert(Array.isArray(selector) && selector.length > 0, `Invalid selector supplied to JoinEventStream ${name}.`);
208✔
35

208✔
36
        this.eventStore = eventStore;
208✔
37
        this.selector = this.normalizeSelector(selector);
208✔
38
        this.streamIndex = eventStore.storage.index;
208✔
39
        this.minRevision = normalizeRevision(minRevision, eventStore.length);
208✔
40
        this.maxRevision = normalizeRevision(maxRevision, eventStore.length);
208✔
41
        this.version = this.streamIndex.length;
208✔
42

208✔
43
        this._combinedRanges = null;
208✔
44

208✔
45
        this.fetch = () => this.iterateDocuments();
208✔
46
        this._iterator = null;
208✔
47
    }
208✔
48

4✔
49
    /**
4✔
50
     * Normalize selector shape to nested arrays and validate leaf types.
4✔
51
     * Missing streams are allowed here to keep direct JoinEventStream construction compatible;
4✔
52
     * they are treated as empty ranges.
4✔
53
     *
4✔
54
     * @private
4✔
55
     * @param {Array<string|Array>} selector
4✔
56
     * @returns {Array<string|Array>}
4✔
57
     */
4✔
58
    normalizeSelector(selector) {
4✔
59
        const normalized = [];
208✔
60
        for (const node of selector) {
208✔
61
            if (typeof node === 'string') {
444✔
62
                assert(node.length > 0, 'Stream names must be non-empty strings.');
428✔
63
                normalized.push(node);
428✔
64
                continue;
428✔
65
            }
428✔
66

16✔
67
            assert(Array.isArray(node) && node.length > 0, 'Each selector node must be a non-empty stream name array or string.');
444✔
68
            normalized.push(this.normalizeSelector(node));
444✔
69
        }
444✔
70
        return normalized;
208✔
71
    }
208✔
72

4✔
73
    /**
4✔
74
     * Resolve and cache the combined index-entry ranges for the full selector.
4✔
75
     *
4✔
76
     * @private
4✔
77
     * @returns {Array<Array<number>>}
4✔
78
     */
4✔
79
    resolveCombinedRanges() {
4✔
80
        if (this._combinedRanges) {
200!
NEW
81
            return this._combinedRanges;
×
UNCOV
82
        }
×
83

200✔
84
        this._combinedRanges = this.resolveSelectorRanges(this.selector);
200✔
85
        return this._combinedRanges;
200✔
86
    }
200✔
87

4✔
88
    /**
4✔
89
     * Optimize a selector tree at the given depth.
4✔
90
     * Currently, optimizes away occurrences of _all.
4✔
91
     *
4✔
92
     * @private
4✔
93
     * @param {Array<string|Array>} selectorNode
4✔
94
     * @param {number} depth
4✔
95
     * @returns {Array<Array<number>>|string}
4✔
96
     */
4✔
97
    optimize(selectorNode, depth) {
4✔
98
        if (depth % 2 !== 0) {
216✔
99
            return selectorNode.filter(node => node !== '_all');
12✔
100
        }
12✔
101
        if (selectorNode.some(node => node === '_all')) {
216!
NEW
102
            return '_all';
×
NEW
103
        }
×
104
        return selectorNode;
204✔
105
    }
216✔
106

4✔
107
    /**
4✔
108
     * Resolve one selector node into a sorted index-entry range.
4✔
109
     *
4✔
110
     * @private
4✔
111
     * @param {string|Array<string|Array>} selectorNode
4✔
112
     * @param {number} [depth=0]
4✔
113
     * @returns {Array<Array<number>>}
4✔
114
     */
4✔
115
    resolveSelectorRanges(selectorNode, depth = 0) {
4✔
116
        if (typeof selectorNode === 'string') {
660✔
117
            const index = this.eventStore.streams[selectorNode]?.index;
444✔
118
            return this.resolveIndexRange(index);
444✔
119
        }
444✔
120
        selectorNode = this.optimize(selectorNode, depth);
216✔
121

216✔
122
        const childRanges = selectorNode.map(node => this.resolveSelectorRanges(node, depth + 1));
216✔
123
        return depth % 2 === 0 ? union(...childRanges) : intersect(...childRanges);
660✔
124
    }
660✔
125

4✔
126
    /**
4✔
127
     * Resolve one stream index to the global revision-bounded entry range.
4✔
128
     *
4✔
129
     * @private
4✔
130
     * @param {object|undefined} index
4✔
131
     * @returns {Array<Array<number>>}
4✔
132
     */
4✔
133
    resolveIndexRange(index) {
4✔
134
        if (!index || index.length === 0) {
444✔
135
            return [];
4✔
136
        }
4✔
137

440✔
138
        const ascending = this.minRevision <= this.maxRevision;
440✔
139
        const from = index.find(this.minRevision, ascending);
440✔
140
        const until = index.find(this.maxRevision, !ascending);
440✔
141
        if (
440✔
142
            from === 0 ||
440✔
143
            until === 0 ||
444✔
144
            (ascending ? from > until : from < until)
440✔
145
        ) {
444✔
146
            return [];
4✔
147
        }
4✔
148

436✔
149
        const rangeFrom = ascending ? from : until;
444✔
150
        const rangeUntil = ascending ? until : from;
444✔
151
        return index.range(rangeFrom, rangeUntil) || [];
444!
152
    }
444✔
153

4✔
154
    /**
4✔
155
     * Iterate matching index entries in requested direction.
4✔
156
     *
4✔
157
     * @private
4✔
158
     * @returns {Generator<Array<number>>}
4✔
159
     */
4✔
160
    *iterateEntries() {
4✔
161
        const entries = this.resolveCombinedRanges();
200✔
162
        const forwards = this.minRevision <= this.maxRevision;
200✔
163
        yield* iterate(entries, forwards);
200✔
164
    }
200✔
165

4✔
166
    /**
4✔
167
     * Iterate storage documents lazily from combined index entries.
4✔
168
     *
4✔
169
     * @private
4✔
170
     * @returns {Generator<object|{ buffer: Buffer, time64: number, sequenceNumber: number }>}
4✔
171
     */
4✔
172
    *iterateDocuments() {
4✔
173
        const forwards = this.minRevision <= this.maxRevision;
200✔
174
        for (const entry of this.iterateEntries()) {
200✔
175
            const event = this.eventStore.storage.readFrom(
592✔
176
                entry.partition,
592✔
177
                entry.position,
592✔
178
                entry.size,
592✔
179
                this.raw,
592✔
180
                !forwards
592✔
181
            );
592✔
182
            if (event) {
592✔
183
                yield event;
592✔
184
            }
580✔
185
        }
592✔
186
    }
200✔
187

4✔
188
    /**
4✔
189
     * Reset fetch state and cached selector ranges when stream boundaries changed.
4✔
190
     *
4✔
191
     * @returns {JoinEventStream}
4✔
192
     */
4✔
193
    reset() {
4✔
194
        this._combinedRanges = null;
20✔
195
        return super.reset();
20✔
196
    }
20✔
197
}
4✔
198

4✔
199
export default JoinEventStream;
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc