• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 26331781152

23 May 2026 11:43AM UTC coverage: 98.028% (-0.08%) from 98.106%
26331781152

Pull #316

github

web-flow
Merge 88893fee7 into 03ae7bb7f
Pull Request #316: Add `event-storage-http` REST API layer with NDJSON reads, consumer endpoints, and raw buffer streaming

1188 of 1244 branches covered (95.5%)

Branch coverage included in aggregate %.

582 of 604 new or added lines in 11 files covered. (96.36%)

10 existing lines in 2 files now uncovered.

5771 of 5855 relevant lines covered (98.57%)

841.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.92
/src/EventStream.js
1
import stream from 'stream';
4✔
2
import { assert } from './util.js';
4✔
3
import { buildRawBufferMatcher, matches } from './metadataUtil.js';
4✔
4

4✔
5
// Line terminator appended to every raw entry buffer so that each emitted chunk forms one NDJSON line.
const NDJSON_NEWLINE = Buffer.from('\n');
4✔
6

4✔
7
/**
 * Resolve a version that may be given relative to the end (negative) into an absolute version.
 * A negative version counts back from the end, so -1 resolves to `length`.
 *
 * @param {number} version The version to normalize.
 * @param {number} length The maximum version number
 * @returns {number} The absolute version number.
 */
function normalizeVersion(version, length) {
    if (version < 0) {
        return version + length + 1;
    }
    return version;
}
2,568✔
17

4✔
18
/**
 * Pick the smaller of a version and an upper bound, where a negative bound is
 * first resolved relative to `version` (-1 resolves to `version` itself).
 *
 * @param {number} version
 * @param {number} maxVersion
 * @returns {number}
 */
function minVersion(version, maxVersion) {
    const limit = maxVersion < 0 ? version + maxVersion + 1 : maxVersion;
    return Math.min(version, limit);
}
1,308✔
27

4✔
28
/**
4✔
29
 * An event stream is a simple wrapper around an iterator over storage documents.
4✔
30
 * It implements a node readable stream interface.
4✔
31
 */
4✔
32
class EventStream extends stream.Readable {
4✔
33

4✔
34
    /**
4✔
35
     * @param {string} name The name of the stream.
4✔
36
     * @param {EventStore} eventStore The event store to get the stream from.
4✔
37
     * @param {number} [minRevision] The minimum revision to include in the events (inclusive).
4✔
38
     * @param {number} [maxRevision] The maximum revision to include in the events (inclusive).
4✔
39
     * @param {function|object|null} [predicate] Optional matcher:
4✔
40
     *   - object mode: function `(payload, metadata) => boolean` or object matcher against `{ stream, payload, metadata }`
4✔
41
     *   - raw mode: function `(buffer) => boolean` or object matcher against compact NDJSON bytes.
4✔
42
     * @param {boolean} [raw=false] If true, emit NDJSON Buffers instead of event payload objects.
4✔
43
     */
4✔
44
    constructor(name, eventStore, minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
4✔
45
        if (typeof predicate === 'boolean' && raw === false) {
1,452!
NEW
46
            raw = predicate;
×
NEW
47
            predicate = null;
×
NEW
48
        }
×
49
        super({ objectMode: !raw });
1,452✔
50
        assert(typeof name === 'string' && name !== '', 'Need to specify a stream name.');
1,452✔
51
        assert(typeof eventStore === 'object' && eventStore !== null, `Need to provide EventStore instance to create EventStream ${name}.`);
1,452✔
52

1,452✔
53
        this.name = name;
1,452✔
54
        this.raw = raw;
1,452✔
55
        this.predicate = predicate || null;
1,452✔
56
        this.rawMatcher = null;
1,452✔
57
        if (eventStore.streams[name]) {
1,452✔
58
            this.streamIndex = eventStore.streams[name].index;
1,272✔
59
            this.minRevision = normalizeVersion(minRevision, this.streamIndex.length);
1,272✔
60
            this.maxRevision = normalizeVersion(maxRevision, this.streamIndex.length);
1,272✔
61
            this.version = minVersion(this.streamIndex.length, maxRevision);
1,272✔
62
            this._iterator = null;
1,272✔
63
            this.fetch = () => eventStore.storage.readRange(this.minRevision, this.maxRevision, this.streamIndex, raw);
1,272✔
64
        } else {
1,452✔
65
            this.streamIndex = { length: 0 };
160✔
66
            this.version = -1;
160✔
67
            this._iterator = { next() { return { done: true }; } };
160✔
68
        }
160✔
69
    }
1,452✔
70

4✔
71
    /**
4✔
72
     * @api
4✔
73
     * @param {number} revision The event revision to start reading from (inclusive).
4✔
74
     * @returns {EventStream}
4✔
75
     */
4✔
76
    from(revision) {
4✔
77
        this.minRevision = normalizeVersion(revision, this.streamIndex.length);
20✔
78
        return this;
20✔
79
    }
20✔
80

4✔
81
    /**
4✔
82
     * @api
4✔
83
     * @param {number} revision The event revision to read until (inclusive).
4✔
84
     * @returns {EventStream}
4✔
85
     */
4✔
86
    until(revision) {
4✔
87
        this.maxRevision = normalizeVersion(revision, this.streamIndex.length);
4✔
88
        this.version = minVersion(this.streamIndex.length, this.maxRevision);
4✔
89
        return this;
4✔
90
    }
4✔
91

4✔
92
    /**
4✔
93
     * @api
4✔
94
     * @param {number} amount The amount of events at the start of the stream to return in chronological order.
4✔
95
     * @returns {EventStream}
4✔
96
     */
4✔
97
    first(amount) {
4✔
98
        return this.fromStart().following(amount);
12✔
99
    }
12✔
100

4✔
101
    /**
4✔
102
     * @api
4✔
103
     * @param {number} amount The amount of events at the end of the stream to return in chronological order.
4✔
104
     * @returns {EventStream}
4✔
105
     */
4✔
106
    last(amount) {
4✔
107
        return this.fromEnd().previous(amount).forwards();
12✔
108
    }
12✔
109

4✔
110
    /**
4✔
111
     * @api
4✔
112
     * @returns {EventStream}
4✔
113
     */
4✔
114
    fromStart() {
4✔
115
        this.minRevision = 1;
36✔
116
        return this;
36✔
117
    }
36✔
118

4✔
119
    /**
4✔
120
     * @api
4✔
121
     * @returns {EventStream}
4✔
122
     */
4✔
123
    fromEnd() {
4✔
124
        this.minRevision = this.streamIndex.length;
28✔
125
        return this;
28✔
126
    }
28✔
127

4✔
128
    /**
4✔
129
     * @param {number} amount The amount of events to return in reverse chronological order.
4✔
130
     * @returns {EventStream}
4✔
131
     */
4✔
132
    previous(amount) {
4✔
133
        this.maxRevision = Math.max(1, this.minRevision - amount + 1);
20✔
134
        return this;
20✔
135
    }
20✔
136

4✔
137
    /**
4✔
138
     * @param {number} amount The amount of events to return in chronological order.
4✔
139
     * @returns {EventStream}
4✔
140
     */
4✔
141
    following(amount) {
4✔
142
        this.maxRevision = Math.min(this.streamIndex.length, this.minRevision + amount - 1);
16✔
143
        return this;
16✔
144
    }
16✔
145

4✔
146
    /**
4✔
147
     * @api
4✔
148
     * @returns {EventStream}
4✔
149
     */
4✔
150
    toEnd() {
4✔
151
        this.maxRevision = this.version = this.streamIndex.length;
32✔
152
        return this;
32✔
153
    }
32✔
154

4✔
155
    /**
4✔
156
     * @api
4✔
157
     * @returns {EventStream}
4✔
158
     */
4✔
159
    toStart() {
4✔
160
        this.maxRevision = 1;
12✔
161
        return this;
12✔
162
    }
12✔
163

4✔
164
    /**
4✔
165
     * Reverse the current range of events, no matter which direction it currently has.
4✔
166
     * @returns {EventStream}
4✔
167
     */
4✔
168
    reverse() {
4✔
169
        let tmp = this.maxRevision;
32✔
170
        this.maxRevision = this.minRevision;
32✔
171
        this.minRevision = tmp;
32✔
172
        this.version = minVersion(this.streamIndex.length, this.maxRevision);
32✔
173
        return this;
32✔
174
    }
32✔
175

4✔
176
    /**
4✔
177
     * Make the current range of events read in forward chronological order.
4✔
178
     * @api
4✔
179
     * @param {number} [amount] Amount of events to read forward. If not specified, will read forward until the previously set limit.
4✔
180
     * @returns {EventStream}
4✔
181
     */
4✔
182
    forwards(amount = 0) {
4✔
183
        if (amount > 0) {
24✔
184
            this.following(amount);
4✔
185
        }
4✔
186
        if (this.maxRevision < this.minRevision) {
24✔
187
            this.reverse();
16✔
188
        }
16✔
189
        return this;
24✔
190
    }
24✔
191

4✔
192
    /**
4✔
193
     * Make the current range of events read in backward chronological order.
4✔
194
     * @api
4✔
195
     * @param {number} [amount] Amount of events to read backward. If not specified, will read backward until the previously set limit.
4✔
196
     * @returns {EventStream}
4✔
197
     */
4✔
198
    backwards(amount = 0) {
4✔
199
        if (amount > 0) {
24✔
200
            this.previous(amount);
4✔
201
        }
4✔
202
        if (this.maxRevision > this.minRevision) {
24✔
203
            this.reverse();
16✔
204
        }
16✔
205
        return this;
24✔
206
    }
24✔
207

4✔
208
    /**
4✔
209
     * Will iterate over all events in this stream and return an array of the events.
4✔
210
     *
4✔
211
     * @returns {Array<object>}
4✔
212
     */
4✔
213
    get events() {
4✔
214
        if (this._events instanceof Array) {
352✔
215
            return this._events;
132✔
216
        }
132✔
217
        this._events = [];
220✔
218
        let next;
220✔
219
        while ((next = this.next()) !== false) {
352✔
220
            this._events.push(next.payload);
972✔
221
        }
972✔
222
        return this._events;
220✔
223
    }
352✔
224

4✔
225
    /**
4✔
226
     * Iterate over the events in this stream with a callback.
4✔
227
     * This method is useful to gain access to the event metadata.
4✔
228
     *
4✔
229
     * @api
4✔
230
     * @param {function(object, object, string)} callback A callback function that will receive the event, the storage metadata and the original stream name for every event in this stream.
4✔
231
     */
4✔
232
    forEach(callback) {
4✔
233
        let next;
8✔
234
        while ((next = this.next()) !== false) {
8✔
235
            callback(next.payload, next.metadata, next.stream);
24✔
236
        }
24✔
237
    }
8✔
238

4✔
239
    /**
4✔
240
     * Iterator implementation. Iterate over the stream in a `for ... of` loop.
4✔
241
     */
4✔
242
    *[Symbol.iterator]() {
4✔
243
        let next;
184✔
244
        while ((next = this.next()) !== false) {
184✔
245
            yield this.raw ? this.toRawBuffer(next) : next.payload;
628✔
246
        }
620✔
247
    }
184✔
248

4✔
249
    /**
4✔
250
     * @param {{ buffer: Buffer }} entry
4✔
251
     * @returns {Buffer}
4✔
252
     */
4✔
253
    toRawBuffer(entry) {
4✔
254
        return Buffer.concat([entry.buffer, NDJSON_NEWLINE]);
60✔
255
    }
60✔
256

4✔
257
    /**
4✔
258
     * Reset this stream to the start so it can be iterated again.
4✔
259
     * @returns {EventStream}
4✔
260
     */
4✔
261
    reset() {
4✔
262
        this._iterator = null;
48✔
263
        this._events = null;
48✔
264
        return this;
48✔
265
    }
48✔
266

4✔
267
    /**
4✔
268
     * Apply a filter predicate to this stream.  Only events for which `predicate(payload, metadata)`
4✔
269
     * returns a truthy value will be yielded.  The predicate is stored as a first-class property
4✔
270
     * of the stream and applied in {@link EventStream#next}.
4✔
271
     *
4✔
272
     * @api
4✔
273
     * @param {function(object, object): boolean} predicate A function receiving `(payload, metadata)`.
4✔
274
     *   Events for which the predicate returns falsy are skipped.
4✔
275
     * @returns {EventStream} `this`
4✔
276
     */
4✔
277
    filter(predicate) {
4✔
278
        this.predicate = predicate || null;
×
NEW
279
        this.rawMatcher = null;
×
280
        this._iterator = null;
×
281
        this._events = null;
×
282
        return this;
×
283
    }
×
284

4✔
285
    matchesPredicate(entry) {
4✔
286
        if (!this.predicate) {
1,728✔
287
            return true;
1,604✔
288
        }
1,604✔
289
        if (this.raw) {
1,728✔
290
            if (typeof this.predicate === 'function') {
92✔
291
                return this.predicate(entry.buffer);
36✔
292
            }
36✔
293
            if (!this.rawMatcher) {
92✔
294
                this.rawMatcher = buildRawBufferMatcher(this.predicate);
28✔
295
            }
28✔
296
            return this.rawMatcher(entry.buffer);
56✔
297
        }
56✔
298

32✔
299
        if (typeof this.predicate === 'function') {
32✔
300
            return this.predicate(entry.payload, entry.metadata);
32✔
301
        }
32✔
NEW
302
        return matches(entry, this.predicate);
×
303
    }
1,728✔
304

4✔
305
    /**
4✔
306
     * @returns {object|boolean} The next event or false if no more events in the stream.
4✔
307
     */
4✔
308
    next() {
4✔
309
        if (!this._iterator) {
1,524✔
310
            this._iterator = this.fetch();
316✔
311
        }
316✔
312
        try {
1,524✔
313
            while (true) {
1,524✔
314
                const result = this._iterator.next();
1,564✔
315
                if (result.done) return false;
1,564✔
316
                if (this.matchesPredicate(result.value)) {
1,564✔
317
                    return result.value;
1,224✔
318
                }
1,224✔
319
            }
1,564✔
320
        } catch(e) {
1,524✔
321
            return false;
8✔
322
        }
8✔
323
    }
1,524✔
324

4✔
325
    // noinspection JSUnusedGlobalSymbols
4✔
326
    /**
4✔
327
     * Readable stream implementation.
4✔
328
     * @private
4✔
329
     */
4✔
330
    _read() {
4✔
331
        const next = this.next();
32✔
332
        this.push(next ? (this.raw ? this.toRawBuffer(next) : next.payload) : null);
32✔
333
    }
32✔
334

4✔
335
}
4✔
336

4✔
337
export default EventStream;
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc