• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 26195028502

20 May 2026 11:03PM UTC coverage: 98.111% (+0.005%) from 98.106%
26195028502

Pull #316

github

web-flow
Merge 43a9935a1 into 03ae7bb7f
Pull Request #316: Add `event-storage-http` REST API layer with NDJSON reads, consumer endpoints, and raw buffer streaming

1093 of 1141 branches covered (95.79%)

Branch coverage included in aggregate %.

356 of 363 new or added lines in 10 files covered. (98.07%)

11 existing lines in 2 files now uncovered.

5556 of 5636 relevant lines covered (98.58%)

850.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.07
/src/EventStream.js
1
import stream from 'stream';
4✔
2
import { assert } from './util.js';
4✔
3

4✔
4
const NDJSON_NEWLINE = Buffer.from('\n');
4✔
5

4✔
6
/**
 * Resolve a possibly relative (negative) version number to an absolute one.
 *
 * Negative versions count from the end: with `length = 10`, `-1` resolves to `10`
 * and `-10` resolves to `1`. Zero and positive versions are returned unchanged.
 *
 * @param {number} version The version to normalize.
 * @param {number} length The maximum version number, used as the base for negative versions.
 * @returns {number} The absolute version number.
 */
function normalizeVersion(version, length) {
    if (version < 0) {
        return version + length + 1;
    }
    return version;
}
2,384✔
16

4✔
17
/**
 * Return the lower absolute version given a version and a maxVersion constraint.
 *
 * A negative `maxVersion` is interpreted relative to `version` (`-1` means `version` itself,
 * `-2` means `version - 1`, ...); a non-negative `maxVersion` is used as given.
 *
 * @param {number} version The current (absolute) version.
 * @param {number} maxVersion The upper bound; may be negative to count back from `version`.
 * @returns {number} The smaller of `version` and the resolved `maxVersion`.
 */
function minVersion(version, maxVersion) {
    const absoluteMax = maxVersion < 0 ? version + maxVersion + 1 : maxVersion;
    return Math.min(version, absoluteMax);
}
1,216✔
26

4✔
27
/**
 * An event stream is a simple wrapper around an iterator over storage documents.
 * It implements a node readable stream interface.
 *
 * Besides the Readable interface it can be consumed synchronously through `next()`,
 * `forEach()`, the `events` getter or a `for ... of` loop.
 */
class EventStream extends stream.Readable {

    /**
     * @param {string} name The name of the stream.
     * @param {EventStore} eventStore The event store to get the stream from.
     * @param {number} [minRevision] The minimum revision to include in the events (inclusive).
     * @param {number} [maxRevision] The maximum revision to include in the events (inclusive).
     * @param {(function(object, object): boolean)|true|null} [predicate] An optional filter function
     *   `(payload, metadata) => boolean`. Pass `true` to activate raw-buffer mode: the stream emits
     *   NDJSON Buffers instead of event objects and can be piped directly into an HTTP response.
     *   When `true`, `next()`/`forEach()`/`filter()` must not be used.
     */
    constructor(name, eventStore, minRevision = 1, maxRevision = -1, predicate = null) {
        const raw = predicate === true;
        // Raw mode pushes Buffers, so the Readable must not be in object mode then.
        super({ objectMode: !raw });
        assert(typeof name === 'string' && name !== '', 'Need to specify a stream name.');
        assert(typeof eventStore === 'object' && eventStore !== null, `Need to provide EventStore instance to create EventStream ${name}.`);

        this.name = name;
        this.raw = raw;
        this.predicate = raw ? null : (predicate || null);
        // Cache for the `events` getter. It must NOT be called `_events`: that property is the
        // internal listener table of EventEmitter, which stream.Readable inherits from.
        this._eventCache = null;
        if (eventStore.streams[name]) {
            this.streamIndex = eventStore.streams[name].index;
            this.minRevision = normalizeVersion(minRevision, this.streamIndex.length);
            this.maxRevision = normalizeVersion(maxRevision, this.streamIndex.length);
            this.version = minVersion(this.streamIndex.length, maxRevision);
            this._iterator = null;
            // The range is read lazily so that from()/until()/... can still adjust it.
            this.fetch = () => eventStore.storage.readRange(this.minRevision, this.maxRevision, this.streamIndex, raw);
        } else {
            // Unknown stream: behave like an empty stream. `fetch` is defined here as well, so
            // that reset()/filter() (which clear the iterator) can be followed by next().
            this.streamIndex = { length: 0 };
            this.version = -1;
            this.fetch = () => ({ next() { return { done: true }; } });
            this._iterator = this.fetch();
        }
    }

    /**
     * @api
     * @param {number} revision The event revision to start reading from (inclusive).
     * @returns {EventStream}
     */
    from(revision) {
        this.minRevision = normalizeVersion(revision, this.streamIndex.length);
        return this;
    }

    /**
     * @api
     * @param {number} revision The event revision to read until (inclusive).
     * @returns {EventStream}
     */
    until(revision) {
        this.maxRevision = normalizeVersion(revision, this.streamIndex.length);
        this.version = minVersion(this.streamIndex.length, this.maxRevision);
        return this;
    }

    /**
     * @api
     * @param {number} amount The amount of events at the start of the stream to return in chronological order.
     * @returns {EventStream}
     */
    first(amount) {
        return this.fromStart().following(amount);
    }

    /**
     * @api
     * @param {number} amount The amount of events at the end of the stream to return in chronological order.
     * @returns {EventStream}
     */
    last(amount) {
        return this.fromEnd().previous(amount).forwards();
    }

    /**
     * Start reading at the first revision of the stream.
     * @api
     * @returns {EventStream}
     */
    fromStart() {
        this.minRevision = 1;
        return this;
    }

    /**
     * Start reading at the last revision of the stream.
     * @api
     * @returns {EventStream}
     */
    fromEnd() {
        this.minRevision = this.streamIndex.length;
        return this;
    }

    /**
     * @param {number} amount The amount of events to return in reverse chronological order.
     * @returns {EventStream}
     */
    previous(amount) {
        // Clamp at revision 1 so the range never reaches below the start of the stream.
        this.maxRevision = Math.max(1, this.minRevision - amount + 1);
        return this;
    }

    /**
     * @param {number} amount The amount of events to return in chronological order.
     * @returns {EventStream}
     */
    following(amount) {
        // Clamp at the stream length so the range never reaches beyond the end of the stream.
        this.maxRevision = Math.min(this.streamIndex.length, this.minRevision + amount - 1);
        return this;
    }

    /**
     * Read until the last revision of the stream.
     * @api
     * @returns {EventStream}
     */
    toEnd() {
        this.maxRevision = this.version = this.streamIndex.length;
        return this;
    }

    /**
     * Read until the first revision of the stream.
     * @api
     * @returns {EventStream}
     */
    toStart() {
        this.maxRevision = 1;
        return this;
    }

    /**
     * Reverse the current range of events, no matter which direction it currently has.
     * @returns {EventStream}
     */
    reverse() {
        const previousMax = this.maxRevision;
        this.maxRevision = this.minRevision;
        this.minRevision = previousMax;
        this.version = minVersion(this.streamIndex.length, this.maxRevision);
        return this;
    }

    /**
     * Make the current range of events read in forward chronological order.
     * @api
     * @param {number} [amount] Amount of events to read forward. If not specified, will read forward until the previously set limit.
     * @returns {EventStream}
     */
    forwards(amount = 0) {
        if (amount > 0) {
            this.following(amount);
        }
        if (this.maxRevision < this.minRevision) {
            this.reverse();
        }
        return this;
    }

    /**
     * Make the current range of events read in backward chronological order.
     * @api
     * @param {number} [amount] Amount of events to read backward. If not specified, will read backward until the previously set limit.
     * @returns {EventStream}
     */
    backwards(amount = 0) {
        if (amount > 0) {
            this.previous(amount);
        }
        if (this.maxRevision > this.minRevision) {
            this.reverse();
        }
        return this;
    }

    /**
     * Will iterate over all (remaining) events in this stream and return an array of the event payloads.
     * The array is cached, so repeated access returns the same array until `reset()` or `filter()` is called.
     *
     * @returns {Array<object>}
     */
    get events() {
        if (Array.isArray(this._eventCache)) {
            return this._eventCache;
        }
        const collected = [];
        this._eventCache = collected;
        let next;
        while ((next = this.next()) !== false) {
            collected.push(next.payload);
        }
        return collected;
    }

    /**
     * Iterate over the events in this stream with a callback.
     * This method is useful to gain access to the event metadata.
     *
     * @api
     * @param {function(object, object, string)} callback A callback function that will receive the event, the storage metadata and the original stream name for every event in this stream.
     */
    forEach(callback) {
        let next;
        while ((next = this.next()) !== false) {
            callback(next.payload, next.metadata, next.stream);
        }
    }

    /**
     * Iterator implementation. Iterate over the stream in a `for ... of` loop.
     * Yields event payloads, or newline-terminated Buffers when the stream is in raw mode.
     */
    *[Symbol.iterator]() {
        let next;
        while ((next = this.next()) !== false) {
            yield this.raw ? this.toRawBuffer(next) : next.payload;
        }
    }

    /**
     * Turn a raw storage entry into one NDJSON line by appending a newline to its buffer.
     *
     * @param {{ buffer: Buffer }} entry
     * @returns {Buffer}
     */
    toRawBuffer(entry) {
        return Buffer.concat([entry.buffer, NDJSON_NEWLINE]);
    }

    /**
     * Reset this stream to the start so it can be iterated again.
     * @returns {EventStream}
     */
    reset() {
        this._iterator = null;
        this._eventCache = null;
        return this;
    }

    /**
     * Apply a filter predicate to this stream.  Only events for which `predicate(payload, metadata)`
     * returns a truthy value will be yielded.  The predicate is stored as a first-class property
     * of the stream and applied in {@link EventStream#next}.
     * Applying a filter also restarts iteration and drops the cached `events` array.
     *
     * @api
     * @param {function(object, object): boolean} predicate A function receiving `(payload, metadata)`.
     *   Events for which the predicate returns falsy are skipped.
     * @returns {EventStream} `this`
     * @throws {Error} If the stream is in raw-buffer mode.
     */
    filter(predicate) {
        if (this.raw) {
            throw new Error('Cannot apply a filter to a raw event stream.');
        }
        this.predicate = predicate || null;
        this._iterator = null;
        this._eventCache = null;
        return this;
    }

    /**
     * Advance the underlying iterator, skipping entries rejected by the predicate.
     * An error thrown by the iterator or by the predicate is swallowed and reported as
     * end of stream (`false`).
     *
     * @returns {object|boolean} The next event or false if no more events in the stream.
     */
    next() {
        if (!this._iterator) {
            this._iterator = this.fetch();
        }
        try {
            while (true) {
                const result = this._iterator.next();
                if (result.done) {
                    return false;
                }
                if (!this.predicate || this.predicate(result.value.payload, result.value.metadata)) {
                    return result.value;
                }
            }
        } catch (e) {
            // Deliberate best effort: a failing read ends the iteration instead of throwing.
            return false;
        }
    }

    // noinspection JSUnusedGlobalSymbols
    /**
     * Readable stream implementation. Pushes one item per call, or `null` to signal the end.
     * @private
     */
    _read() {
        const next = this.next();
        if (next === false) {
            this.push(null);
            return;
        }
        this.push(this.raw ? this.toRawBuffer(next) : next.payload);
    }

}
4✔
313

4✔
314
export default EventStream;
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc