• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 25625230942

10 May 2026 09:29AM UTC coverage: 97.684% (-0.4%) from 98.106%
25625230942

Pull #303

github

web-flow
Merge 76b11b498 into b9161664a
Pull Request #303: feat(Storage): async partition + index scan in open() with 'opened' event and callback hook

1036 of 1086 branches covered (95.4%)

Branch coverage included in aggregate %.

100 of 102 new or added lines in 4 files covered. (98.04%)

39 existing lines in 5 files now uncovered.

5416 of 5519 relevant lines covered (98.13%)

808.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.72
/src/EventStream.js
1
import stream from 'stream';
4✔
2
import { assert } from './util.js';
4✔
3

4✔
4
/**
 * Calculate the actual version number from a possibly relative (negative) version number.
 *
 * A negative version counts back from the end: `-1` resolves to `length`,
 * `-2` to `length - 1`, and so on. Zero and positive versions are returned unchanged.
 *
 * @param {number} version The version to normalize.
 * @param {number} length The maximum version number
 * @returns {number} The absolute version number.
 */
function normalizeVersion(version, length) {
    if (version < 0) {
        return version + length + 1;
    }
    return version;
}
2,328✔
14

4✔
15
/**
 * Return the lower absolute version given a version and a maxVersion constraint.
 *
 * A negative `maxVersion` is interpreted relative to `version`
 * (`-1` resolves to `version` itself) before the two values are compared.
 *
 * @param {number} version The absolute version to cap.
 * @param {number} maxVersion The upper bound; negative values are resolved relative to `version`.
 * @returns {number} The smaller of `version` and the resolved `maxVersion`.
 */
function minVersion(version, maxVersion) {
    const absoluteMax = maxVersion < 0 ? version + maxVersion + 1 : maxVersion;
    return Math.min(version, absoluteMax);
}
1,188✔
24

4✔
25
/**
 * An event stream is a simple wrapper around an iterator over storage documents.
 * It implements a node readable stream interface.
 *
 * The range to read is described by `minRevision` and `maxRevision`; when
 * `minRevision` is greater than `maxRevision` the range is read in reverse order.
 */
class EventStream extends stream.Readable {

    /**
     * @param {string} name The name of the stream.
     * @param {EventStore} eventStore The event store to get the stream from.
     * @param {number} [minRevision] The minimum revision to include in the events (inclusive).
     * @param {number} [maxRevision] The maximum revision to include in the events (inclusive).
     * @param {function(object, object): boolean|null} [predicate] An optional filter function
     *   `(payload, metadata) => boolean`.  Only events for which this returns truthy are yielded.
     */
    constructor(name, eventStore, minRevision = 1, maxRevision = -1, predicate = null) {
        super({ objectMode: true });
        assert(typeof name === 'string' && name !== '', 'Need to specify a stream name.');
        assert(typeof eventStore === 'object' && eventStore !== null, `Need to provide EventStore instance to create EventStream ${name}.`);

        this.name = name;
        this.predicate = predicate || null;
        this._iterator = null;
        // The cache must not be called `_events`: that property is the listener table of the
        // EventEmitter this class inherits from, and overwriting it breaks `on()`/`emit()`.
        this._eventCache = null;

        if (eventStore.streams[name]) {
            this.streamIndex = eventStore.streams[name].index;
            this.minRevision = normalizeVersion(minRevision, this.streamIndex.length);
            this.maxRevision = normalizeVersion(maxRevision, this.streamIndex.length);
            this.version = minVersion(this.streamIndex.length, maxRevision);
            this.fetch = function() {
                return eventStore.storage.readRange(this.minRevision, this.maxRevision, this.streamIndex);
            };
        } else {
            // Unknown stream: behave like an empty stream. `fetch` has to exist here as well,
            // because `reset()` and `filter()` drop the iterator and `next()` re-fetches it.
            this.streamIndex = { length: 0 };
            this.version = -1;
            this.fetch = function() {
                return { next() { return { done: true }; } };
            };
        }
    }

    /**
     * @api
     * @param {number} revision The event revision to start reading from (inclusive).
     * @returns {EventStream}
     */
    from(revision) {
        this.minRevision = normalizeVersion(revision, this.streamIndex.length);
        return this;
    }

    /**
     * @api
     * @param {number} revision The event revision to read until (inclusive).
     * @returns {EventStream}
     */
    until(revision) {
        this.maxRevision = normalizeVersion(revision, this.streamIndex.length);
        this.version = minVersion(this.streamIndex.length, this.maxRevision);
        return this;
    }

    /**
     * @api
     * @param {number} amount The amount of events at the start of the stream to return in chronological order.
     * @returns {EventStream}
     */
    first(amount) {
        return this.fromStart().following(amount);
    }

    /**
     * @api
     * @param {number} amount The amount of events at the end of the stream to return in chronological order.
     * @returns {EventStream}
     */
    last(amount) {
        return this.fromEnd().previous(amount).forwards();
    }

    /**
     * Start reading at the first revision of the stream.
     * @api
     * @returns {EventStream}
     */
    fromStart() {
        this.minRevision = 1;
        return this;
    }

    /**
     * Start reading at the last revision of the stream.
     * @api
     * @returns {EventStream}
     */
    fromEnd() {
        this.minRevision = this.streamIndex.length;
        return this;
    }

    /**
     * @param {number} amount The amount of events to return in reverse chronological order.
     * @returns {EventStream}
     */
    previous(amount) {
        // Clamp at revision 1 so the range never reaches below the start of the stream.
        this.maxRevision = Math.max(1, this.minRevision - amount + 1);
        return this;
    }

    /**
     * @param {number} amount The amount of events to return in chronological order.
     * @returns {EventStream}
     */
    following(amount) {
        // Clamp at the stream length so the range never reaches past the end of the stream.
        this.maxRevision = Math.min(this.streamIndex.length, this.minRevision + amount - 1);
        return this;
    }

    /**
     * Read until the last revision of the stream.
     * @api
     * @returns {EventStream}
     */
    toEnd() {
        this.maxRevision = this.version = this.streamIndex.length;
        return this;
    }

    /**
     * Read until the first revision of the stream.
     * @api
     * @returns {EventStream}
     */
    toStart() {
        this.maxRevision = 1;
        return this;
    }

    /**
     * Reverse the current range of events, no matter which direction it currently has.
     * @returns {EventStream}
     */
    reverse() {
        const previousMax = this.maxRevision;
        this.maxRevision = this.minRevision;
        this.minRevision = previousMax;
        this.version = minVersion(this.streamIndex.length, this.maxRevision);
        return this;
    }

    /**
     * Make the current range of events read in forward chronological order.
     * @api
     * @param {number} [amount] Amount of events to read forward. If not specified, will read forward until the previously set limit.
     * @returns {EventStream}
     */
    forwards(amount = 0) {
        if (amount > 0) {
            this.following(amount);
        }
        if (this.maxRevision < this.minRevision) {
            this.reverse();
        }
        return this;
    }

    /**
     * Make the current range of events read in backward chronological order.
     * @api
     * @param {number} [amount] Amount of events to read backward. If not specified, will read backward until the previously set limit.
     * @returns {EventStream}
     */
    backwards(amount = 0) {
        if (amount > 0) {
            this.previous(amount);
        }
        if (this.maxRevision > this.minRevision) {
            this.reverse();
        }
        return this;
    }

    /**
     * Will iterate over all events in this stream and return an array of the events.
     * The array is cached; it is rebuilt after {@link EventStream#reset} or
     * {@link EventStream#filter} has been called.
     *
     * @returns {Array<object>}
     */
    get events() {
        if (Array.isArray(this._eventCache)) {
            return this._eventCache;
        }
        this._eventCache = [];
        let next;
        while ((next = this.next()) !== false) {
            this._eventCache.push(next.payload);
        }
        return this._eventCache;
    }

    /**
     * Iterate over the events in this stream with a callback.
     * This method is useful to gain access to the event metadata.
     *
     * @api
     * @param {function(object, object, string)} callback A callback function that will receive the event, the storage metadata and the original stream name for every event in this stream.
     */
    forEach(callback) {
        let next;
        while ((next = this.next()) !== false) {
            callback(next.payload, next.metadata, next.stream);
        }
    }

    /**
     * Iterator implementation. Iterate over the stream in a `for ... of` loop.
     */
    *[Symbol.iterator]() {
        let next;
        while ((next = this.next()) !== false) {
            yield next.payload;
        }
    }

    /**
     * Reset this stream to the start so it can be iterated again.
     * @returns {EventStream}
     */
    reset() {
        this._iterator = null;
        this._eventCache = null;
        return this;
    }

    /**
     * Apply a filter predicate to this stream.  Only events for which `predicate(payload, metadata)`
     * returns a truthy value will be yielded.  The predicate is stored as a first-class property
     * of the stream and applied in {@link EventStream#next}.
     * Setting a predicate also restarts iteration and discards the cached events.
     *
     * @api
     * @param {function(object, object): boolean} predicate A function receiving `(payload, metadata)`.
     *   Events for which the predicate returns falsy are skipped.
     * @returns {EventStream} `this`
     */
    filter(predicate) {
        this.predicate = predicate || null;
        this._iterator = null;
        this._eventCache = null;
        return this;
    }

    /**
     * Advance the underlying iterator, skipping documents rejected by the predicate.
     *
     * @returns {object|boolean} The next event or false if no more events in the stream.
     */
    next() {
        if (!this._iterator) {
            this._iterator = this.fetch();
        }
        try {
            while (true) {
                const result = this._iterator.next();
                if (result.done) {
                    return false;
                }
                if (!this.predicate || this.predicate(result.value.payload, result.value.metadata)) {
                    return result.value;
                }
            }
        } catch (e) {
            // Best effort: an error raised while advancing the iterator or evaluating the
            // predicate ends the iteration instead of propagating to the caller.
            return false;
        }
    }

    // noinspection JSUnusedGlobalSymbols
    /**
     * Readable stream implementation. Pushes the payload of the next event,
     * or `null` to signal the end of the stream.
     * @private
     */
    _read() {
        const next = this.next();
        this.push(next ? next.payload : null);
    }

}
4✔
298

4✔
299
export default EventStream;
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc