• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 23361017298

20 Mar 2026 08:19PM UTC coverage: 97.391% (-0.4%) from 97.826%
23361017298

Pull #107

github

web-flow
Merge b56a2679a into 28d4e34f0
Pull Request #107: Start implementing auto-repair

606 of 644 branches covered (94.1%)

Branch coverage included in aggregate %.

18 of 24 new or added lines in 5 files covered. (75.0%)

2 existing lines in 1 file now uncovered.

1447 of 1464 relevant lines covered (98.84%)

1200.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.6
/src/EventStore.js
1
const EventStream = require('./EventStream');
4✔
2
const JoinEventStream = require('./JoinEventStream');
4✔
3
const fs = require('fs');
4✔
4
const path = require('path');
4✔
5
const events = require('events');
4✔
6
const Storage = require('./Storage');
4✔
7
const Consumer = require('./Consumer');
4✔
8
const { assert } = require('./util');
4✔
9

10
/**
 * Special values for the `expectedVersion` argument of `EventStore#commit`.
 * Frozen because the object is exported and shared by every user of the module.
 */
const ExpectedVersion = Object.freeze({
    // Skip the optimistic concurrency check, whatever version the stream is at.
    Any: -1,
    // The stream must not contain any events yet.
    EmptyStream: 0
});
14

15
/**
 * Thrown by `EventStore#commit` when the stream is not at the version the caller expected.
 */
class OptimisticConcurrencyError extends Error {
    constructor(...args) {
        super(...args);
        // Without this, `name` is inherited from Error.prototype and reads 'Error'.
        this.name = 'OptimisticConcurrencyError';
    }
}
16

17
/**
18
 * An event store optimized for working with many streams.
19
 * An event stream is implemented as an iterator over an index on the storage, therefore indexes need to be lightweight
20
 * and highly performant in read-only mode.
21
 */
22
class EventStore extends events.EventEmitter {

    /**
     * @param {string} [storeName] The name of the store which will be used as storage prefix. Default 'eventstore'.
     * @param {object} [config] An object with config options.
     * @param {string} [config.storageDirectory] The directory where the data should be stored. Default './data'.
     * @param {string} [config.streamsDirectory] The directory where the streams should be stored. Default '{storageDirectory}/streams'.
     * @param {object} [config.storageConfig] Additional config options given to the storage backend. See `Storage`.
     * @param {boolean} [config.readOnly] If the storage should be mounted in read-only mode.
     */
    constructor(storeName = 'eventstore', config = {}) {
        super();
        // Support `new EventStore(config)`: a non-string first argument is the config.
        if (typeof storeName !== 'string') {
            config = storeName;
            storeName = 'eventstore';
        }

        this.storageDirectory = path.resolve(config.storageDirectory || /* istanbul ignore next */ './data');
        const defaults = {
            dataDirectory: this.storageDirectory,
            indexDirectory: config.streamsDirectory || path.join(this.storageDirectory, 'streams'),
            partitioner: (event) => event.stream,
            readOnly: config.readOnly || false
        };
        // Entries of config.storageConfig override the defaults derived above.
        const storageConfig = Object.assign(defaults, config.storageConfig);
        this.initialize(storeName, storageConfig);
    }

    /**
     * Open the storage, register the `_all` stream and scan for existing streams.
     * Emits `ready` once the scan and the unfinished-commit check are done.
     *
     * @private
     * @param {string} storeName
     * @param {object} storageConfig
     */
    initialize(storeName, storageConfig) {
        this.streamsDirectory = path.resolve(storageConfig.indexDirectory);

        this.storeName = storeName;
        this.storage = this.createStorage(storeName, storageConfig);
        this.storage.open();
        // Prototype-less dictionary, so stream names can never collide with Object.prototype members.
        this.streams = Object.create(null);
        this.streams._all = { index: this.storage.index };

        this.scanStreams((err) => {
            if (err) {
                this.storage.close();
                // NOTE: this is thrown from within the fs.readdir callback of scanStreams,
                // so it can not be caught by the code that constructed the store.
                throw err;
            }
            this.checkUnfinishedCommits();
            this.emit('ready');
        });
    }

    /**
     * Check if the last commit in the store was unfinished, which is the case if not all events of the commit have been written.
     * Torn writes are handled at the storage level, so this method only deals with unfinished commits.
     * Emits `unfinished-commit` with the last readable event before truncating an unfinished commit.
     * @private
     */
    checkUnfinishedCommits() {
        let position = this.storage.length;
        let lastEvent;
        let truncateIndex = false;
        // Walk backwards until an event can actually be read from the storage.
        while (position > 0 && (lastEvent = this.storage.read(position)) === false) {
            truncateIndex = true;
            position--;
        }

        if (lastEvent && lastEvent.metadata.commitSize && lastEvent.metadata.commitVersion !== lastEvent.metadata.commitSize - 1) {
            this.emit('unfinished-commit', lastEvent);
            // commitId = global sequence number at which the commit started
            this.storage.truncate(lastEvent.metadata.commitId);
        } else if (truncateIndex) {
            // The index contained items that are not in the storage file, so truncate it after the last valid event
            this.storage.truncate(position + 1);
        }
    }

    /**
     * Create the storage backend matching the `readOnly` flag of the given config.
     *
     * @private
     * @param {string} name
     * @param {object} config
     * @returns {ReadableStorage|WritableStorage}
     */
    createStorage(name, config) {
        if (config.readOnly === true) {
            return new Storage.ReadOnly(name, config);
        }
        return new Storage(name, config);
    }

    /**
     * Scan the streams directory for existing streams so they are ready for `getEventStream()`.
     * Also subscribes to the storage's `index-created` event so that later created indexes get registered.
     *
     * @private
     * @param {function} callback A callback that will be called when all existing streams are found.
     */
    scanStreams(callback) {
        /* istanbul ignore if */
        if (typeof callback !== 'function') {
            callback = () => {};
        }
        // Find existing streams by scanning dir for filenames starting with 'stream-'
        fs.readdir(this.streamsDirectory, (err, files) => {
            if (err) {
                return callback(err);
            }
            for (const file of files) {
                const match = file.match(/(stream-.*)\.index$/);
                if (match !== null) {
                    this.registerStream(match[1]);
                }
            }
            callback();
        });
        this.storage.on('index-created', this.registerStream.bind(this));
    }

    /**
     * Register a stream index under its stream name. Emits `stream-available` for a newly
     * registered stream, or `stream-closed` when an already known open stream shows up as closed.
     *
     * @private
     * @param {string} name The full stream name, including the `stream-` prefix (and optional `.closed` suffix).
     */
    registerStream(name) {
        /* istanbul ignore if */
        if (!name.startsWith('stream-')) {
            return;
        }
        let streamName = name.slice(7);
        // Detect the `.closed` suffix — present both in the initial scan and when the directory
        // watcher emits 'index-created' after a writer renames the file (e.g. 'stream-foo-bar.closed').
        let isClosed = false;
        if (streamName.endsWith('.closed')) {
            streamName = streamName.slice(0, -7);
            isClosed = true;
        }
        if (streamName in this.streams) {
            if (isClosed && !this.streams[streamName].closed) {
                // The stream was renamed to .closed while this instance had it open.
                // The old ReadOnlyIndex was already closed via onRename, so we open the new one.
                const closedIndexName = 'stream-' + streamName + '.closed';
                const closedIndex = this.storage.openReadonlyIndex(closedIndexName);
                // deepcode ignore PrototypePollutionFunctionParams: streams is a prototype-less dictionary (Object.create(null))
                this.streams[streamName] = { index: closedIndex, closed: true };
                this.emit('stream-closed', streamName);
            }
            return;
        }
        const index = isClosed
            ? this.storage.openReadonlyIndex(name)
            : this.storage.openIndex(name);
        // deepcode ignore PrototypePollutionFunctionParams: streams is a prototype-less dictionary (Object.create(null))
        this.streams[streamName] = { index, closed: isClosed };
        this.emit('stream-available', streamName);
    }

    /**
     * Close the event store and free up all resources.
     *
     * @api
     */
    close() {
        this.storage.close();
    }

    /**
     * Get the number of events stored.
     *
     * @api
     * @returns {number}
     */
    get length() {
        return this.storage.length;
    }

    /**
     * This method makes it so the last three arguments can be given either as:
     *  - expectedVersion, metadata, callback
     *  - expectedVersion, callback
     *  - metadata, callback
     *  - callback
     *
     * A missing or `null` metadata is normalized to an empty object and a missing callback to a no-op.
     *
     * @private
     * @param {Array<object>|object} events
     * @param {number} [expectedVersion]
     * @param {object|function} [metadata]
     * @param {function} [callback]
     * @returns {{events: Array<object>, metadata: object, callback: function, expectedVersion: number}}
     */
    static fixArgumentTypes(events, expectedVersion, metadata, callback) {
        if (!Array.isArray(events)) {
            events = [events];
        }
        if (typeof expectedVersion !== 'number') {
            // No version given: everything is shifted one position to the left.
            callback = metadata;
            metadata = expectedVersion;
            expectedVersion = ExpectedVersion.Any;
        }
        if (typeof metadata === 'function') {
            // The callback was given in the metadata position.
            callback = metadata;
            metadata = {};
        } else if (metadata === null || typeof metadata !== 'object') {
            // `typeof null` is 'object', so null needs the explicit check. Keep the callback untouched here.
            metadata = {};
        }
        if (typeof callback !== 'function') {
            callback = () => {};
        }
        return { events, expectedVersion, metadata, callback };
    }

    /**
     * Commit a list of events for the given stream name, which is expected to be at the given version.
     * Note that the events committed may still appear in other streams too - the given stream name is only
     * relevant for optimistic concurrency checks with the given expected version.
     *
     * Emits `commit` with the commit object right before the callback is invoked.
     * Note that for an empty events array nothing is written and neither the event nor the callback fire.
     *
     * @api
     * @param {string} streamName The name of the stream to commit the events to.
     * @param {Array<object>|object} events The events to commit or a single event.
     * @param {number} [expectedVersion] One of ExpectedVersion constants or a positive version number that the stream is supposed to be at before commit.
     * @param {object} [metadata] The commit metadata to use as base. Useful for replication and adding storage metadata. Will not be modified.
     * @param {function} [callback] A function that will be executed when all events have been committed.
     * @throws {OptimisticConcurrencyError} if the stream is not at the expected version.
     */
    commit(streamName, events, expectedVersion = ExpectedVersion.Any, metadata = {}, callback = null) {
        assert(!(this.storage instanceof Storage.ReadOnly), 'The storage was opened in read-only mode. Can not commit to it.');
        assert(typeof streamName === 'string' && streamName !== '', 'Must specify a stream name for commit.');
        assert(typeof events !== 'undefined' && events !== null, 'No events specified for commit.');

        ({ events, expectedVersion, metadata, callback } = EventStore.fixArgumentTypes(events, expectedVersion, metadata, callback));

        if (!(streamName in this.streams)) {
            this.createEventStream(streamName, { stream: streamName });
        }
        assert(!this.streams[streamName].closed, `Stream "${streamName}" is closed and cannot be written to.`);
        let streamVersion = this.streams[streamName].index.length;
        if (expectedVersion !== ExpectedVersion.Any && streamVersion !== expectedVersion) {
            throw new OptimisticConcurrencyError(`Optimistic Concurrency error. Expected stream "${streamName}" at version ${expectedVersion} but is at version ${streamVersion}.`);
        }

        // Work on a private copy so the metadata object owned by the caller is never modified.
        metadata = Object.assign({}, metadata);
        if (events.length > 1) {
            // A given commitVersion would override the per-event counter below for every event.
            delete metadata.commitVersion;
        }

        const commitId = this.length;
        let commitVersion = 0;
        const commitSize = events.length;
        const committedAt = Date.now();
        const commit = Object.assign({
            commitId,
            committedAt
        }, metadata, {
            streamName,
            streamVersion,
            events: []
        });
        const commitCallback = () => {
            this.emit('commit', commit);
            callback(commit);
        };
        for (const event of events) {
            const eventMetadata = Object.assign({ commitId, committedAt, commitVersion, commitSize }, metadata, { streamVersion });
            const storedEvent = { stream: streamName, payload: event, metadata: eventMetadata };
            commitVersion++;
            streamVersion++;
            commit.events.push(event);
            // Only the write of the last event of the commit triggers the commit callback.
            this.storage.write(storedEvent, commitVersion !== events.length ? undefined : commitCallback);
        }
    }

    /**
     * @api
     * @param {string} streamName The name of the stream to get the version for.
     * @returns {number} The version that the given stream is at currently, or -1 if the stream does not exist.
     */
    getStreamVersion(streamName) {
        if (!(streamName in this.streams)) {
            return -1;
        }
        return this.streams[streamName].index.length;
    }

    /**
     * Get an event stream for the given stream name within the revision boundaries.
     *
     * @api
     * @param {string} streamName The name of the stream to get.
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
     * @returns {EventStream|boolean} The event stream or false if a stream with the name doesn't exist.
     */
    getEventStream(streamName, minRevision = 1, maxRevision = -1) {
        if (!(streamName in this.streams)) {
            return false;
        }
        return new EventStream(streamName, this, minRevision, maxRevision);
    }

    /**
     * Get a stream for all events within the revision boundaries.
     * This is the same as `getEventStream('_all', ...)`.
     *
     * @api
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
     * @returns {EventStream} The event stream.
     */
    getAllEvents(minRevision = 1, maxRevision = -1) {
        return this.getEventStream('_all', minRevision, maxRevision);
    }

    /**
     * Create a virtual event stream from existing streams by joining them.
     *
     * @param {string} streamName The (transient) name of the joined stream.
     * @param {Array<string>} streamNames An array of the stream names to join.
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
     * @returns {EventStream} The joined event stream.
     * @throws {Error} if any of the streams doesn't exist.
     */
    fromStreams(streamName, streamNames, minRevision = 1, maxRevision = -1) {
        assert(Array.isArray(streamNames), 'Must specify an array of stream names.');

        for (const stream of streamNames) {
            assert(stream in this.streams, `Stream "${stream}" does not exist.`);
        }
        return new JoinEventStream(streamName, streamNames, this, minRevision, maxRevision);
    }

    /**
     * Get a stream for a category of streams. This will effectively return a joined stream of all streams that start
     * with the given `categoryName` followed by a dash.
     * If you frequently use this for a category consisting of a lot of streams (e.g. `users`), consider creating a
     * dedicated physical stream for the category:
     *
     *    `eventstore.createEventStream('users', e => e.stream.startsWith('users-'))`
     *
     * @api
     * @param {string} categoryName The name of the category to get a stream for. A category is a stream name prefix.
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
     * @returns {EventStream} The joined event stream for all streams of the given category.
     * @throws {Error} If no stream for this category exists.
     */
    getEventStreamForCategory(categoryName, minRevision = 1, maxRevision = -1) {
        // A physical stream with the exact category name takes precedence over joining.
        if (categoryName in this.streams) {
            return this.getEventStream(categoryName, minRevision, maxRevision);
        }
        const categoryStreams = Object.keys(this.streams).filter(streamName => streamName.startsWith(categoryName + '-'));

        if (categoryStreams.length === 0) {
            throw new Error(`No streams for category '${categoryName}' exist.`);
        }
        return this.fromStreams(categoryName, categoryStreams, minRevision, maxRevision);
    }

    /**
     * Create a new stream with the given matcher. Emits `stream-created` with the stream name.
     *
     * @api
     * @param {string} streamName The name of the stream to create.
     * @param {object|function(event)} matcher A matcher object, denoting the properties that need to match on an event, or a function that takes the event and returns true if the event should be added.
     * @returns {EventStream} The EventStream with all existing events matching the matcher.
     * @throws {Error} If a stream with that name already exists.
     * @throws {Error} If the stream could not be created.
     */
    createEventStream(streamName, matcher) {
        assert(!(this.storage instanceof Storage.ReadOnly), 'The storage was opened in read-only mode. Can not create new stream on it.');
        assert(!(streamName in this.streams), 'Can not recreate stream!');

        const streamIndexName = 'stream-' + streamName;
        const index = this.storage.ensureIndex(streamIndexName, matcher);
        assert(index !== null, `Error creating stream index ${streamName}.`);

        // deepcode ignore PrototypePollutionFunctionParams: streams is a prototype-less dictionary (Object.create(null))
        this.streams[streamName] = { index, matcher };
        this.emit('stream-created', streamName);
        return new EventStream(streamName, this);
    }

    /**
     * Delete an event stream. Will do nothing if the stream with the name doesn't exist.
     * Emits `stream-deleted` with the stream name after the index was destroyed.
     *
     * Note that you can delete a write stream, but that will not delete the events written to it.
     * Also, on next write, that stream will be rebuilt from all existing events, which might take some time.
     *
     * @api
     * @param {string} streamName The name of the stream to delete.
     * @returns void
     */
    deleteEventStream(streamName) {
        assert(!(this.storage instanceof Storage.ReadOnly), 'The storage was opened in read-only mode. Can not delete a stream on it.');

        if (!(streamName in this.streams)) {
            return;
        }
        this.streams[streamName].index.destroy();
        delete this.streams[streamName];
        this.emit('stream-deleted', streamName);
    }

    /**
     * Close a stream so that no new events are indexed into it.
     * The stream will still be readable, but any attempt to write to it will throw an error.
     * A closed stream is persisted by renaming its index file to include a `.closed` marker
     * (e.g. `stream-X.closed.index`), so it will be recognized as closed when the store is reopened.
     * Emits `stream-closed` with the stream name.
     *
     * @api
     * @param {string} streamName The name of the stream to close.
     * @returns void
     * @throws {Error} If the storage is read-only.
     * @throws {Error} If the stream does not exist.
     * @throws {Error} If the stream is already closed.
     */
    closeEventStream(streamName) {
        assert(!(this.storage instanceof Storage.ReadOnly), 'The storage was opened in read-only mode. Can not close a stream on it.');
        assert(streamName in this.streams, `Stream "${streamName}" does not exist.`);
        assert(!this.streams[streamName].closed, `Stream "${streamName}" is already closed.`);

        const indexName = 'stream-' + streamName;
        const { index } = this.streams[streamName];

        // Flush and close the index before renaming the file
        index.close();

        // Rename the index file to mark it as closed (e.g. stream-foo.index -> stream-foo.closed.index)
        const closedFileName = index.fileName.replace(/\.index$/, '.closed.index');
        fs.renameSync(index.fileName, closedFileName);

        // Remove from secondary indexes so that new writes are no longer indexed into this stream
        delete this.storage.secondaryIndexes[indexName];

        // Reopen the renamed index for read access, outside the secondary indexes write path
        const closedIndexName = indexName + '.closed';
        const closedIndex = this.storage.openReadonlyIndex(closedIndexName);

        // deepcode ignore PrototypePollutionFunctionParams: streams is a prototype-less dictionary (Object.create(null))
        this.streams[streamName] = { index: closedIndex, closed: true };
        this.emit('stream-closed', streamName);
    }

    /**
     * Get a durable consumer for the given stream that will keep receiving events from the last position.
     *
     * @param {string} streamName The name of the stream to consume.
     * @param {string} identifier The unique identifying name of this consumer.
     * @param {object} [initialState] The initial state of the consumer.
     * @param {number} [since] The stream revision to start consuming from.
     * @returns {Consumer} A durable consumer for the given stream.
     */
    getConsumer(streamName, identifier, initialState = {}, since = 0) {
        // The `_all` stream maps to the index of the same name, all others carry the 'stream-' prefix.
        const indexName = streamName === '_all' ? '_all' : 'stream-' + streamName;
        const consumer = new Consumer(this.storage, indexName, identifier, initialState, since);
        consumer.streamName = streamName;
        return consumer;
    }

    /**
     * Scan the existing consumers on this EventStore and asynchronously return a list of their names.
     * A consumer is recognized by a file inside the `consumers` directory that is named
     * `{storageFile}.{name}`, where the name itself contains at least one dot.
     * If the `consumers` directory does not exist, the callback is invoked synchronously with an empty list.
     *
     * @param {function(error: Error, consumers: array)} callback A callback that will receive an error as first and the list of consumers as second argument.
     */
    scanConsumers(callback) {
        const consumersPath = path.join(this.storage.indexDirectory, 'consumers');
        if (!fs.existsSync(consumersPath)) {
            callback(null, []);
            return;
        }
        fs.readdir(consumersPath, (err, files) => {
            /* istanbul ignore if */
            if (err) {
                return callback(err, []);
            }
            // Escape the storage file name, so it is matched literally instead of as a pattern.
            const prefix = String(this.storage.storageFile).replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
            // Backslashes need to be doubled inside the template literal to reach the RegExp as `\.`.
            const regex = new RegExp(`^${prefix}\\.([^.]*\\..*)$`);
            const consumers = [];
            for (const file of files) {
                const matches = file.match(regex);
                if (matches !== null) {
                    consumers.push(matches[1]);
                }
            }
            callback(null, consumers);
        });
    }
}
502

503
module.exports = EventStore;
4✔
504
module.exports.ExpectedVersion = ExpectedVersion;
4✔
505
module.exports.OptimisticConcurrencyError = OptimisticConcurrencyError;
4✔
506
module.exports.LOCK_THROW = Storage.LOCK_THROW;
4✔
507
module.exports.LOCK_RECLAIM = Storage.LOCK_RECLAIM;
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc