• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 26133545023

20 May 2026 12:24AM UTC coverage: 97.956% (-0.2%) from 98.106%
26133545023

Pull #316

github

web-flow
Merge 515a2b27f into 03ae7bb7f
Pull Request #316: Add `event-storage-http` REST API layer with NDJSON reads, consumer endpoints, and raw buffer streaming

1078 of 1131 branches covered (95.31%)

Branch coverage included in aggregate %.

258 of 270 new or added lines in 10 files covered. (95.56%)

22 existing lines in 5 files now uncovered.

5536 of 5621 relevant lines covered (98.49%)

823.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.75
/src/EventStream.js
1
import stream from 'stream';
4✔
2
import { assert } from './util.js';
4✔
3

4✔
4
/**
 * Calculate the actual version number from a possibly relative (negative) version number.
 *
 * A negative version counts from the end: `-1` resolves to `length`, `-2` to `length - 1`
 * and so on. Zero and positive versions are returned unchanged.
 *
 * @param {number} version The version to normalize.
 * @param {number} length The maximum version number
 * @returns {number} The absolute version number.
 */
function normalizeVersion(version, length) {
    if (version < 0) {
        // Relative to the end: -1 is the last version, hence the +1 offset.
        return version + length + 1;
    }
    return version;
}
2,360✔
14

4✔
15
/**
 * Return the lower absolute version given a version and a maxVersion constraint.
 *
 * A negative `maxVersion` is interpreted relative to `version` (`-1` resolves to `version`
 * itself, `-2` to `version - 1`, ...) before the minimum of both values is taken.
 *
 * @param {number} version The version to be limited.
 * @param {number} maxVersion The upper bound; negative values are relative to `version`.
 * @returns {number} The smaller of `version` and the resolved `maxVersion`.
 */
function minVersion(version, maxVersion) {
    const upperBound = maxVersion < 0 ? version + maxVersion + 1 : maxVersion;
    return Math.min(version, upperBound);
}
1,204✔
24

4✔
25
/**
 * An event stream is a simple wrapper around an iterator over storage documents.
 * It implements a node readable stream interface.
 *
 * The stream can be consumed synchronously (`next()`, `forEach()`, `for ... of`, `events`)
 * or through the inherited `stream.Readable` interface.
 */
class EventStream extends stream.Readable {

    /**
     * @param {string} name The name of the stream.
     * @param {EventStore} eventStore The event store to get the stream from.
     * @param {number} [minRevision] The minimum revision to include in the events (inclusive).
     * @param {number} [maxRevision] The maximum revision to include in the events (inclusive).
     * @param {function(object, object): boolean|true|null} [predicate] An optional filter function
     *   `(payload, metadata) => boolean`. Pass `true` to activate raw-buffer mode: the stream emits
     *   NDJSON Buffers instead of event objects and can be piped directly into an HTTP response.
     *   When `true`, `next()`/`forEach()`/`filter()` must not be used.
     */
    constructor(name, eventStore, minRevision = 1, maxRevision = -1, predicate = null) {
        const raw = predicate === true;
        // Raw mode pushes buffers, every other mode pushes event payload objects.
        super({ objectMode: !raw });
        assert(typeof name === 'string' && name !== '', 'Need to specify a stream name.');
        assert(typeof eventStore === 'object' && eventStore !== null, `Need to provide EventStore instance to create EventStream ${name}.`);

        this.name = name;
        this.raw = raw;
        this.predicate = raw ? null : (predicate || null);
        // NOTE: This cache must not be named `_events`, because that property is used internally by
        // node's EventEmitter (a base class of stream.Readable) to store the registered listeners.
        this._eventCache = null;
        if (eventStore.streams[name]) {
            this.streamIndex = eventStore.streams[name].index;
            this.minRevision = normalizeVersion(minRevision, this.streamIndex.length);
            this.maxRevision = normalizeVersion(maxRevision, this.streamIndex.length);
            this.version = minVersion(this.streamIndex.length, maxRevision);
            this._iterator = null;
            // Evaluated lazily on first read, so range changes made through the fluent API are respected.
            this.fetch = () => eventStore.storage.readRange(this.minRevision, this.maxRevision, this.streamIndex, raw);
        } else {
            // Unknown stream: behaves like an empty stream with version -1.
            this.streamIndex = { length: 0 };
            this.version = -1;
            // A fetch implementation is required so that the stream stays usable after `reset()` or `filter()`.
            this.fetch = () => ({ next() { return { done: true }; } });
            this._iterator = this.fetch();
        }
    }

    /**
     * @api
     * @param {number} revision The event revision to start reading from (inclusive).
     * @returns {EventStream}
     */
    from(revision) {
        this.minRevision = normalizeVersion(revision, this.streamIndex.length);
        return this;
    }

    /**
     * @api
     * @param {number} revision The event revision to read until (inclusive).
     * @returns {EventStream}
     */
    until(revision) {
        this.maxRevision = normalizeVersion(revision, this.streamIndex.length);
        this.version = minVersion(this.streamIndex.length, this.maxRevision);
        return this;
    }

    /**
     * @api
     * @param {number} amount The amount of events at the start of the stream to return in chronological order.
     * @returns {EventStream}
     */
    first(amount) {
        return this.fromStart().following(amount);
    }

    /**
     * @api
     * @param {number} amount The amount of events at the end of the stream to return in chronological order.
     * @returns {EventStream}
     */
    last(amount) {
        return this.fromEnd().previous(amount).forwards();
    }

    /**
     * Start reading at the first revision of the stream.
     * @api
     * @returns {EventStream}
     */
    fromStart() {
        this.minRevision = 1;
        return this;
    }

    /**
     * Start reading at the last revision of the stream.
     * @api
     * @returns {EventStream}
     */
    fromEnd() {
        this.minRevision = this.streamIndex.length;
        return this;
    }

    /**
     * @param {number} amount The amount of events to return in reverse chronological order.
     * @returns {EventStream}
     */
    previous(amount) {
        // Never go below the first revision.
        this.maxRevision = Math.max(1, this.minRevision - amount + 1);
        return this;
    }

    /**
     * @param {number} amount The amount of events to return in chronological order.
     * @returns {EventStream}
     */
    following(amount) {
        // Never go beyond the last revision.
        this.maxRevision = Math.min(this.streamIndex.length, this.minRevision + amount - 1);
        return this;
    }

    /**
     * Read until the last revision of the stream.
     * @api
     * @returns {EventStream}
     */
    toEnd() {
        this.maxRevision = this.version = this.streamIndex.length;
        return this;
    }

    /**
     * Read until the first revision of the stream.
     * @api
     * @returns {EventStream}
     */
    toStart() {
        this.maxRevision = 1;
        return this;
    }

    /**
     * Reverse the current range of events, no matter which direction it currently has.
     * @returns {EventStream}
     */
    reverse() {
        [this.minRevision, this.maxRevision] = [this.maxRevision, this.minRevision];
        this.version = minVersion(this.streamIndex.length, this.maxRevision);
        return this;
    }

    /**
     * Make the current range of events read in forward chronological order.
     * @api
     * @param {number} [amount] Amount of events to read forward. If not specified, will read forward until the previously set limit.
     * @returns {EventStream}
     */
    forwards(amount = 0) {
        if (amount > 0) {
            this.following(amount);
        }
        if (this.maxRevision < this.minRevision) {
            this.reverse();
        }
        return this;
    }

    /**
     * Make the current range of events read in backward chronological order.
     * @api
     * @param {number} [amount] Amount of events to read backward. If not specified, will read backward until the previously set limit.
     * @returns {EventStream}
     */
    backwards(amount = 0) {
        if (amount > 0) {
            this.previous(amount);
        }
        if (this.maxRevision > this.minRevision) {
            this.reverse();
        }
        return this;
    }

    /**
     * Will iterate over all events in this stream and return an array of the events.
     * The result is cached until `reset()` or `filter()` is called.
     *
     * @returns {Array<object>}
     */
    get events() {
        if (Array.isArray(this._eventCache)) {
            return this._eventCache;
        }
        const collected = [];
        this._eventCache = collected;
        let next;
        while ((next = this.next()) !== false) {
            collected.push(next.payload);
        }
        return collected;
    }

    /**
     * Iterate over the events in this stream with a callback.
     * This method is useful to gain access to the event metadata.
     *
     * @api
     * @param {function(object, object, string)} callback A callback function that will receive the event, the storage metadata and the original stream name for every event in this stream.
     */
    forEach(callback) {
        let next;
        while ((next = this.next()) !== false) {
            callback(next.payload, next.metadata, next.stream);
        }
    }

    /**
     * Iterator implementation. Iterate over the stream in a `for ... of` loop.
     */
    *[Symbol.iterator]() {
        let next;
        while ((next = this.next()) !== false) {
            yield next.payload;
        }
    }

    /**
     * Reset this stream to the start so it can be iterated again.
     * @returns {EventStream}
     */
    reset() {
        this._iterator = null;
        this._eventCache = null;
        return this;
    }

    /**
     * Apply a filter predicate to this stream.  Only events for which `predicate(payload, metadata)`
     * returns a truthy value will be yielded.  The predicate is stored as a first-class property
     * of the stream and applied in {@link EventStream#next}.
     * Applying a filter also restarts the iteration and drops the cached `events`.
     *
     * @api
     * @param {function(object, object): boolean} predicate A function receiving `(payload, metadata)`.
     *   Events for which the predicate returns falsy are skipped.
     * @returns {EventStream} `this`
     * @throws {Error} If the stream is in raw-buffer mode.
     */
    filter(predicate) {
        if (this.raw) {
            throw new Error('Cannot apply a filter to a raw event stream.');
        }
        this.predicate = predicate || null;
        this._iterator = null;
        this._eventCache = null;
        return this;
    }

    /**
     * Advance the underlying iterator (fetching it first if necessary) to the next entry that
     * passes the predicate, if one is set.
     *
     * @returns {object|boolean} The next event or false if no more events in the stream.
     */
    next() {
        if (!this._iterator) {
            this._iterator = this.fetch();
        }
        try {
            while (true) {
                const result = this._iterator.next();
                if (result.done) {
                    return false;
                }
                if (!this.predicate || this.predicate(result.value.payload, result.value.metadata)) {
                    return result.value;
                }
            }
        } catch (e) {
            // Best effort: any error raised by the iterator or the predicate ends the iteration.
            return false;
        }
    }

    // noinspection JSUnusedGlobalSymbols
    /**
     * Readable stream implementation.
     * Pushes the next entry as-is in raw mode, its payload otherwise, and `null` at the end.
     * @private
     */
    _read() {
        const next = this.next();
        if (!next) {
            this.push(null);
            return;
        }
        this.push(this.raw ? next : next.payload);
    }

}
4✔
303

4✔
304
export default EventStream;
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc