• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 25625880534

10 May 2026 10:02AM UTC coverage: 97.684% (-0.3%) from 98.015%
25625880534

push

github

web-flow
Merge pull request #303 from albe/copilot/sub-pr-301

feat(Storage): async partition + index scan in open() with 'opened' event and callback hook

1036 of 1086 branches covered (95.4%)

Branch coverage included in aggregate %.

100 of 102 new or added lines in 4 files covered. (98.04%)

21 existing lines in 2 files now uncovered.

5416 of 5519 relevant lines covered (98.13%)

808.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.69
/src/EventStore.js
1
import EventStream from './EventStream.js';
4✔
2
import JoinEventStream from './JoinEventStream.js';
4✔
3
import fs from 'fs';
4✔
4
import path from 'path';
4✔
5
import events from 'events';
4✔
6
import Storage, { ReadOnly as ReadOnlyStorage, LOCK_THROW, LOCK_RECLAIM } from './Storage.js';
4✔
7
import Index from './Index.js';
4✔
8
import Consumer from './Consumer.js';
4✔
9
import { assert, ensureDirectory, scanForFiles, getPropertyAtPath } from './util.js';
4✔
10
import { buildTypeMatcherFn } from './metadataUtil.js';
4✔
11

4✔
12
const ExpectedVersion = {
4✔
13
    Any: -1,
4✔
14
    EmptyStream: 0
4✔
15
};
4✔
16

4✔
17
/**
4✔
18
 * Default matcher property paths mirroring the Storage default, used for index optimization.
4✔
19
 */
4✔
20
const DEFAULT_MATCHER_PROPERTIES = ['stream', 'payload.type'];
4✔
21

4✔
22
class OptimisticConcurrencyError extends Error {}
4✔
23

4✔
24
/**
4✔
25
 * An accept condition that captures the global event-log position at the time a {@link EventStore#query}
4✔
26
 * call was made.  Pass it as the `expectedVersion` argument to {@link EventStore#commit} to enforce
4✔
27
 * DCB-style (Dynamic Consistency Boundary) optimistic concurrency: the commit is rejected only when
4✔
28
 * one or more events that match the original query (types + optional matcher) have been appended to
4✔
29
 * the store between the `query` call and the `commit` call.
4✔
30
 *
4✔
31
 * @property {string[]} types   The event types included in the query.
4✔
32
 * @property {function(object, object): boolean|null} matcher An optional function `(payload, metadata) => boolean`
4✔
33
 *   used to narrow the conflict check.  When `null`, any new event of a listed type causes a conflict.
4✔
34
 * @property {number}   noneMatchAfter The global store length (total event count) at the time the query was made.
4✔
35
 */
4✔
36
class CommitCondition {
4✔
37
    /**
4✔
38
     * @param {string[]} types
4✔
39
     * @param {function(object, object): boolean|null} [matcher]
4✔
40
     * @param {number}   noneMatchAfter
4✔
41
     */
4✔
42
    constructor(types, matcher = null, noneMatchAfter) {
4✔
43
        this.types = types;
88✔
44
        this.matcher = matcher;
88✔
45
        this.noneMatchAfter = noneMatchAfter;
88✔
46
    }
88✔
47
}
4✔
48

4✔
49
/**
4✔
50
 * An event store optimized for working with many streams.
4✔
51
 * An event stream is implemented as an iterator over an index on the storage, therefore indexes need to be lightweight
4✔
52
 * and highly performant in read-only mode.
4✔
53
 */
4✔
54
class EventStore extends events.EventEmitter {
4✔
55

4✔
56
    /**
4✔
57
     * @param {string} [storeName] The name of the store which will be used as storage prefix. Default 'eventstore'.
4✔
58
     * @param {object} [config] An object with config options.
4✔
59
     * @param {string} [config.storageDirectory] The directory where the data should be stored. Default './data'.
4✔
60
     * @param {string} [config.streamsDirectory] The directory where the streams should be stored. Default '{storageDirectory}/streams'.
4✔
61
     * @param {object} [config.storageConfig] Additional config options given to the storage backend. See `Storage`.
4✔
62
     * @param {boolean} [config.readOnly] If the storage should be mounted in read-only mode.
4✔
63
     * @param {string|function(object): string} [config.typeAccessor] Dot-notation path (e.g. `'type'`) or
4✔
64
     *   function `(event) => string` identifying the event type. Enables type-based queries via
4✔
65
     *   {@link EventStore#query} and ensures proper index routing for those queries.
4✔
66
     * @param {object|function(string): object} [config.streamMetadata] A metadata object or a function `(streamName) => object`
4✔
67
     *   that is called whenever a new stream partition is created. The returned object is stored once in the partition
4✔
68
     *   file header and surfaced to `preCommit` / `preRead` hooks. Takes precedence only when
4✔
69
     *   `config.storageConfig.metadata` is not also set.
4✔
70
     */
4✔
71
    constructor(storeName = 'eventstore', config = {}) {
4✔
72
        super();
592✔
73
        if (typeof storeName !== 'string') {
592✔
74
            config = storeName;
588✔
75
            storeName = 'eventstore';
588✔
76
        }
588✔
77

592✔
78
        if (typeof config.typeAccessor === 'string' && config.typeAccessor) {
592✔
79
            const accessorPath = config.typeAccessor;
24✔
80
            this.typeAccessor = (event) => getPropertyAtPath(event, accessorPath);
24✔
81
            this.typeMatcherFn = buildTypeMatcherFn(accessorPath);
24✔
82
        } else {
592✔
83
            this.typeAccessor = typeof config.typeAccessor === 'function' ? config.typeAccessor : null;
568✔
84
            this.typeMatcherFn = null;
568✔
85
        }
568✔
86

592✔
87
        this.storageDirectory = path.resolve(config.storageDirectory || /* istanbul ignore next */ './data');
592!
88
        let defaults = {
592✔
89
            dataDirectory: this.storageDirectory,
592✔
90
            indexDirectory: config.streamsDirectory || path.join(this.storageDirectory, 'streams'),
592✔
91
            partitioner: (event) => event.stream,
592✔
92
            readOnly: config.readOnly || false
592✔
93
        };
592✔
94
        const storageConfig = Object.assign(defaults, config.storageConfig);
592✔
95

592✔
96
        // When typeAccessor is a string path, ensure the corresponding full document path
592✔
97
        // (payload.<path>) is present in matcherProperties so the IndexMatcher discriminant
592✔
98
        // table can route type-stream lookups in O(1) on every write.
592✔
99
        if (this.typeMatcherFn) {
592✔
100
            const fullPath = `payload.${config.typeAccessor}`;
24✔
101
            const currentProps = storageConfig.matcherProperties || DEFAULT_MATCHER_PROPERTIES;
24✔
102
            if (!currentProps.includes(fullPath)) {
24✔
103
                storageConfig.matcherProperties = [...currentProps, fullPath];
8✔
104
            }
8✔
105
        }
24✔
106

592✔
107
        // Translate the high-level streamMetadata option into the storage-level metadata function,
592✔
108
        // but only when the caller has not already provided a lower-level storageConfig.metadata.
592✔
109
        if (config.streamMetadata !== undefined && storageConfig.metadata === undefined) {
592✔
110
            if (typeof config.streamMetadata === 'function') {
76✔
111
                storageConfig.metadata = config.streamMetadata;
4✔
112
            } else {
76✔
113
                storageConfig.metadata = (streamName) => config.streamMetadata[streamName] || {};
72✔
114
            }
72✔
115
        }
76✔
116

592✔
117
        this.initialize(storeName, storageConfig);
592✔
118
    }
592✔
119

4✔
120
    /**
4✔
121
     * @private
4✔
122
     * @param {string} storeName
4✔
123
     * @param {object} storageConfig
4✔
124
     */
4✔
125
    initialize(storeName, storageConfig) {
4✔
126
        this.streamsDirectory = path.resolve(storageConfig.indexDirectory);
592✔
127

592✔
128
        this.storeName = storeName;
592✔
129
        this.storage = (storageConfig.readOnly === true) ?
592✔
130
                        new ReadOnlyStorage(storeName, storageConfig)
44✔
131
                        : new Storage(storeName, storageConfig);
592✔
132
        this.streams = Object.create(null);
592✔
133
        this.streams._all = { index: this.storage.index };
592✔
134

592✔
135
        this.storage.on('index-created', this.registerStream.bind(this));
592✔
136

592✔
137
        this.storage.on('opened', () => {
592✔
138
            this.checkUnfinishedCommits();
124✔
139
            this.emit('ready');
124✔
140
        });
592✔
141

592✔
142
        this.storage.open();
592✔
143
    }
592✔
144

4✔
145
    /**
4✔
146
     * Check if the last commit in the store was unfinished, which is the case if not all events of the commit have been written.
4✔
147
     * Torn writes are handled at the storage level, so this method only deals with unfinished commits.
4✔
148
     * @private
4✔
149
     */
4✔
150
    checkUnfinishedCommits() {
4✔
151
        let position = this.storage.length;
124✔
152
        let lastEvent;
124✔
153
        let truncateIndex = false;
124✔
154
        while (position > 0) {
124✔
155
            try {
76✔
156
                lastEvent = this.storage.read(position);
76✔
157
            } catch (e) {
76!
UNCOV
158
                // A preRead hook may throw (e.g. access control). Stop repair check.
×
UNCOV
159
                return;
×
UNCOV
160
            }
×
161
            if (lastEvent !== false) break;
76✔
162
            truncateIndex = true;
16✔
163
            position--;
16✔
164
        }
16✔
165

124✔
166
        if (lastEvent && lastEvent.metadata.commitSize && lastEvent.metadata.commitVersion !== lastEvent.metadata.commitSize - 1) {
124✔
167
            this.emit('unfinished-commit', lastEvent);
8✔
168
            // commitId = global sequence number at which the commit started
8✔
169
            this.storage.truncate(lastEvent.metadata.commitId);
8✔
170
        } else if (truncateIndex) {
124✔
171
            // The index contained items that are not in the storage file; truncate everything
4✔
172
            // after `position`, the last sequence number that was successfully read.
4✔
173
            this.storage.truncate(position);
4✔
174
        }
4✔
175
    }
124✔
176

4✔
177
    /**
4✔
178
     * @private
4✔
179
     * @param {string} name The full stream name, including the `stream-` prefix (and optional `.closed` suffix).
4✔
180
     */
4✔
181
    registerStream(name) {
4✔
182
        /* istanbul ignore if */
968✔
183
        if (!name.startsWith('stream-')) {
968!
184
            return;
×
185
        }
×
186
        let streamName = name.slice(7);
968✔
187
        // Detect the `.closed` suffix — present both in the initial scan and when the directory
968✔
188
        // watcher emits 'index-created' after a writer renames the file (e.g. 'stream-foo-bar.closed').
968✔
189
        let isClosed = false;
968✔
190
        if (streamName.endsWith('.closed')) {
968✔
191
            streamName = streamName.slice(0, -7);
8✔
192
            isClosed = true;
8✔
193
        }
8✔
194
        if (streamName in this.streams) {
968✔
195
            if (isClosed && !this.streams[streamName].closed) {
40✔
196
                // The stream was renamed to .closed while this instance had it open.
4✔
197
                // The old ReadOnlyIndex was already closed via onRename, so we open the new one.
4✔
198
                const closedIndexName = 'stream-' + streamName + '.closed';
4✔
199
                const closedIndex = this.storage.openReadonlyIndex(closedIndexName);
4✔
200
                // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
4✔
201
                this.streams[streamName] = { index: closedIndex, closed: true };
4✔
202
                this.emit('stream-closed', streamName);
4✔
203
            }
4✔
204
            return;
40✔
205
        }
40✔
206
        const index = isClosed
928✔
207
            ? this.storage.openReadonlyIndex(name)
968✔
208
            : this.storage.openIndex(name);
968✔
209
        // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
968✔
210
        this.streams[streamName] = { index, closed: isClosed };
968✔
211
        this.emit('stream-available', streamName);
968✔
212
    }
968✔
213

4✔
214
    /**
4✔
215
     * Close the event store and free up all resources.
4✔
216
     *
4✔
217
     * @api
4✔
218
     */
4✔
219
    close() {
4✔
220
        this.storage.close();
584✔
221
    }
584✔
222

4✔
223
    /**
4✔
224
     * Override EventEmitter.on() to delegate 'preCommit' and 'preRead' event registrations
4✔
225
     * to the underlying storage, so that `eventstore.on('preCommit', handler)` works naturally.
4✔
226
     * All other events are handled by the default EventEmitter.
4✔
227
     *
4✔
228
     * @param {string} event
4✔
229
     * @param {function} listener
4✔
230
     * @returns {this}
4✔
231
     */
4✔
232
    on(event, listener) {
4✔
233
        if (event === 'preCommit' || event === 'preRead') {
208✔
234
            if (event === 'preCommit') {
76✔
235
                assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.');
40✔
236
            }
40✔
237
            this.storage.on(event, listener);
68✔
238
            return this;
68✔
239
        }
68✔
240
        return super.on(event, listener);
132✔
241
    }
208✔
242

4✔
243
    /**
4✔
244
     * @inheritDoc
4✔
245
     */
4✔
246
    addListener(event, listener) {
4✔
247
        return this.on(event, listener);
4✔
248
    }
4✔
249

4✔
250
    /**
4✔
251
     * Override EventEmitter.once() to delegate 'preCommit' and 'preRead' to the underlying storage.
4✔
252
     *
4✔
253
     * @param {string} event
4✔
254
     * @param {function} listener
4✔
255
     * @returns {this}
4✔
256
     */
4✔
257
    once(event, listener) {
4✔
258
        if (event === 'preCommit' || event === 'preRead') {
12✔
259
            if (event === 'preCommit') {
8✔
260
                assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.');
4✔
261
            }
4✔
262
            this.storage.once(event, listener);
8✔
263
            return this;
8✔
264
        }
8✔
265
        return super.once(event, listener);
4✔
266
    }
12✔
267

4✔
268
    /**
4✔
269
     * Override EventEmitter.off() / removeListener() to delegate 'preCommit' and 'preRead'
4✔
270
     * to the underlying storage.
4✔
271
     *
4✔
272
     * @param {string} event
4✔
273
     * @param {function} listener
4✔
274
     * @returns {this}
4✔
275
     */
4✔
276
    off(event, listener) {
4✔
277
        if (event === 'preCommit' || event === 'preRead') {
24✔
278
            this.storage.off(event, listener);
12✔
279
            return this;
12✔
280
        }
12✔
281
        return super.off(event, listener);
12✔
282
    }
24✔
283

4✔
284
    /**
4✔
285
     * @inheritDoc
4✔
286
     */
4✔
287
    removeListener(event, listener) {
4✔
288
        return this.off(event, listener);
12✔
289
    }
12✔
290

4✔
291
    /**
4✔
292
     * Convenience method to register a handler called before an event is committed to storage.
4✔
293
     * Equivalent to `eventstore.on('preCommit', hook)`.
4✔
294
     * The handler receives `(event, partitionMetadata)` and may throw to abort the write.
4✔
295
     * Multiple handlers can be registered; all run on every write in registration order.
4✔
296
     * The handler is invoked on every write, so its logic should be cheap, fast, and synchronous.
4✔
297
     *
4✔
298
     * @api
4✔
299
     * @param {function(object, object): void} hook A function receiving (event, partitionMetadata).
4✔
300
     * @throws {Error} If the storage was opened in read-only mode.
4✔
301
     */
4✔
302
    preCommit(hook) {
4✔
303
        this.on('preCommit', hook);
20✔
304
    }
20✔
305

4✔
306
    /**
4✔
307
     * Convenience method to register a handler called before an event is read from storage.
4✔
308
     * Equivalent to `eventstore.on('preRead', hook)`.
4✔
309
     * The handler receives `(position, partitionMetadata)` and may throw to abort the read.
4✔
310
     * Multiple handlers can be registered; all run on every read in registration order.
4✔
311
     * The handler is invoked on every read, so its logic should be cheap, fast, and synchronous.
4✔
312
     *
4✔
313
     * @api
4✔
314
     * @param {function(number, object): void} hook A function receiving (position, partitionMetadata).
4✔
315
     */
4✔
316
    preRead(hook) {
4✔
317
        this.on('preRead', hook);
12✔
318
    }
12✔
319

4✔
320
    /**
4✔
321
     * Get the number of events stored.
4✔
322
     *
4✔
323
     * @api
4✔
324
     * @returns {number}
4✔
325
     */
4✔
326
    get length() {
4✔
327
        return this.storage.length;
1,320✔
328
    }
1,320✔
329

4✔
330
    /**
4✔
331
     * This method makes it so the last three arguments can be given either as:
4✔
332
     *  - expectedVersion, metadata, callback
4✔
333
     *  - expectedVersion, callback
4✔
334
     *  - metadata, callback
4✔
335
     *  - callback
4✔
336
     *
4✔
337
     * @private
4✔
338
     * @param {Array<object>|object} events
4✔
339
     * @param {number|CommitCondition} [expectedVersion]
4✔
340
     * @param {object|function} [metadata]
4✔
341
     * @param {function} [callback]
4✔
342
     * @returns {{events: Array<object>, metadata: object, callback: function, expectedVersion: number|CommitCondition}}
4✔
343
     */
4✔
344
    static fixArgumentTypes(events, expectedVersion, metadata, callback) {
4✔
345
        if (!(events instanceof Array)) {
1,136✔
346
            events = [events];
84✔
347
        }
84✔
348
        if (typeof expectedVersion !== 'number' && !(expectedVersion instanceof CommitCondition)) {
1,136✔
349
            callback = metadata;
280✔
350
            metadata = expectedVersion;
280✔
351
            expectedVersion = ExpectedVersion.Any;
280✔
352
        }
280✔
353
        if (typeof metadata !== 'object') {
1,136✔
354
            callback = metadata;
284✔
355
            metadata = {};
284✔
356
        }
284✔
357
        if (typeof callback !== 'function') {
1,136✔
358
            callback = () => {};
840✔
359
        }
840✔
360
        return { events, expectedVersion, metadata, callback };
1,136✔
361
    }
1,136✔
362

4✔
363
    /**
4✔
364
     * Check a {@link CommitCondition} against the current state of the store.
4✔
365
     * Iterates a join stream over all condition type streams starting from
4✔
366
     * `condition.noneMatchAfter` (the global position captured at query time), and throws an
4✔
367
     * {@link OptimisticConcurrencyError} when a new event of a listed type satisfies
4✔
368
     * `condition.matcher(payload, metadata)` (or any such event when no matcher is provided).
4✔
369
     *
4✔
370
     * @param {CommitCondition} condition
4✔
371
     * @throws {OptimisticConcurrencyError}
4✔
372
     */
4✔
373
    checkCondition(condition) {
4✔
374
        if (this.storage.length <= condition.noneMatchAfter) return; // no new events since condition was obtained
28✔
375

16✔
376
        const existingTypes = condition.types.filter(t => t in this.streams);
16✔
377
        if (existingTypes.length === 0) return;
28!
378

16✔
379
        // Only events after condition.noneMatchAfter can be conflicts.
16✔
380
        const stream = this.fromStreams(
16✔
381
            '_check_' + condition.types.join('_'),
16✔
382
            existingTypes,
16✔
383
            condition.noneMatchAfter + 1
16✔
384
        );
16✔
385

16✔
386
        let next;
16✔
387
        while ((next = stream.next()) !== false) {
16✔
388
            if (!condition.matcher || condition.matcher(next.payload, next.metadata)) {
16✔
389
                throw new OptimisticConcurrencyError(
12✔
390
                    `Optimistic Concurrency error. A conflicting event was committed since the condition was obtained.`
12✔
391
                );
12✔
392
            }
12✔
393
        }
16✔
394
    }
28✔
395

4✔
396
    /**
4✔
397
     * Ensure a dedicated type stream exists for each event's type, creating it if needed.
4✔
398
     * Must be called before the entity stream is created to guarantee correct index routing.
4✔
399
     *
4✔
400
     * @param {Array<object>} events The events to process.
4✔
401
     */
4✔
402
    ensureTypeStreams(events) {
4✔
403
        if (!this.typeAccessor) return;
1,124✔
404
        for (const event of events) {
1,124✔
405
            const type = this.typeAccessor(event);
120✔
406
            if (type && !(type in this.streams)) {
120✔
407
                const matcher = this.typeMatcherFn
100✔
408
                    ? this.typeMatcherFn(type)
100✔
409
                    : (doc) => this.typeAccessor(doc.payload) === type;
100✔
410
                this.createEventStream(type, matcher, false);
100✔
411
            }
100✔
412
        }
120✔
413
    }
1,124✔
414

4✔
415
    /**
4✔
416
     * Commit a list of events for the given stream name, which is expected to be at the given version.
4✔
417
     * Note that the events committed may still appear in other streams too - the given stream name is only
4✔
418
     * relevant for optimistic concurrency checks with the given expected version.
4✔
419
     *
4✔
420
     * @api
4✔
421
     * @param {string} streamName The name of the stream to commit the events to.
4✔
422
     * @param {Array<object>|object} events The events to commit or a single event.
4✔
423
     * @param {number|CommitCondition} [expectedVersion] One of the `ExpectedVersion` constants, a positive
4✔
424
     *   stream version number, or a {@link CommitCondition} obtained from {@link EventStore#query}.
4✔
425
     * @param {object} [metadata] The commit metadata to use as base. Useful for replication and adding storage metadata.
4✔
426
     * @param {function} [callback] A function that will be executed when all events have been committed.
4✔
427
     * @throws {OptimisticConcurrencyError} if the stream is not at the expected version, or if a
4✔
428
     *   {@link CommitCondition} was provided and conflicting events have been committed since it was obtained.
4✔
429
     */
4✔
430
    commit(streamName, events, expectedVersion = ExpectedVersion.Any, metadata = {}, callback = null) {
4✔
431
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not commit to it.');
1,148✔
432
        assert(typeof streamName === 'string' && streamName !== '', 'Must specify a stream name for commit.');
1,148✔
433
        assert(typeof events !== 'undefined' && events !== null, 'No events specified for commit.');
1,148✔
434

1,148✔
435
        ({ events, expectedVersion, metadata, callback } = EventStore.fixArgumentTypes(events, expectedVersion, metadata, callback));
1,148✔
436

1,148✔
437
        // Perform DCB-style concurrency check when a CommitCondition is provided.
1,148✔
438
        if (expectedVersion instanceof CommitCondition) {
1,148✔
439
            this.checkCondition(expectedVersion);
28✔
440
            expectedVersion = ExpectedVersion.Any;
28✔
441
        }
28✔
442

1,124✔
443
        // When typeAccessor is configured, ensure a dedicated type stream exists for each event
1,124✔
444
        // before the entity stream write so the type stream index is never incomplete.
1,124✔
445
        this.ensureTypeStreams(events);
1,124✔
446

1,124✔
447
        if (!(streamName in this.streams)) {
1,148✔
448
            this.createEventStream(streamName, { stream: streamName }, false);
684✔
449
        }
684✔
450
        assert(!this.streams[streamName].closed, `Stream "${streamName}" is closed and cannot be written to.`);
1,124✔
451
        let streamVersion = this.streams[streamName].index.length;
1,124✔
452
        if (expectedVersion !== ExpectedVersion.Any && streamVersion !== expectedVersion) {
1,148✔
453
            throw new OptimisticConcurrencyError(`Optimistic Concurrency error. Expected stream "${streamName}" at version ${expectedVersion} but is at version ${streamVersion}.`);
12✔
454
        }
12✔
455

1,104✔
456
        if (events.length > 1) {
1,148✔
457
            delete metadata.commitVersion;
60✔
458
        }
60✔
459

1,104✔
460
        const commitId = this.length;
1,104✔
461
        let commitVersion = 0;
1,104✔
462
        const commitSize = events.length;
1,104✔
463
        const committedAt = Date.now();
1,104✔
464
        const commit = Object.assign({
1,104✔
465
            commitId,
1,104✔
466
            committedAt
1,104✔
467
        }, metadata, {
1,104✔
468
            streamName,
1,104✔
469
            streamVersion,
1,104✔
470
            events: []
1,104✔
471
        });
1,104✔
472
        const commitCallback = () => {
1,104✔
473
            this.emit('commit', commit);
1,100✔
474
            callback(commit);
1,100✔
475
        };
1,104✔
476
        for (let event of events) {
1,148✔
477
            const eventMetadata = Object.assign({ commitId, committedAt, commitVersion, commitSize }, metadata, { streamVersion });
1,192✔
478
            const storedEvent = { stream: streamName, payload: event, metadata: eventMetadata };
1,192✔
479
            commitVersion++;
1,192✔
480
            streamVersion++;
1,192✔
481
            commit.events.push(event);
1,192✔
482
            this.storage.write(storedEvent, commitVersion !== events.length ? undefined : commitCallback);
1,192✔
483
        }
1,192✔
484
    }
1,148✔
485

4✔
486
    /**
4✔
487
     * @api
4✔
488
     * @param {string} streamName The name of the stream to get the version for.
4✔
489
     * @returns {number} The version that the given stream is at currently, or -1 if the stream does not exist.
4✔
490
     */
4✔
491
    getStreamVersion(streamName) {
4✔
492
        if (!(streamName in this.streams)) {
108✔
493
            return -1;
20✔
494
        }
20✔
495
        return this.streams[streamName].index.length;
88✔
496
    }
108✔
497

4✔
498
    /**
4✔
499
     * Query the event store for events matching a set of event types and an optional filter function.
4✔
500
     * Returns a pre-filtered event stream and a {@link CommitCondition} that can be passed to
4✔
501
     * {@link EventStore#commit} to enforce optimistic concurrency.
4✔
502
     *
4✔
503
     * A conflict occurs when at least one event appended between the `query` call and the `commit` call
4✔
504
     * belongs to one of the listed types and (when `matcher` is provided) also satisfies
4✔
505
     * `matcher(payload, metadata)`.  Events written before the `query` call are never treated as conflicts.
4✔
506
     *
4✔
507
     * **Behaviour when a type stream does not exist:**
4✔
508
     * - Without `typeAccessor` configured: throws an error, because the store cannot guarantee that no
4✔
509
     *   events of that type exist (the stream was never created).  Create the stream explicitly first,
4✔
510
     *   or configure `typeAccessor` to have streams created automatically on commit.
4✔
511
     * - With `typeAccessor` configured: treats the missing stream as empty (0-length).  The stream will
4✔
512
     *   be created automatically the first time an event of that type is committed.
4✔
513
     *
4✔
514
     * @api
4✔
515
     * @param {string[]} types A non-empty array of event-type names to query.
4✔
516
     * @param {function(object, object): boolean|null} [matcher] An optional filter function `(payload, metadata) => boolean`
4✔
517
     *   passed to the returned {@link CommitCondition}.
4✔
518
     * @param {number} [minRevision=1] The 1-based minimum global revision to include in the returned stream (inclusive).
4✔
519
     * @returns {{ condition: CommitCondition, stream: EventStream }} An object with:
4✔
520
     *   - `condition` — the {@link CommitCondition} to pass to {@link EventStore#commit}.
4✔
521
     *   - `stream` — a read-only event stream containing all matching events.
4✔
522
     * @throws {Error} if `types` is not a non-empty array.
4✔
523
     * @throws {Error} if `typeAccessor` is not configured and any of the listed type streams do not exist.
4✔
524
     */
4✔
525
    query(types, matcher = null, minRevision = 1) {
4✔
526
        assert(Array.isArray(types) && types.length > 0, 'Must specify a non-empty array of event types for query.');
100✔
527

100✔
528
        const queryTypes = [];
100✔
529
        for (const type of types) {
100✔
530
            if (!(type in this.streams)) {
104✔
531
                if (this.typeAccessor) {
52✔
532
                    // typeAccessor is configured: type streams are created on commit, so a missing
48✔
533
                    // stream simply means no event of this type has been committed yet — treat as empty.
48✔
534
                    continue;
48✔
535
                }
48✔
536
                // No typeAccessor: the stream was never created; we cannot know whether events of
4✔
537
                // this type exist in the store, so throw to avoid an unintentional full-store scan.
4✔
538
                throw new Error(`Type stream "${type}" does not exist. Create it with createEventStream() first, or configure typeAccessor to have type streams created automatically on commit.`);
4✔
539
            }
4✔
540
            queryTypes.push(type);
52✔
541
        }
52✔
542

88✔
543
        const condition = new CommitCondition(types, matcher, this.storage.length);
88✔
544
        const stream = this.fromStreams('_query_' + types.join('_'), queryTypes, minRevision, -1, matcher);
88✔
545
        return { stream, condition };
88✔
546
    }
100✔
547

4✔
548
    /**
4✔
549
     * Get an event stream for the given stream name within the revision boundaries.
4✔
550
     *
4✔
551
     * @api
4✔
552
     * @param {string} streamName The name of the stream to get.
4✔
553
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
4✔
554
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
4✔
555
     * @returns {EventStream|boolean} The event stream or false if a stream with the name doesn't exist.
4✔
556
     */
4✔
557
    getEventStream(streamName, minRevision = 1, maxRevision = -1) {
4✔
558
        if (!(streamName in this.streams)) {
184✔
559
            return false;
4✔
560
        }
4✔
561
        return new EventStream(streamName, this, minRevision, maxRevision);
180✔
562
    }
184✔
563

4✔
564
    /**
4✔
565
     * Get a stream for all events within the revision boundaries.
4✔
566
     * This is the same as `getEventStream('_all', ...)`.
4✔
567
     *
4✔
568
     * @api
4✔
569
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
4✔
570
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
4✔
571
     * @returns {EventStream} The event stream.
4✔
572
     */
4✔
573
    getAllEvents(minRevision = 1, maxRevision = -1) {
4✔
574
        return this.getEventStream('_all', minRevision, maxRevision);
4✔
575
    }
4✔
576

4✔
577
    /**
4✔
578
     * Create a virtual event stream from existing streams by joining them.
4✔
579
     *
4✔
580
     * @param {string} streamName The (transient) name of the joined stream.
4✔
581
     * @param {Array<string>} streamNames An array of the stream names to join.
4✔
582
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
4✔
583
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
4✔
584
     * @param {function(object, object): boolean|null} [predicate] An optional filter predicate
4✔
585
     *   `(payload, metadata) => boolean`. Only events for which this returns truthy are yielded.
4✔
586
     * @returns {EventStream} The joined event stream.
4✔
587
     * @throws {Error} if any of the streams doesn't exist.
4✔
588
     */
4✔
589
    fromStreams(streamName, streamNames, minRevision = 1, maxRevision = -1, predicate = null) {
4✔
590
        assert(streamNames instanceof Array, 'Must specify an array of stream names.');
152✔
591

152✔
592
        if (streamNames.length === 0) {
152✔
593
            return new EventStream(streamName, this);
44✔
594
        }
44✔
595

100✔
596
        for (let stream of streamNames) {
152✔
597
            assert(stream in this.streams, `Stream "${stream}" does not exist.`);
256✔
598
        }
256✔
599

96✔
600
        if (streamNames.length === 1) {
152✔
601
            const stream = new EventStream(streamNames[0], this, minRevision, maxRevision, predicate);
52✔
602
            stream.name = streamName;
52✔
603
            return stream;
52✔
604
        }
52✔
605

44✔
606
        return new JoinEventStream(streamName, streamNames, this, minRevision, maxRevision, predicate);
44✔
607
    }
152✔
608

4✔
609
    /**
4✔
610
     * Get a stream for a category of streams. This will effectively return a joined stream of all streams that start
4✔
611
     * with the given `categoryName` followed by a dash (flat layout, e.g. `users-123`) or a slash (hierarchical
4✔
612
     * layout, e.g. `users/123`).
4✔
613
     * If you frequently use this for a category consisting of a lot of streams (e.g. `users`), consider creating a
4✔
614
     * dedicated physical stream for the category:
4✔
615
     *
4✔
616
     *    `eventstore.createEventStream('users', e => e.stream.startsWith('users-') || e.stream.startsWith('users/'))`
4✔
617
     *
4✔
618
     * @api
4✔
619
     * @param {string} categoryName The name of the category to get a stream for. A category is a stream name prefix.
4✔
620
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
4✔
621
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
4✔
622
     * @returns {EventStream} The joined event stream for all streams of the given category.
4✔
623
     * @throws {Error} If no stream for this category exists.
4✔
624
     */
4✔
625
    getEventStreamForCategory(categoryName, minRevision = 1, maxRevision = -1) {
4✔
626
        if (categoryName in this.streams) {
36✔
627
            return this.getEventStream(categoryName, minRevision, maxRevision);
4✔
628
        }
4✔
629
        const categoryStreams = Object.keys(this.streams).filter(streamName =>
32✔
630
            streamName.startsWith(categoryName + '-') ||
212✔
631
            streamName.startsWith(categoryName + '/')
124✔
632
        );
32✔
633

32✔
634
        if (categoryStreams.length === 0) {
36✔
635
            throw new Error(`No streams for category '${categoryName}' exist.`);
4✔
636
        }
4✔
637
        return this.fromStreams(categoryName, categoryStreams, minRevision, maxRevision);
28✔
638
    }
36✔
639

4✔
640
    /**
4✔
641
     * Create a new stream with the given matcher.
4✔
642
     *
4✔
643
     * @api
4✔
644
     * @param {string} streamName The name of the stream to create.
4✔
645
     * @param {object|function(event)} matcher A matcher object, denoting the properties that need to match on an event a function that takes the event and returns true if the event should be added.
4✔
646
     * @param {boolean} [reindex=true] Whether to scan existing documents and populate the new index. Set to false when it is known that no existing documents can match the matcher (e.g. when creating a brand-new write stream).
4✔
647
     * @returns {EventStream} The EventStream with all existing events matching the matcher.
4✔
648
     * @throws {Error} If a stream with that name already exists.
4✔
649
     * @throws {Error} If the stream could not be created.
4✔
650
     */
4✔
651
    createEventStream(streamName, matcher, reindex = true) {
4✔
652
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not create new stream on it.');
860✔
653
        assert(!(streamName in this.streams), 'Can not recreate stream!');
860✔
654

860✔
655
        const streamIndexName = 'stream-' + streamName;
860✔
656
        if (streamName.includes('/')) {
860✔
657
            const subDir = path.join(this.streamsDirectory, this.storeName + '.stream-' + path.dirname(streamName));
88✔
658
            ensureDirectory(subDir);
88✔
659
        }
88✔
660
        const index = this.storage.ensureIndex(streamIndexName, matcher, reindex);
844✔
661
        assert(index !== null, `Error creating stream index ${streamName}.`);
844✔
662

844✔
663
        // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
844✔
664
        this.streams[streamName] = { index, matcher };
844✔
665
        this.emit('stream-created', streamName);
844✔
666
        return new EventStream(streamName, this);
844✔
667
    }
860✔
668

4✔
669
    /**
4✔
670
     * Delete an event stream. Will do nothing if the stream with the name doesn't exist.
4✔
671
     *
4✔
672
     * Note that you can delete a write stream, but that will not delete the events written to it.
4✔
673
     * Also, on next write, that stream will be rebuilt from all existing events, which might take some time.
4✔
674
     *
4✔
675
     * @api
4✔
676
     * @param {string} streamName The name of the stream to delete.
4✔
677
     * @returns void
4✔
678
     */
4✔
679
    deleteEventStream(streamName) {
4✔
680
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not delete a stream on it.');
12✔
681

12✔
682
        if (!(streamName in this.streams)) {
12✔
683
            return;
4✔
684
        }
4✔
685
        this.streams[streamName].index.destroy();
4✔
686
        delete this.streams[streamName];
4✔
687
        this.emit('stream-deleted', streamName);
4✔
688
    }
12✔
689

4✔
690
    /**
4✔
691
     * Close a stream so that no new events are indexed into it.
4✔
692
     * The stream will still be readable, but any attempt to write to it will throw an error.
4✔
693
     * A closed stream is persisted by renaming its index file to include a `.closed` marker
4✔
694
     * (e.g. `stream-X.closed.index`), so it will be recognized as closed when the store is reopened.
4✔
695
     *
4✔
696
     * @api
4✔
697
     * @param {string} streamName The name of the stream to close.
4✔
698
     * @returns void
4✔
699
     * @throws {Error} If the storage is read-only.
4✔
700
     * @throws {Error} If the stream does not exist.
4✔
701
     * @throws {Error} If the stream is already closed.
4✔
702
     */
4✔
703
    closeEventStream(streamName) {
4✔
704
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not close a stream on it.');
48✔
705
        assert(streamName in this.streams, `Stream "${streamName}" does not exist.`);
48✔
706
        assert(!this.streams[streamName].closed, `Stream "${streamName}" is already closed.`);
48✔
707

48✔
708
        const indexName = 'stream-' + streamName;
48✔
709
        const { index } = this.streams[streamName];
48✔
710

48✔
711
        // Flush and close the index before renaming the file
48✔
712
        index.close();
48✔
713

48✔
714
        // Rename the index file to mark it as closed (e.g. stream-foo.index -> stream-foo.closed.index)
48✔
715
        const closedFileName = index.fileName.replace(/\.index$/, '.closed.index');
48✔
716
        fs.renameSync(index.fileName, closedFileName);
48✔
717

48✔
718
        // Remove from secondary indexes so that new writes are no longer indexed into this stream
48✔
719
        this.storage.removeSecondaryIndex(indexName);
48✔
720

48✔
721
        // Reopen the renamed index for read access, outside the secondary indexes write path
48✔
722
        const closedIndexName = indexName + '.closed';
48✔
723
        const closedIndex = this.storage.openReadonlyIndex(closedIndexName);
48✔
724

48✔
725
        // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
48✔
726
        this.streams[streamName] = { index: closedIndex, closed: true };
48✔
727
        this.emit('stream-closed', streamName);
48✔
728
    }
48✔
729

4✔
730
    /**
4✔
731
     * Get a durable consumer for the given stream that will keep receiving events from the last position.
4✔
732
     *
4✔
733
     * @param {string} streamName The name of the stream to consume.
4✔
734
     * @param {string} identifier The unique identifying name of this consumer.
4✔
735
     * @param {object} [initialState] The initial state of the consumer.
4✔
736
     * @param {number} [since] The stream revision to start consuming from.
4✔
737
     * @returns {Consumer} A durable consumer for the given stream.
4✔
738
     */
4✔
739
    getConsumer(streamName, identifier, initialState = {}, since = 0) {
4✔
740
        const consumer = new Consumer(this.storage, streamName === '_all' ? '_all' : 'stream-' + streamName, identifier, initialState, since);
20✔
741
        consumer.streamName = streamName;
20✔
742
        return consumer;
20✔
743
    }
20✔
744

4✔
745
    /**
4✔
746
     * Scan the existing consumers on this EventStore and asynchronously return a list of their names.
4✔
747
     * @param {function(error: Error, consumers: array)} callback A callback that will receive an error as first and the list of consumers as second argument.
4✔
748
     */
4✔
749
    scanConsumers(callback) {
4✔
750
        const consumersPath = path.join(this.storage.indexDirectory, 'consumers');
8✔
751
        if (!fs.existsSync(consumersPath)) {
8✔
752
            callback(null, []);
4✔
753
            return;
4✔
754
        }
4✔
755
        const regex = new RegExp(`^${this.storage.storageFile}\\.([^.]*\\..*)$`);
4✔
756
        const consumers = [];
4✔
757
        scanForFiles(consumersPath, regex, consumers.push.bind(consumers), /* istanbul ignore next */ (err) => {
4✔
758
            if (err) {
4!
759
                return callback(err, []);
×
760
            }
×
761
            callback(null, consumers);
4✔
762
        });
4✔
763
    }
8✔
764
}
4✔
765

4✔
766
EventStore.Storage = Storage;
4✔
767
EventStore.Index = Index;
4✔
768

4✔
769
export default EventStore;
4✔
770
export { ExpectedVersion, OptimisticConcurrencyError, CommitCondition, LOCK_THROW, LOCK_RECLAIM };
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc