• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 23897068196

02 Apr 2026 10:56AM UTC coverage: 98.151% (+0.1%) from 98.054%
23897068196

Pull #257

github

web-flow
Merge b52671064 into 50d3642f2
Pull Request #257: Move to ES Modules (ESM)

890 of 929 branches covered (95.8%)

Branch coverage included in aggregate %.

118 of 118 new or added lines in 21 files covered. (100.0%)

55 existing lines in 12 files now uncovered.

4684 of 4750 relevant lines covered (98.61%)

787.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.57
/src/EventStore.js
1
import EventStream from './EventStream.js';
4✔
2
import JoinEventStream from './JoinEventStream.js';
4✔
3
import fs from 'fs';
4✔
4
import path from 'path';
4✔
5
import events from 'events';
4✔
6
import Storage, { ReadOnly as ReadOnlyStorage, LOCK_THROW, LOCK_RECLAIM } from './Storage.js';
4✔
7
import Consumer from './Consumer.js';
4✔
8
import { assert, scanForFiles } from './util.js';
4✔
9

4✔
10
const ExpectedVersion = {
4✔
11
    Any: -1,
4✔
12
    EmptyStream: 0
4✔
13
};
4✔
14

4✔
15
class OptimisticConcurrencyError extends Error {}
4✔
16

4✔
17
/**
4✔
18
 * An event store optimized for working with many streams.
4✔
19
 * An event stream is implemented as an iterator over an index on the storage, therefore indexes need to be lightweight
4✔
20
 * and highly performant in read-only mode.
4✔
21
 */
4✔
22
class EventStore extends events.EventEmitter {
4✔
23

4✔
24
    /**
4✔
25
     * @param {string} [storeName] The name of the store which will be used as storage prefix. Default 'eventstore'.
4✔
26
     * @param {object} [config] An object with config options.
4✔
27
     * @param {string} [config.storageDirectory] The directory where the data should be stored. Default './data'.
4✔
28
     * @param {string} [config.streamsDirectory] The directory where the streams should be stored. Default '{storageDirectory}/streams'.
4✔
29
     * @param {object} [config.storageConfig] Additional config options given to the storage backend. See `Storage`.
4✔
30
     * @param {boolean} [config.readOnly] If the storage should be mounted in read-only mode.
4✔
31
     * @param {object|function(string): object} [config.streamMetadata] A metadata object or a function `(streamName) => object`
4✔
32
     *   that is called whenever a new stream partition is created. The returned object is stored once in the partition
4✔
33
     *   file header and surfaced to `preCommit` / `preRead` hooks. Takes precedence only when
4✔
34
     *   `config.storageConfig.metadata` is not also set.
4✔
35
     */
4✔
36
    constructor(storeName = 'eventstore', config = {}) {
4✔
37
        super();
408✔
38
        if (typeof storeName !== 'string') {
408✔
39
            config = storeName;
404✔
40
            storeName = 'eventstore';
404✔
41
        }
404✔
42

408✔
43
        this.storageDirectory = path.resolve(config.storageDirectory || /* istanbul ignore next */ './data');
408!
44
        let defaults = {
408✔
45
            dataDirectory: this.storageDirectory,
408✔
46
            indexDirectory: config.streamsDirectory || path.join(this.storageDirectory, 'streams'),
408✔
47
            partitioner: (event) => event.stream,
408✔
48
            readOnly: config.readOnly || false
408✔
49
        };
408✔
50
        const storageConfig = Object.assign(defaults, config.storageConfig);
408✔
51

408✔
52
        // Translate the high-level streamMetadata option into the storage-level metadata function,
408✔
53
        // but only when the caller has not already provided a lower-level storageConfig.metadata.
408✔
54
        if (config.streamMetadata !== undefined && storageConfig.metadata === undefined) {
408✔
55
            if (typeof config.streamMetadata === 'function') {
76✔
56
                storageConfig.metadata = config.streamMetadata;
4✔
57
            } else {
76✔
58
                storageConfig.metadata = (streamName) => config.streamMetadata[streamName] || {};
72✔
59
            }
72✔
60
        }
76✔
61

408✔
62
        this.initialize(storeName, storageConfig);
408✔
63
    }
408✔
64

4✔
65
    /**
4✔
66
     * @private
4✔
67
     * @param {string} storeName
4✔
68
     * @param {object} storageConfig
4✔
69
     */
4✔
70
    initialize(storeName, storageConfig) {
4✔
71
        this.streamsDirectory = path.resolve(storageConfig.indexDirectory);
408✔
72

408✔
73
        this.storeName = storeName;
408✔
74
        this.storage = (storageConfig.readOnly === true) ?
408✔
75
                        new ReadOnlyStorage(storeName, storageConfig)
44✔
76
                        : new Storage(storeName, storageConfig);
408✔
77
        this.storage.open();
408✔
78
        this.streams = Object.create(null);
408✔
79
        this.streams._all = { index: this.storage.index };
408✔
80

408✔
81
        this.scanStreams((err) => {
408✔
82
            if (err) {
404✔
83
                this.storage.close();
4✔
84
                throw err;
4✔
85
            }
4✔
86
            this.checkUnfinishedCommits();
400✔
87
            this.emit('ready');
400✔
88
        });
408✔
89
    }
408✔
90

4✔
91
    /**
4✔
92
     * Check if the last commit in the store was unfinished, which is the case if not all events of the commit have been written.
4✔
93
     * Torn writes are handled at the storage level, so this method only deals with unfinished commits.
4✔
94
     * @private
4✔
95
     */
4✔
96
    checkUnfinishedCommits() {
4✔
97
        let position = this.storage.length;
400✔
98
        let lastEvent;
400✔
99
        let truncateIndex = false;
400✔
100
        while (position > 0) {
400✔
101
            try {
256✔
102
                lastEvent = this.storage.read(position);
256✔
103
            } catch (e) {
256✔
104
                // A preRead hook may throw (e.g. access control). Stop repair check.
4✔
105
                return;
4✔
106
            }
4✔
107
            if (lastEvent !== false) break;
256✔
108
            truncateIndex = true;
16✔
109
            position--;
16✔
110
        }
16✔
111

396✔
112
        if (lastEvent && lastEvent.metadata.commitSize && lastEvent.metadata.commitVersion !== lastEvent.metadata.commitSize - 1) {
400✔
113
            this.emit('unfinished-commit', lastEvent);
8✔
114
            // commitId = global sequence number at which the commit started
8✔
115
            this.storage.truncate(lastEvent.metadata.commitId);
8✔
116
        } else if (truncateIndex) {
400✔
117
            // The index contained items that are not in the storage file; truncate everything
4✔
118
            // after `position`, the last sequence number that was successfully read.
4✔
119
            this.storage.truncate(position);
4✔
120
        }
4✔
121
    }
400✔
122

4✔
123
    /**
4✔
124
     * Scan the streams directory for existing streams so they are ready for `getEventStream()`.
4✔
125
     *
4✔
126
     * @private
4✔
127
     * @param {function} callback A callback that will be called when all existing streams are found.
4✔
128
     */
4✔
129
    scanStreams(callback) {
4✔
130
        /* istanbul ignore if */
404✔
131
        if (typeof callback !== 'function') {
404!
UNCOV
132
            callback = () => {};
×
UNCOV
133
        }
×
134
        // Find existing streams by scanning dir for filenames starting with 'stream-'
404✔
135
        scanForFiles(this.streamsDirectory, /(stream-.*)\.index$/, this.registerStream.bind(this), callback);
404✔
136
        this.storage.on('index-created', this.registerStream.bind(this));
404✔
137
    }
404✔
138

4✔
139
    /**
4✔
140
     * @private
4✔
141
     * @param {string} name The full stream name, including the `stream-` prefix (and optional `.closed` suffix).
4✔
142
     */
4✔
143
    registerStream(name) {
4✔
144
        /* istanbul ignore if */
572✔
145
        if (!name.startsWith('stream-')) {
572!
UNCOV
146
            return;
×
UNCOV
147
        }
×
148
        let streamName = name.slice(7);
572✔
149
        // Detect the `.closed` suffix — present both in the initial scan and when the directory
572✔
150
        // watcher emits 'index-created' after a writer renames the file (e.g. 'stream-foo-bar.closed').
572✔
151
        let isClosed = false;
572✔
152
        if (streamName.endsWith('.closed')) {
572✔
153
            streamName = streamName.slice(0, -7);
8✔
154
            isClosed = true;
8✔
155
        }
8✔
156
        if (streamName in this.streams) {
572✔
157
            if (isClosed && !this.streams[streamName].closed) {
8✔
158
                // The stream was renamed to .closed while this instance had it open.
4✔
159
                // The old ReadOnlyIndex was already closed via onRename, so we open the new one.
4✔
160
                const closedIndexName = 'stream-' + streamName + '.closed';
4✔
161
                const closedIndex = this.storage.openReadonlyIndex(closedIndexName);
4✔
162
                // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
4✔
163
                this.streams[streamName] = { index: closedIndex, closed: true };
4✔
164
                this.emit('stream-closed', streamName);
4✔
165
            }
4✔
166
            return;
8✔
167
        }
8✔
168
        const index = isClosed
564✔
169
            ? this.storage.openReadonlyIndex(name)
572✔
170
            : this.storage.openIndex(name);
572✔
171
        // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
572✔
172
        this.streams[streamName] = { index, closed: isClosed };
572✔
173
        this.emit('stream-available', streamName);
572✔
174
    }
572✔
175

4✔
176
    /**
4✔
177
     * Close the event store and free up all resources.
4✔
178
     *
4✔
179
     * @api
4✔
180
     */
4✔
181
    close() {
4✔
182
        this.storage.close();
400✔
183
    }
400✔
184

4✔
185
    /**
4✔
186
     * Override EventEmitter.on() to delegate 'preCommit' and 'preRead' event registrations
4✔
187
     * to the underlying storage, so that `eventstore.on('preCommit', handler)` works naturally.
4✔
188
     * All other events are handled by the default EventEmitter.
4✔
189
     *
4✔
190
     * @param {string} event
4✔
191
     * @param {function} listener
4✔
192
     * @returns {this}
4✔
193
     */
4✔
194
    on(event, listener) {
4✔
195
        if (event === 'preCommit' || event === 'preRead') {
192✔
196
            if (event === 'preCommit') {
76✔
197
                assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.');
40✔
198
            }
40✔
199
            this.storage.on(event, listener);
68✔
200
            return this;
68✔
201
        }
68✔
202
        return super.on(event, listener);
116✔
203
    }
192✔
204

4✔
205
    /**
4✔
206
     * @inheritDoc
4✔
207
     */
4✔
208
    addListener(event, listener) {
4✔
209
        return this.on(event, listener);
4✔
210
    }
4✔
211

4✔
212
    /**
4✔
213
     * Override EventEmitter.once() to delegate 'preCommit' and 'preRead' to the underlying storage.
4✔
214
     *
4✔
215
     * @param {string} event
4✔
216
     * @param {function} listener
4✔
217
     * @returns {this}
4✔
218
     */
4✔
219
    once(event, listener) {
4✔
220
        if (event === 'preCommit' || event === 'preRead') {
12✔
221
            if (event === 'preCommit') {
8✔
222
                assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.');
4✔
223
            }
4✔
224
            this.storage.once(event, listener);
8✔
225
            return this;
8✔
226
        }
8✔
227
        return super.once(event, listener);
4✔
228
    }
12✔
229

4✔
230
    /**
4✔
231
     * Override EventEmitter.off() / removeListener() to delegate 'preCommit' and 'preRead'
4✔
232
     * to the underlying storage.
4✔
233
     *
4✔
234
     * @param {string} event
4✔
235
     * @param {function} listener
4✔
236
     * @returns {this}
4✔
237
     */
4✔
238
    off(event, listener) {
4✔
239
        if (event === 'preCommit' || event === 'preRead') {
24✔
240
            this.storage.off(event, listener);
12✔
241
            return this;
12✔
242
        }
12✔
243
        return super.off(event, listener);
12✔
244
    }
24✔
245

4✔
246
    /**
4✔
247
     * @inheritDoc
4✔
248
     */
4✔
249
    removeListener(event, listener) {
4✔
250
        return this.off(event, listener);
12✔
251
    }
12✔
252

4✔
253
    /**
4✔
254
     * Convenience method to register a handler called before an event is committed to storage.
4✔
255
     * Equivalent to `eventstore.on('preCommit', hook)`.
4✔
256
     * The handler receives `(event, partitionMetadata)` and may throw to abort the write.
4✔
257
     * Multiple handlers can be registered; all run on every write in registration order.
4✔
258
     * The handler is invoked on every write, so its logic should be cheap, fast, and synchronous.
4✔
259
     *
4✔
260
     * @api
4✔
261
     * @param {function(object, object): void} hook A function receiving (event, partitionMetadata).
4✔
262
     * @throws {Error} If the storage was opened in read-only mode.
4✔
263
     */
4✔
264
    preCommit(hook) {
4✔
265
        this.on('preCommit', hook);
20✔
266
    }
20✔
267

4✔
268
    /**
4✔
269
     * Convenience method to register a handler called before an event is read from storage.
4✔
270
     * Equivalent to `eventstore.on('preRead', hook)`.
4✔
271
     * The handler receives `(position, partitionMetadata)` and may throw to abort the read.
4✔
272
     * Multiple handlers can be registered; all run on every read in registration order.
4✔
273
     * The handler is invoked on every read, so its logic should be cheap, fast, and synchronous.
4✔
274
     *
4✔
275
     * @api
4✔
276
     * @param {function(number, object): void} hook A function receiving (position, partitionMetadata).
4✔
277
     */
4✔
278
    preRead(hook) {
4✔
279
        this.on('preRead', hook);
12✔
280
    }
12✔
281

4✔
282
    /**
4✔
283
     * Get the number of events stored.
4✔
284
     *
4✔
285
     * @api
4✔
286
     * @returns {number}
4✔
287
     */
4✔
288
    get length() {
4✔
289
        return this.storage.length;
1,012✔
290
    }
1,012✔
291

4✔
292
    /**
4✔
293
     * This method makes it so the last three arguments can be given either as:
4✔
294
     *  - expectedVersion, metadata, callback
4✔
295
     *  - expectedVersion, callback
4✔
296
     *  - metadata, callback
4✔
297
     *  - callback
4✔
298
     *
4✔
299
     * @private
4✔
300
     * @param {Array<object>|object} events
4✔
301
     * @param {number} [expectedVersion]
4✔
302
     * @param {object|function} [metadata]
4✔
303
     * @param {function} [callback]
4✔
304
     * @returns {{events: Array<object>, metadata: object, callback: function, expectedVersion: number}}
4✔
305
     */
4✔
306
    static fixArgumentTypes(events, expectedVersion, metadata, callback) {
4✔
307
        if (!(events instanceof Array)) {
880✔
308
            events = [events];
84✔
309
        }
84✔
310
        if (typeof expectedVersion !== 'number') {
880✔
311
            callback = metadata;
148✔
312
            metadata = expectedVersion;
148✔
313
            expectedVersion = ExpectedVersion.Any;
148✔
314
        }
148✔
315
        if (typeof metadata !== 'object') {
880✔
316
            callback = metadata;
148✔
317
            metadata = {};
148✔
318
        }
148✔
319
        if (typeof callback !== 'function') {
880✔
320
            callback = () => {};
724✔
321
        }
724✔
322
        return { events, expectedVersion, metadata, callback };
880✔
323
    }
880✔
324

4✔
325
    /**
4✔
326
     * Commit a list of events for the given stream name, which is expected to be at the given version.
4✔
327
     * Note that the events committed may still appear in other streams too - the given stream name is only
4✔
328
     * relevant for optimistic concurrency checks with the given expected version.
4✔
329
     *
4✔
330
     * @api
4✔
331
     * @param {string} streamName The name of the stream to commit the events to.
4✔
332
     * @param {Array<object>|object} events The events to commit or a single event.
4✔
333
     * @param {number} [expectedVersion] One of ExpectedVersion constants or a positive version number that the stream is supposed to be at before commit.
4✔
334
     * @param {object} [metadata] The commit metadata to use as base. Useful for replication and adding storage metadata.
4✔
335
     * @param {function} [callback] A function that will be executed when all events have been committed.
4✔
336
     * @throws {OptimisticConcurrencyError} if the stream is not at the expected version.
4✔
337
     */
4✔
338
    commit(streamName, events, expectedVersion = ExpectedVersion.Any, metadata = {}, callback = null) {
4✔
339
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not commit to it.');
892✔
340
        assert(typeof streamName === 'string' && streamName !== '', 'Must specify a stream name for commit.');
892✔
341
        assert(typeof events !== 'undefined' && events !== null, 'No events specified for commit.');
892✔
342

892✔
343
        ({ events, expectedVersion, metadata, callback } = EventStore.fixArgumentTypes(events, expectedVersion, metadata, callback));
892✔
344

892✔
345
        if (!(streamName in this.streams)) {
892✔
346
            this.createEventStream(streamName, { stream: streamName });
448✔
347
        }
448✔
348
        assert(!this.streams[streamName].closed, `Stream "${streamName}" is closed and cannot be written to.`);
880✔
349
        let streamVersion = this.streams[streamName].index.length;
880✔
350
        if (expectedVersion !== ExpectedVersion.Any && streamVersion !== expectedVersion) {
892✔
351
            throw new OptimisticConcurrencyError(`Optimistic Concurrency error. Expected stream "${streamName}" at version ${expectedVersion} but is at version ${streamVersion}.`);
12✔
352
        }
12✔
353

860✔
354
        if (events.length > 1) {
892✔
355
            delete metadata.commitVersion;
44✔
356
        }
44✔
357

860✔
358
        const commitId = this.length;
860✔
359
        let commitVersion = 0;
860✔
360
        const commitSize = events.length;
860✔
361
        const committedAt = Date.now();
860✔
362
        const commit = Object.assign({
860✔
363
            commitId,
860✔
364
            committedAt
860✔
365
        }, metadata, {
860✔
366
            streamName,
860✔
367
            streamVersion,
860✔
368
            events: []
860✔
369
        });
860✔
370
        const commitCallback = () => {
860✔
371
            this.emit('commit', commit);
856✔
372
            callback(commit);
856✔
373
        };
860✔
374
        for (let event of events) {
892✔
375
            const eventMetadata = Object.assign({ commitId, committedAt, commitVersion, commitSize }, metadata, { streamVersion });
924✔
376
            const storedEvent = { stream: streamName, payload: event, metadata: eventMetadata };
924✔
377
            commitVersion++;
924✔
378
            streamVersion++;
924✔
379
            commit.events.push(event);
924✔
380
            this.storage.write(storedEvent, commitVersion !== events.length ? undefined : commitCallback);
924✔
381
        }
924✔
382
    }
892✔
383

4✔
384
    /**
4✔
385
     * @api
4✔
386
     * @param {string} streamName The name of the stream to get the version for.
4✔
387
     * @returns {number} The version that the given stream is at currently, or -1 if the stream does not exist.
4✔
388
     */
4✔
389
    getStreamVersion(streamName) {
4✔
390
        if (!(streamName in this.streams)) {
60✔
391
            return -1;
8✔
392
        }
8✔
393
        return this.streams[streamName].index.length;
52✔
394
    }
60✔
395

4✔
396
    /**
4✔
397
     * Get an event stream for the given stream name within the revision boundaries.
4✔
398
     *
4✔
399
     * @api
4✔
400
     * @param {string} streamName The name of the stream to get.
4✔
401
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
4✔
402
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
4✔
403
     * @returns {EventStream|boolean} The event stream or false if a stream with the name doesn't exist.
4✔
404
     */
4✔
405
    getEventStream(streamName, minRevision = 1, maxRevision = -1) {
4✔
406
        if (!(streamName in this.streams)) {
160✔
407
            return false;
4✔
408
        }
4✔
409
        return new EventStream(streamName, this, minRevision, maxRevision);
156✔
410
    }
160✔
411

4✔
412
    /**
4✔
413
     * Get a stream for all events within the revision boundaries.
4✔
414
     * This is the same as `getEventStream('_all', ...)`.
4✔
415
     *
4✔
416
     * @api
4✔
417
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
4✔
418
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
4✔
419
     * @returns {EventStream} The event stream.
4✔
420
     */
4✔
421
    getAllEvents(minRevision = 1, maxRevision = -1) {
4✔
422
        return this.getEventStream('_all', minRevision, maxRevision);
4✔
423
    }
4✔
424

4✔
425
    /**
4✔
426
     * Create a virtual event stream from existing streams by joining them.
4✔
427
     *
4✔
428
     * @param {string} streamName The (transient) name of the joined stream.
4✔
429
     * @param {Array<string>} streamNames An array of the stream names to join.
4✔
430
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
4✔
431
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
4✔
432
     * @returns {EventStream} The joined event stream.
4✔
433
     * @throws {Error} if any of the streams doesn't exist.
4✔
434
     */
4✔
435
    fromStreams(streamName, streamNames, minRevision = 1, maxRevision = -1) {
4✔
436
        assert(streamNames instanceof Array, 'Must specify an array of stream names.');
24✔
437

24✔
438
        for (let stream of streamNames) {
24✔
439
            assert(stream in this.streams, `Stream "${stream}" does not exist.`);
100✔
440
        }
100✔
441
        return new JoinEventStream(streamName, streamNames, this, minRevision, maxRevision);
12✔
442
    }
24✔
443

4✔
444
    /**
4✔
445
     * Get a stream for a category of streams. This will effectively return a joined stream of all streams that start
4✔
446
     * with the given `categoryName` followed by a dash.
4✔
447
     * If you frequently use this for a category consisting of a lot of streams (e.g. `users`), consider creating a
4✔
448
     * dedicated physical stream for the category:
4✔
449
     *
4✔
450
     *    `eventstore.createEventStream('users', e => e.stream.startsWith('users-'))`
4✔
451
     *
4✔
452
     * @api
4✔
453
     * @param {string} categoryName The name of the category to get a stream for. A category is a stream name prefix.
4✔
454
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
4✔
455
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
4✔
456
     * @returns {EventStream} The joined event stream for all streams of the given category.
4✔
457
     * @throws {Error} If no stream for this category exists.
4✔
458
     */
4✔
459
    getEventStreamForCategory(categoryName, minRevision = 1, maxRevision = -1) {
4✔
460
        if (categoryName in this.streams) {
12✔
461
            return this.getEventStream(categoryName, minRevision, maxRevision);
4✔
462
        }
4✔
463
        const categoryStreams = Object.keys(this.streams).filter(streamName => streamName.startsWith(categoryName + '-'));
8✔
464

8✔
465
        if (categoryStreams.length === 0) {
12✔
466
            throw new Error(`No streams for category '${categoryName}' exist.`);
4✔
467
        }
4✔
468
        return this.fromStreams(categoryName, categoryStreams, minRevision, maxRevision);
4✔
469
    }
12✔
470

4✔
471
    /**
4✔
472
     * Create a new stream with the given matcher.
4✔
473
     *
4✔
474
     * @api
4✔
475
     * @param {string} streamName The name of the stream to create.
4✔
476
     * @param {object|function(event)} matcher A matcher object, denoting the properties that need to match on an event a function that takes the event and returns true if the event should be added.
4✔
477
     * @returns {EventStream} The EventStream with all existing events matching the matcher.
4✔
478
     * @throws {Error} If a stream with that name already exists.
4✔
479
     * @throws {Error} If the stream could not be created.
4✔
480
     */
4✔
481
    createEventStream(streamName, matcher) {
4✔
482
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not create new stream on it.');
508✔
483
        assert(!(streamName in this.streams), 'Can not recreate stream!');
508✔
484

508✔
485
        const streamIndexName = 'stream-' + streamName;
508✔
486
        const index = this.storage.ensureIndex(streamIndexName, matcher);
508✔
487
        assert(index !== null, `Error creating stream index ${streamName}.`);
508✔
488

508✔
489
        // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
508✔
490
        this.streams[streamName] = { index, matcher };
508✔
491
        this.emit('stream-created', streamName);
508✔
492
        return new EventStream(streamName, this);
508✔
493
    }
508✔
494

4✔
495
    /**
4✔
496
     * Delete an event stream. Will do nothing if the stream with the name doesn't exist.
4✔
497
     *
4✔
498
     * Note that you can delete a write stream, but that will not delete the events written to it.
4✔
499
     * Also, on next write, that stream will be rebuilt from all existing events, which might take some time.
4✔
500
     *
4✔
501
     * @api
4✔
502
     * @param {string} streamName The name of the stream to delete.
4✔
503
     * @returns void
4✔
504
     */
4✔
505
    deleteEventStream(streamName) {
4✔
506
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not delete a stream on it.');
12✔
507

12✔
508
        if (!(streamName in this.streams)) {
12✔
509
            return;
4✔
510
        }
4✔
511
        this.streams[streamName].index.destroy();
4✔
512
        delete this.streams[streamName];
4✔
513
        this.emit('stream-deleted', streamName);
4✔
514
    }
12✔
515

4✔
516
    /**
4✔
517
     * Close a stream so that no new events are indexed into it.
4✔
518
     * The stream will still be readable, but any attempt to write to it will throw an error.
4✔
519
     * A closed stream is persisted by renaming its index file to include a `.closed` marker
4✔
520
     * (e.g. `stream-X.closed.index`), so it will be recognized as closed when the store is reopened.
4✔
521
     *
4✔
522
     * @api
4✔
523
     * @param {string} streamName The name of the stream to close.
4✔
524
     * @returns void
4✔
525
     * @throws {Error} If the storage is read-only.
4✔
526
     * @throws {Error} If the stream does not exist.
4✔
527
     * @throws {Error} If the stream is already closed.
4✔
528
     */
4✔
529
    closeEventStream(streamName) {
4✔
530
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not close a stream on it.');
48✔
531
        assert(streamName in this.streams, `Stream "${streamName}" does not exist.`);
48✔
532
        assert(!this.streams[streamName].closed, `Stream "${streamName}" is already closed.`);
48✔
533

48✔
534
        const indexName = 'stream-' + streamName;
48✔
535
        const { index } = this.streams[streamName];
48✔
536

48✔
537
        // Flush and close the index before renaming the file
48✔
538
        index.close();
48✔
539

48✔
540
        // Rename the index file to mark it as closed (e.g. stream-foo.index -> stream-foo.closed.index)
48✔
541
        const closedFileName = index.fileName.replace(/\.index$/, '.closed.index');
48✔
542
        fs.renameSync(index.fileName, closedFileName);
48✔
543

48✔
544
        // Remove from secondary indexes so that new writes are no longer indexed into this stream
48✔
545
        delete this.storage.secondaryIndexes[indexName];
48✔
546

48✔
547
        // Reopen the renamed index for read access, outside the secondary indexes write path
48✔
548
        const closedIndexName = indexName + '.closed';
48✔
549
        const closedIndex = this.storage.openReadonlyIndex(closedIndexName);
48✔
550

48✔
551
        // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
48✔
552
        this.streams[streamName] = { index: closedIndex, closed: true };
48✔
553
        this.emit('stream-closed', streamName);
48✔
554
    }
48✔
555

4✔
556
    /**
4✔
557
     * Get a durable consumer for the given stream that will keep receiving events from the last position.
4✔
558
     *
4✔
559
     * @param {string} streamName The name of the stream to consume.
4✔
560
     * @param {string} identifier The unique identifying name of this consumer.
4✔
561
     * @param {object} [initialState] The initial state of the consumer.
4✔
562
     * @param {number} [since] The stream revision to start consuming from.
4✔
563
     * @returns {Consumer} A durable consumer for the given stream.
4✔
564
     */
4✔
565
    getConsumer(streamName, identifier, initialState = {}, since = 0) {
4✔
566
        const consumer = new Consumer(this.storage, streamName === '_all' ? '_all' : 'stream-' + streamName, identifier, initialState, since);
20✔
567
        consumer.streamName = streamName;
20✔
568
        return consumer;
20✔
569
    }
20✔
570

4✔
571
    /**
4✔
572
     * Scan the existing consumers on this EventStore and asynchronously return a list of their names.
4✔
573
     * @param {function(error: Error, consumers: array)} callback A callback that will receive an error as first and the list of consumers as second argument.
4✔
574
     */
4✔
575
    scanConsumers(callback) {
4✔
576
        const consumersPath = path.join(this.storage.indexDirectory, 'consumers');
8✔
577
        if (!fs.existsSync(consumersPath)) {
8✔
578
            callback(null, []);
4✔
579
            return;
4✔
580
        }
4✔
581
        const regex = new RegExp(`^${this.storage.storageFile}\\.([^.]*\\..*)$`);
4✔
582
        const consumers = [];
4✔
583
        scanForFiles(consumersPath, regex, consumers.push.bind(consumers), /* istanbul ignore next */ (err) => {
4✔
584
            if (err) {
4!
UNCOV
585
                return callback(err, []);
×
UNCOV
586
            }
×
587
            callback(null, consumers);
4✔
588
        });
4✔
589
    }
8✔
590
}
4✔
591

4✔
592
export default EventStore;
4✔
593
export { ExpectedVersion, OptimisticConcurrencyError, LOCK_THROW, LOCK_RECLAIM };
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc