• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 26751854367

01 Jun 2026 11:21AM UTC coverage: 98.421% (+0.001%) from 98.42%
26751854367

push

github

albe
Add full logo SVG

1201 of 1247 branches covered (96.31%)

Branch coverage included in aggregate %.

5907 of 5975 relevant lines covered (98.86%)

884.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.54
/src/EventStore.js
1
import EventStream from './EventStream.js';
4✔
2
import JoinEventStream from './JoinEventStream.js';
4✔
3
import fs from 'fs';
4✔
4
import path from 'path';
4✔
5
import events from 'events';
4✔
6
import Storage, { ReadOnly as ReadOnlyStorage, LOCK_THROW, LOCK_RECLAIM } from './Storage.js';
4✔
7
import Index from './Index.js';
4✔
8
import Consumer from './Consumer.js';
4✔
9
import { assert, getPropertyAtPath } from './utils/util.js';
4✔
10
import { ensureDirectory, scanForFiles } from './utils/fsUtil.js';
4✔
11
import { buildTypeMatcherFn } from './utils/metadataUtil.js';
4✔
12

4✔
13
const ExpectedVersion = {
4✔
14
    Any: -1,
4✔
15
    EmptyStream: 0
4✔
16
};
4✔
17

4✔
18
/**
4✔
19
 * Default matcher property paths mirroring the Storage default, used for index optimization.
4✔
20
 */
4✔
21
const DEFAULT_MATCHER_PROPERTIES = ['stream', 'payload.type'];
4✔
22
const STREAM_NAME_PATTERN = /^[A-Za-z0-9][A-Za-z0-9_]*(?:[\/:@~+=\-#.][A-Za-z0-9_]+)*$/;
4✔
23
const STORAGE_HOOK_EVENTS = new Set(['preCommit', 'preRead']);
4✔
24

4✔
25
class OptimisticConcurrencyError extends Error {}
4✔
26

4✔
27
/**
4✔
28
 * An accept condition that captures the global event-log position at the time a {@link EventStore#query}
4✔
29
 * call was made.  Pass it as the `expectedVersion` argument to {@link EventStore#commit} to enforce
4✔
30
 * DCB-style (Dynamic Consistency Boundary) optimistic concurrency: the commit is rejected only when
4✔
31
 * one or more events that match the original query (types + optional matcher) have been appended to
4✔
32
 * the store between the `query` call and the `commit` call.
4✔
33
 *
4✔
34
 * @property {string[]} types   The event types included in the query.
4✔
35
 * @property {function(object, object): boolean|null} matcher An optional function `(payload, metadata) => boolean`
4✔
36
 *   used to narrow the conflict check.  When `null`, any new event of a listed type causes a conflict.
4✔
37
 * @property {number}   noneMatchAfter The global store length (total event count) at the time the query was made.
4✔
38
 */
4✔
39
class CommitCondition {
4✔
40
    /**
4✔
41
     * @param {string[]} types
4✔
42
     * @param {function(object, object): boolean|object|null} [matcher]
4✔
43
     * @param {number}   noneMatchAfter
4✔
44
     * @param {boolean}  [raw=false]
4✔
45
     */
4✔
46
    constructor(types, matcher = null, noneMatchAfter, raw = false) {
4✔
47
        this.types = types;
96✔
48
        this.matcher = matcher;
96✔
49
        this.raw = raw;
96✔
50
        this.noneMatchAfter = noneMatchAfter;
96✔
51
    }
96✔
52
}
4✔
53

4✔
54
/**
4✔
55
 * An event store optimized for working with many streams.
4✔
56
 * An event stream is implemented as an iterator over an index on the storage, therefore indexes need to be lightweight
4✔
57
 * and highly performant in read-only mode.
4✔
58
 */
4✔
59
class EventStore extends events.EventEmitter {
4✔
60

4✔
61
    /**
4✔
62
     * @param {string} [storeName] The name of the store which will be used as storage prefix. Default 'eventstore'.
4✔
63
     * @param {object} [config] An object with config options.
4✔
64
     * @param {string} [config.storageDirectory] The directory where the data should be stored. Default './data'.
4✔
65
     * @param {string} [config.streamsDirectory] The directory where the streams should be stored. Default '{storageDirectory}/streams'.
4✔
66
     * @param {object} [config.storageConfig] Additional config options given to the storage backend. See `Storage`.
4✔
67
     * @param {boolean} [config.readOnly] If the storage should be mounted in read-only mode.
4✔
68
     * @param {string|function(object): string} [config.typeAccessor] Dot-notation path (e.g. `'type'`) or
4✔
69
     *   function `(event) => string` identifying the event type. Enables type-based queries via
4✔
70
     *   {@link EventStore#query} and ensures proper index routing for those queries.
4✔
71
     * @param {object|function(string): object} [config.streamMetadata] A metadata object or a function `(streamName) => object`
4✔
72
     *   that is called whenever a new stream partition is created. The returned object is stored once in the partition
4✔
73
     *   file header and surfaced to `preCommit` / `preRead` hooks. Takes precedence only when
4✔
74
     *   `config.storageConfig.metadata` is not also set.
4✔
75
     */
4✔
76
    constructor(storeName = 'eventstore', config = {}) {
4✔
77
        super();
656✔
78
        if (typeof storeName !== 'string') {
656✔
79
            config = storeName;
652✔
80
            storeName = 'eventstore';
652✔
81
        }
652✔
82

656✔
83
        if (typeof config.typeAccessor === 'string' && config.typeAccessor) {
656✔
84
            const accessorPath = config.typeAccessor;
24✔
85
            this.typeAccessor = (event) => getPropertyAtPath(event, accessorPath);
24✔
86
            this.typeMatcherFn = buildTypeMatcherFn(accessorPath);
24✔
87
        } else {
656✔
88
            this.typeAccessor = typeof config.typeAccessor === 'function' ? config.typeAccessor : null;
632✔
89
            this.typeMatcherFn = null;
632✔
90
        }
632✔
91

656✔
92
        this.storageDirectory = path.resolve(config.storageDirectory || /* istanbul ignore next */ './data');
656!
93
        let defaults = {
656✔
94
            dataDirectory: this.storageDirectory,
656✔
95
            indexDirectory: config.streamsDirectory || path.join(this.storageDirectory, 'streams'),
656✔
96
            partitioner: (event) => event.stream,
656✔
97
            readOnly: config.readOnly || false
656✔
98
        };
656✔
99
        const storageConfig = Object.assign(defaults, config.storageConfig);
656✔
100

656✔
101
        // When typeAccessor is a string path, ensure the corresponding full document path
656✔
102
        // (payload.<path>) is present in matcherProperties so the IndexMatcher discriminant
656✔
103
        // table can route type-stream lookups in O(1) on every write.
656✔
104
        if (this.typeMatcherFn) {
656✔
105
            const fullPath = `payload.${config.typeAccessor}`;
24✔
106
            const currentProps = storageConfig.matcherProperties || DEFAULT_MATCHER_PROPERTIES;
24✔
107
            if (!currentProps.includes(fullPath)) {
24✔
108
                storageConfig.matcherProperties = [...currentProps, fullPath];
8✔
109
            }
8✔
110
        }
24✔
111

656✔
112
        // Translate the high-level streamMetadata option into the storage-level metadata function,
656✔
113
        // but only when the caller has not already provided a lower-level storageConfig.metadata.
656✔
114
        if (config.streamMetadata !== undefined && storageConfig.metadata === undefined) {
656✔
115
            if (typeof config.streamMetadata === 'function') {
76✔
116
                storageConfig.metadata = config.streamMetadata;
4✔
117
            } else {
76✔
118
                storageConfig.metadata = (streamName) => config.streamMetadata[streamName] || {};
72✔
119
            }
72✔
120
        }
76✔
121

656✔
122
        this.initialize(storeName, storageConfig);
656✔
123
    }
656✔
124

4✔
125
    /**
4✔
126
     * @private
4✔
127
     * @param {string} storeName
4✔
128
     * @param {object} storageConfig
4✔
129
     */
4✔
130
    initialize(storeName, storageConfig) {
4✔
131
        this.streamsDirectory = path.resolve(storageConfig.indexDirectory);
656✔
132

656✔
133
        this.storeName = storeName;
656✔
134
        this.storage = (storageConfig.readOnly === true) ?
656✔
135
                        new ReadOnlyStorage(storeName, storageConfig)
44✔
136
                        : new Storage(storeName, storageConfig);
656✔
137
        this.streams = Object.create(null);
656✔
138
        this.streams._all = { index: this.storage.index };
656✔
139
        this.consumers = new Map();
656✔
140

656✔
141
        this.storage.on('index-created', this.registerStream.bind(this));
656✔
142

656✔
143
        this.storage.on('opened', () => {
656✔
144
            this.checkUnfinishedCommits();
128✔
145
            this.emit('ready');
128✔
146
        });
656✔
147

656✔
148
        this.storage.open();
656✔
149
    }
656✔
150

4✔
151
    /**
4✔
152
     * Check if the last commit in the store was unfinished, which is the case if not all events of the commit have been written.
4✔
153
     * Torn writes are handled at the storage level, so this method only deals with unfinished commits.
4✔
154
     * @private
4✔
155
     */
4✔
156
    checkUnfinishedCommits() {
4✔
157
        let position = this.storage.length;
128✔
158
        let lastEvent;
128✔
159
        let truncateIndex = false;
128✔
160
        while (position > 0) {
128✔
161
            try {
80✔
162
                lastEvent = this.storage.read(position);
80✔
163
            } catch (e) {
80!
164
                // A preRead hook may throw (e.g. access control). Stop repair check.
×
165
                return;
×
166
            }
×
167
            if (lastEvent !== false) break;
80✔
168
            truncateIndex = true;
16✔
169
            position--;
16✔
170
        }
16✔
171

128✔
172
        if (lastEvent && lastEvent.metadata.commitSize && lastEvent.metadata.commitVersion !== lastEvent.metadata.commitSize - 1) {
128✔
173
            this.emit('unfinished-commit', lastEvent);
8✔
174
            // commitId = global sequence number at which the commit started
8✔
175
            this.storage.truncate(lastEvent.metadata.commitId);
8✔
176
        } else if (truncateIndex) {
128✔
177
            // The index contained items that are not in the storage file; truncate everything
4✔
178
            // after `position`, the last sequence number that was successfully read.
4✔
179
            this.storage.truncate(position);
4✔
180
        }
4✔
181
    }
128✔
182

4✔
183
    /**
4✔
184
     * @private
4✔
185
     * @param {string} name The full stream name, including the `stream-` prefix (and optional `.closed` suffix).
4✔
186
     */
4✔
187
    registerStream(name) {
4✔
188
        /* istanbul ignore if */
1,076✔
189
        if (!name.startsWith('stream-')) {
1,076!
190
            return;
×
191
        }
×
192
        let streamName = name.slice(7);
1,076✔
193
        // Detect the `.closed` suffix — present both in the initial scan and when the directory
1,076✔
194
        // watcher emits 'index-created' after a writer renames the file (e.g. 'stream-foo-bar.closed').
1,076✔
195
        let isClosed = false;
1,076✔
196
        if (streamName.endsWith('.closed')) {
1,076✔
197
            streamName = streamName.slice(0, -7);
8✔
198
            isClosed = true;
8✔
199
        }
8✔
200
        if (streamName in this.streams) {
1,076✔
201
            if (isClosed && !this.streams[streamName].closed) {
44✔
202
                // The stream was renamed to .closed while this instance had it open.
4✔
203
                // The old ReadOnlyIndex was already closed via onRename, so we open the new one.
4✔
204
                const closedIndexName = 'stream-' + streamName + '.closed';
4✔
205
                const closedIndex = this.storage.openReadonlyIndex(closedIndexName);
4✔
206
                // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
4✔
207
                this.streams[streamName] = { index: closedIndex, closed: true };
4✔
208
                this.emit('stream-closed', streamName);
4✔
209
            }
4✔
210
            return;
44✔
211
        }
44✔
212
        const index = isClosed
1,032✔
213
            ? this.storage.openReadonlyIndex(name)
1,076✔
214
            : this.storage.openIndex(name);
1,076✔
215
        // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
1,076✔
216
        this.streams[streamName] = { index, closed: isClosed };
1,076✔
217
        this.emit('stream-available', streamName);
1,076✔
218
    }
1,076✔
219

4✔
220
    /**
4✔
221
     * Close the event store and free up all resources.
4✔
222
     * Stops all registered consumers before closing storage.
4✔
223
     *
4✔
224
     * @api
4✔
225
     */
4✔
226
    close() {
4✔
227
        for (const consumer of this.consumers.values()) {
648✔
228
            consumer.stop();
24✔
229
        }
24✔
230
        this.consumers.clear();
648✔
231
        this.storage.close();
648✔
232
    }
648✔
233

4✔
234
    /**
4✔
235
     * Override EventEmitter.on() to delegate 'preCommit' and 'preRead' event registrations
4✔
236
     * to the underlying storage, so that `eventstore.on('preCommit', handler)` works naturally.
4✔
237
     * All other events are handled by the default EventEmitter.
4✔
238
     *
4✔
239
     * @param {string} event
4✔
240
     * @param {function} listener
4✔
241
     * @returns {this}
4✔
242
     */
4✔
243
    on(event, listener) {
4✔
244
        if (this.isStorageHookEvent(event)) {
208✔
245
            this.delegateStorageHookEvent('on', event, listener);
76✔
246
            return this;
76✔
247
        }
76✔
248
        return super.on(event, listener);
132✔
249
    }
208✔
250

4✔
251
    /**
4✔
252
     * @inheritDoc
4✔
253
     */
4✔
254
    addListener(event, listener) {
4✔
255
        return this.on(event, listener);
4✔
256
    }
4✔
257

4✔
258
    /**
4✔
259
     * Override EventEmitter.once() to delegate 'preCommit' and 'preRead' to the underlying storage.
4✔
260
     *
4✔
261
     * @param {string} event
4✔
262
     * @param {function} listener
4✔
263
     * @returns {this}
4✔
264
     */
4✔
265
    once(event, listener) {
4✔
266
        if (this.isStorageHookEvent(event)) {
12✔
267
            this.delegateStorageHookEvent('once', event, listener);
8✔
268
            return this;
8✔
269
        }
8✔
270
        return super.once(event, listener);
4✔
271
    }
12✔
272

4✔
273
    /**
4✔
274
     * Override EventEmitter.off() / removeListener() to delegate 'preCommit' and 'preRead'
4✔
275
     * to the underlying storage.
4✔
276
     *
4✔
277
     * @param {string} event
4✔
278
     * @param {function} listener
4✔
279
     * @returns {this}
4✔
280
     */
4✔
281
    off(event, listener) {
4✔
282
        if (this.isStorageHookEvent(event)) {
24✔
283
            this.storage.off(event, listener);
12✔
284
            return this;
12✔
285
        }
12✔
286
        return super.off(event, listener);
12✔
287
    }
24✔
288

4✔
289
    isStorageHookEvent(event) {
4✔
290
        return STORAGE_HOOK_EVENTS.has(event);
244✔
291
    }
244✔
292

4✔
293
    delegateStorageHookEvent(method, event, listener) {
4✔
294
        if (event === 'preCommit') {
84✔
295
            assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.');
44✔
296
        }
44✔
297
        this.storage[method](event, listener);
76✔
298
    }
84✔
299

4✔
300
    /**
4✔
301
     * @inheritDoc
4✔
302
     */
4✔
303
    removeListener(event, listener) {
4✔
304
        return this.off(event, listener);
12✔
305
    }
12✔
306

4✔
307
    /**
4✔
308
     * Convenience method to register a handler called before an event is committed to storage.
4✔
309
     * Equivalent to `eventstore.on('preCommit', hook)`.
4✔
310
     * The handler receives `(event, partitionMetadata)` and may throw to abort the write.
4✔
311
     * Multiple handlers can be registered; all run on every write in registration order.
4✔
312
     * The handler is invoked on every write, so its logic should be cheap, fast, and synchronous.
4✔
313
     *
4✔
314
     * @api
4✔
315
     * @param {function(object, object): void} hook A function receiving (event, partitionMetadata).
4✔
316
     * @throws {Error} If the storage was opened in read-only mode.
4✔
317
     */
4✔
318
    preCommit(hook) {
4✔
319
        this.on('preCommit', hook);
20✔
320
    }
20✔
321

4✔
322
    /**
4✔
323
     * Convenience method to register a handler called before an event is read from storage.
4✔
324
     * Equivalent to `eventstore.on('preRead', hook)`.
4✔
325
     * The handler receives `(position, partitionMetadata)` and may throw to abort the read.
4✔
326
     * Multiple handlers can be registered; all run on every read in registration order.
4✔
327
     * The handler is invoked on every read, so its logic should be cheap, fast, and synchronous.
4✔
328
     *
4✔
329
     * @api
4✔
330
     * @param {function(number, object): void} hook A function receiving (position, partitionMetadata).
4✔
331
     */
4✔
332
    preRead(hook) {
4✔
333
        this.on('preRead', hook);
12✔
334
    }
12✔
335

4✔
336
    /**
4✔
337
     * Get the number of events stored.
4✔
338
     *
4✔
339
     * @api
4✔
340
     * @returns {number}
4✔
341
     */
4✔
342
    get length() {
4✔
343
        return this.storage.length;
1,472✔
344
    }
1,472✔
345

4✔
346
    /**
4✔
347
     * This method makes it so the last three arguments can be given either as:
4✔
348
     *  - expectedVersion, metadata, callback
4✔
349
     *  - expectedVersion, callback
4✔
350
     *  - metadata, callback
4✔
351
     *  - callback
4✔
352
     *
4✔
353
     * @private
4✔
354
     * @param {Array<object>|object} events
4✔
355
     * @param {number|CommitCondition} [expectedVersion]
4✔
356
     * @param {object|function} [metadata]
4✔
357
     * @param {function} [callback]
4✔
358
     * @returns {{events: Array<object>, metadata: object, callback: function, expectedVersion: number|CommitCondition}}
4✔
359
     */
4✔
360
    static fixArgumentTypes(events, expectedVersion, metadata, callback) {
4✔
361
        if (!(events instanceof Array)) {
1,236✔
362
            events = [events];
116✔
363
        }
116✔
364
        if (typeof expectedVersion !== 'number' && !(expectedVersion instanceof CommitCondition)) {
1,236✔
365
            callback = metadata;
352✔
366
            metadata = expectedVersion;
352✔
367
            expectedVersion = ExpectedVersion.Any;
352✔
368
        }
352✔
369
        if (typeof metadata !== 'object') {
1,236✔
370
            callback = metadata;
360✔
371
            metadata = {};
360✔
372
        }
360✔
373
        if (typeof callback !== 'function') {
1,236✔
374
            callback = () => {};
864✔
375
        }
864✔
376
        return { events, expectedVersion, metadata, callback };
1,236✔
377
    }
1,236✔
378

4✔
379
    /**
4✔
380
     * Check a {@link CommitCondition} against the current state of the store.
4✔
381
     * Iterates a join stream over all condition type streams starting from
4✔
382
     * `condition.noneMatchAfter` (the global position captured at query time), and throws an
4✔
383
     * {@link OptimisticConcurrencyError} when a new event of a listed type satisfies
4✔
384
     * `condition.matcher(payload, metadata)` (or any such event when no matcher is provided).
4✔
385
     *
4✔
386
     * @param {CommitCondition} condition
4✔
387
     * @throws {OptimisticConcurrencyError}
4✔
388
     */
4✔
389
    checkCondition(condition) {
4✔
390
        if (this.storage.length <= condition.noneMatchAfter) return; // no new events since condition was obtained
28✔
391

16✔
392
        const existingTypes = condition.types.filter(t => t in this.streams);
16✔
393
        if (existingTypes.length === 0) return;
28!
394

16✔
395
        // Only events after condition.noneMatchAfter can be conflicts.
16✔
396
        // Pass the original matcher and raw flag so the stream filters at the source.
16✔
397
        const stream = this.fromStreams(
16✔
398
            '_check_' + condition.types.join('_'),
16✔
399
            existingTypes,
16✔
400
            condition.noneMatchAfter + 1,
16✔
401
            -1,
16✔
402
            condition.matcher,
16✔
403
            condition.raw
16✔
404
        );
16✔
405

16✔
406
        assert(stream.next() === false, `Optimistic Concurrency error. A conflicting event was committed since the condition was obtained.`, OptimisticConcurrencyError);
16✔
407
    }
28✔
408

4✔
409
    /**
4✔
410
     * Ensure a dedicated type stream exists for each event's type, creating it if needed.
4✔
411
     * Must be called before the entity stream is created to guarantee correct index routing.
4✔
412
     *
4✔
413
     * @param {Array<object>} events The events to process.
4✔
414
     */
4✔
415
    ensureTypeStreams(events) {
4✔
416
        if (!this.typeAccessor) return;
1,224✔
417
        for (const event of events) {
1,224✔
418
            const type = this.resolveValidatedTypeStreamName(event);
148✔
419
            if (type && !(type in this.streams)) {
148✔
420
                const matcher = this.typeMatcherFn
112✔
421
                    ? this.typeMatcherFn(type)
112✔
422
                    : (doc) => this.typeAccessor(doc.payload) === type;
112✔
423
                this.createEventStream(type, matcher, false);
112✔
424
            }
112✔
425
        }
148✔
426
    }
1,224✔
427

4✔
428
    resolveValidatedTypeStreamName(event) {
4✔
429
        const type = this.typeAccessor(event);
148✔
430
        if (type === undefined || type === null || type === '') {
148✔
431
            return null;
8✔
432
        }
8✔
433
        assert(typeof type === 'string', 'typeAccessor must return a string.');
140✔
434
        assert(STREAM_NAME_PATTERN.test(type), `typeAccessor must return a valid stream name. Got: "${type}"`);
140✔
435
        return type;
140✔
436
    }
148✔
437

4✔
438
    getExistingQueryTypes(types) {
4✔
439
        const queryTypes = [];
100✔
440
        for (const type of types) {
100✔
441
            if (type in this.streams) {
112✔
442
                queryTypes.push(type);
60✔
443
                continue;
60✔
444
            }
60✔
445
            if (!this.typeAccessor) {
112✔
446
                throw new Error(`Type stream "${type}" does not exist. Create it with createEventStream() first, or configure typeAccessor to have type streams created automatically on commit.`);
4✔
447
            }
4✔
448
        }
112✔
449
        return queryTypes;
96✔
450
    }
100✔
451

4✔
452
    /**
4✔
453
     * Commit a list of events for the given stream name, which is expected to be at the given version.
4✔
454
     * Note that the events committed may still appear in other streams too - the given stream name is only
4✔
455
     * relevant for optimistic concurrency checks with the given expected version.
4✔
456
     *
4✔
457
     * @api
4✔
458
     * @param {string} streamName The name of the stream to commit the events to.
4✔
459
     * @param {Array<object>|object} events The events to commit or a single event.
4✔
460
     * @param {number|CommitCondition} [expectedVersion] One of the `ExpectedVersion` constants, a positive
4✔
461
     *   stream version number, or a {@link CommitCondition} obtained from {@link EventStore#query}.
4✔
462
     * @param {object} [metadata] The commit metadata to use as base. Useful for replication and adding storage metadata.
4✔
463
     * @param {function} [callback] A function that will be executed when all events have been committed.
4✔
464
     * @throws {OptimisticConcurrencyError} if the stream is not at the expected version, or if a
4✔
465
     *   {@link CommitCondition} was provided and conflicting events have been committed since it was obtained.
4✔
466
     */
4✔
467
    commit(streamName, events, expectedVersion = ExpectedVersion.Any, metadata = {}, callback = null) {
4✔
468
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not commit to it.');
1,248✔
469
        assert(typeof streamName === 'string' && streamName !== '', 'Must specify a stream name for commit.');
1,248✔
470
        assert(typeof events !== 'undefined' && events !== null, 'No events specified for commit.');
1,248✔
471

1,248✔
472
        ({ events, expectedVersion, metadata, callback } = EventStore.fixArgumentTypes(events, expectedVersion, metadata, callback));
1,248✔
473

1,248✔
474
        // Perform DCB-style concurrency check when a CommitCondition is provided.
1,248✔
475
        if (expectedVersion instanceof CommitCondition) {
1,248✔
476
            this.checkCondition(expectedVersion);
28✔
477
            expectedVersion = ExpectedVersion.Any;
28✔
478
        }
28✔
479

1,224✔
480
        // When typeAccessor is configured, ensure a dedicated type stream exists for each event
1,224✔
481
        // before the entity stream write so the type stream index is never incomplete.
1,224✔
482
        this.ensureTypeStreams(events);
1,224✔
483

1,224✔
484
        if (!(streamName in this.streams)) {
1,248✔
485
            this.createEventStream(streamName, { stream: streamName }, false);
768✔
486
        }
768✔
487
        assert(!this.streams[streamName].closed, `Stream "${streamName}" is closed and cannot be written to.`);
1,216✔
488
        let streamVersion = this.streams[streamName].index.length;
1,216✔
489
        assert(expectedVersion === ExpectedVersion.Any || streamVersion === expectedVersion,
1,248✔
490
            `Optimistic Concurrency error. Expected stream "${streamName}" at version ${expectedVersion} but is at version ${streamVersion}.`,
1,248✔
491
            OptimisticConcurrencyError
1,248✔
492
        );
1,248✔
493

1,248✔
494
        if (events.length > 1) {
1,248✔
495
            delete metadata.commitVersion;
68✔
496
        }
68✔
497

1,196✔
498
        const commitId = this.length;
1,196✔
499
        let commitVersion = 0;
1,196✔
500
        const commitSize = events.length;
1,196✔
501
        const committedAt = Date.now();
1,196✔
502
        const commit = Object.assign({
1,196✔
503
            commitId,
1,196✔
504
            committedAt
1,196✔
505
        }, metadata, {
1,196✔
506
            streamName,
1,196✔
507
            streamVersion,
1,196✔
508
            events: []
1,196✔
509
        });
1,196✔
510
        const commitCallback = () => {
1,196✔
511
            this.emit('commit', commit);
1,192✔
512
            callback(commit);
1,192✔
513
        };
1,196✔
514
        for (let event of events) {
1,248✔
515
            const eventMetadata = Object.assign({ commitId, committedAt, commitVersion, commitSize }, metadata, { streamVersion });
1,292✔
516
            const storedEvent = { stream: streamName, payload: event, metadata: eventMetadata };
1,292✔
517
            commitVersion++;
1,292✔
518
            streamVersion++;
1,292✔
519
            commit.events.push(event);
1,292✔
520
            this.storage.write(storedEvent, commitVersion !== events.length ? undefined : commitCallback);
1,292✔
521
        }
1,292✔
522
    }
1,248✔
523

4✔
524
    /**
4✔
525
     * @api
4✔
526
     * @param {string} streamName The name of the stream to get the version for.
4✔
527
     * @returns {number} The version that the given stream is at currently, or -1 if the stream does not exist.
4✔
528
     */
4✔
529
    getStreamVersion(streamName) {
4✔
530
        if (!(streamName in this.streams)) {
116✔
531
            return -1;
20✔
532
        }
20✔
533
        return this.streams[streamName].index.length;
96✔
534
    }
116✔
535

4✔
536
    /**
4✔
537
     * Query the event store for events matching a set of event types and an optional filter function.
4✔
538
     * Returns a pre-filtered event stream and a {@link CommitCondition} that can be passed to
4✔
539
     * {@link EventStore#commit} to enforce optimistic concurrency.
4✔
540
     *
4✔
541
     * A conflict occurs when at least one event appended between the `query` call and the `commit` call
4✔
542
     * belongs to one of the listed types and (when `matcher` is provided) also satisfies
4✔
543
     * `matcher(payload, metadata)`.  Events written before the `query` call are never treated as conflicts.
4✔
544
     *
4✔
545
     * **Behaviour when a type stream does not exist:**
4✔
546
     * - Without `typeAccessor` configured: throws an error, because the store cannot guarantee that no
4✔
547
     *   events of that type exist (the stream was never created).  Create the stream explicitly first,
4✔
548
     *   or configure `typeAccessor` to have streams created automatically on commit.
4✔
549
     * - With `typeAccessor` configured: treats the missing stream as empty (0-length).  The stream will
4✔
550
     *   be created automatically the first time an event of that type is committed.
4✔
551
     *
4✔
552
     * @api
4✔
553
     * @param {string[]} types A non-empty array of event-type names to query.
4✔
554
     * @param {function|object|null} [matcher] Optional matcher used for stream pre-filtering.
4✔
555
     *   In object mode, function predicates receive `(payload, metadata)`.
4✔
556
     * @param {number} [minRevision=1] The 1-based minimum global revision to include in the returned stream (inclusive).
4✔
557
     * @param {boolean} [raw=false] If true, return NDJSON buffers from the query stream.
4✔
558
     * @returns {{ condition: CommitCondition, stream: EventStream }} An object with:
4✔
559
     *   - `condition` — the {@link CommitCondition} to pass to {@link EventStore#commit}.
4✔
560
     *   - `stream` — a read-only event stream containing all matching events.
4✔
561
     * @throws {Error} if `types` is not a non-empty array.
4✔
562
     * @throws {Error} if `typeAccessor` is not configured and any of the listed type streams do not exist.
4✔
563
     */
4✔
564
    query(types, matcher = null, minRevision = 1, raw = false) {
4✔
565
        assert(Array.isArray(types) && types.length > 0, 'Must specify a non-empty array of event types for query.');
108✔
566
        const queryTypes = this.getExistingQueryTypes(types);
108✔
567
        const condition = new CommitCondition(types, matcher, this.storage.length, raw);
108✔
568
        const stream = this.fromStreams('_query_' + types.join('_'), queryTypes, minRevision, -1, matcher, raw);
108✔
569
        return { stream, condition };
108✔
570
    }
108✔
571

4✔
572
    /**
4✔
573
     * Get an event stream for the given stream name within the revision boundaries.
4✔
574
     *
4✔
575
     * @api
4✔
576
     * @param {string} streamName The name of the stream to get.
4✔
577
     * @param {number} [minRevision=1] The 1-based minimum revision to include in the events (inclusive).
4✔
578
     * @param {number} [maxRevision=-1] The 1-based maximum revision to include in the events (inclusive).
4✔
579
     * @param {function|object|null} [predicate] Optional matcher (see {@link EventStream}).
4✔
580
     * @param {boolean} [raw=false] If true, return NDJSON buffers.
4✔
581
     * @returns {EventStream|boolean} The event stream or false if a stream with the name doesn't exist.
4✔
582
     */
4✔
583
    getEventStream(streamName, minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
4✔
584
        ({ predicate, raw } = normalizePredicateRaw(predicate, raw));
204✔
585
        if (!(streamName in this.streams)) {
204✔
586
            return false;
4✔
587
        }
4✔
588
        return new EventStream(streamName, this, minRevision, maxRevision, predicate, raw);
200✔
589
    }
204✔
590

4✔
591
    /**
4✔
592
     * Get a stream for all events within the revision boundaries.
4✔
593
     * This is the same as `getEventStream('_all', ...)`.
4✔
594
     *
4✔
595
     * @api
4✔
596
     * @param {number} [minRevision=1] The 1-based minimum revision to include in the events (inclusive).
4✔
597
     * @param {number} [maxRevision=-1] The 1-based maximum revision to include in the events (inclusive).
4✔
598
     * @param {function|object|null} [predicate] Optional matcher (see {@link EventStream}).
4✔
599
     * @param {boolean} [raw=false] If true, return NDJSON buffers.
4✔
600
     * @returns {EventStream} The event stream.
4✔
601
     */
4✔
602
    getAllEvents(minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
4✔
603
        ({ predicate, raw } = normalizePredicateRaw(predicate, raw));
12✔
604
        return this.getEventStream('_all', minRevision, maxRevision, predicate, raw);
12✔
605
    }
12✔
606

4✔
607
    /**
4✔
608
     * Create a virtual event stream from existing streams by joining them.
4✔
609
     *
4✔
610
     * @param {string} streamName The (transient) name of the joined stream.
4✔
611
     * @param {Array<string>} streamNames An array of the stream names to join.
4✔
612
     * @param {number} [minRevision=1] The 1-based minimum revision to include in the events (inclusive).
4✔
613
     * @param {number} [maxRevision=-1] The 1-based maximum revision to include in the events (inclusive).
4✔
614
     * @param {function|object|null} [predicate] Optional matcher (see {@link EventStream}).
4✔
615
     * @param {boolean} [raw=false] If true, return NDJSON buffers.
4✔
616
     * @returns {EventStream} The joined event stream.
4✔
617
     * @throws {Error} if any of the streams doesn't exist.
4✔
618
     */
4✔
619
    fromStreams(streamName, streamNames, minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
4✔
620
        ({ predicate, raw } = normalizePredicateRaw(predicate, raw));
172✔
621
        assert(streamNames instanceof Array, 'Must specify an array of stream names.');
172✔
622

172✔
623
        if (streamNames.length === 0) {
172✔
624
            return new EventStream(streamName, this);
44✔
625
        }
44✔
626

120✔
627
        for (let stream of streamNames) {
172✔
628
            assert(stream in this.streams, `Stream "${stream}" does not exist.`);
288✔
629
        }
288✔
630

116✔
631
        if (streamNames.length === 1) {
172✔
632
            const stream = new EventStream(streamNames[0], this, minRevision, maxRevision, predicate, raw);
60✔
633
            stream.name = streamName;
60✔
634
            return stream;
60✔
635
        }
60✔
636

56✔
637
        return new JoinEventStream(streamName, streamNames, this, minRevision, maxRevision, predicate, raw);
56✔
638
    }
172✔
639

4✔
640
    /**
4✔
641
     * Get a stream for a category of streams. This will effectively return a joined stream of all streams that start
4✔
642
     * with the given `categoryName` followed by a dash (flat layout, e.g. `users-123`) or a slash (hierarchical
4✔
643
     * layout, e.g. `users/123`).
4✔
644
     * If you frequently use this for a category consisting of a lot of streams (e.g. `users`), consider creating a
4✔
645
     * dedicated physical stream for the category:
4✔
646
     *
4✔
647
     *    `eventstore.createEventStream('users', e => e.stream.startsWith('users-') || e.stream.startsWith('users/'))`
4✔
648
     *
4✔
649
     * @api
4✔
650
     * @param {string} categoryName The name of the category to get a stream for. A category is a stream name prefix.
4✔
651
     * @param {number} [minRevision=1] The 1-based minimum revision to include in the events (inclusive).
4✔
652
     * @param {number} [maxRevision=-1] The 1-based maximum revision to include in the events (inclusive).
4✔
653
     * @param {function|object|null} [predicate] Optional matcher (see {@link EventStream}).
4✔
654
     * @param {boolean} [raw=false] If true, return NDJSON buffers.
4✔
655
     * @returns {EventStream} The joined event stream for all streams of the given category.
4✔
656
     * @throws {Error} If no stream for this category exists.
4✔
657
     */
4✔
658
    getEventStreamForCategory(categoryName, minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
4✔
659
        ({ predicate, raw } = normalizePredicateRaw(predicate, raw));
40✔
660
        if (categoryName in this.streams) {
40✔
661
            return this.getEventStream(categoryName, minRevision, maxRevision, predicate, raw);
4✔
662
        }
4✔
663
        const categoryStreams = Object.keys(this.streams).filter(streamName =>
36✔
664
            streamName.startsWith(categoryName + '-') ||
224✔
665
            streamName.startsWith(categoryName + '/')
128✔
666
        );
36✔
667

36✔
668
        assert(categoryStreams.length > 0, `No streams for category '${categoryName}' exist.`);
36✔
669

36✔
670
        return this.fromStreams(categoryName, categoryStreams, minRevision, maxRevision, predicate, raw);
36✔
671
    }
40✔
672

4✔
673
    /**
4✔
674
     * Create a new stream with the given matcher.
4✔
675
     *
4✔
676
     * @api
4✔
677
     * @param {string} streamName The name of the stream to create.
4✔
678
     * @param {object|function(event)} matcher A matcher object, denoting the properties that need to match on an event a function that takes the event and returns true if the event should be added.
4✔
679
     * @param {boolean} [reindex=true] Whether to scan existing documents and populate the new index. Set to false when it is known that no existing documents can match the matcher (e.g. when creating a brand-new write stream).
4✔
680
     * @returns {EventStream} The EventStream with all existing events matching the matcher.
4✔
681
     * @throws {Error} If a stream with that name already exists.
4✔
682
     * @throws {Error} If the stream could not be created.
4✔
683
     */
4✔
684
    createEventStream(streamName, matcher, reindex = true) {
4✔
685
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not create new stream on it.');
964✔
686
        assert(!(streamName in this.streams), 'Can not recreate stream!');
964✔
687

964✔
688
        const streamIndexName = 'stream-' + streamName;
964✔
689
        if (streamName.includes('/')) {
964✔
690
            const subDir = path.join(this.streamsDirectory, this.storeName + '.stream-' + path.dirname(streamName));
88✔
691
            ensureDirectory(subDir);
88✔
692
        }
88✔
693
        const index = this.storage.ensureIndex(streamIndexName, matcher, reindex);
948✔
694
        assert(index !== null, `Error creating stream index ${streamName}.`);
948✔
695

948✔
696
        // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
948✔
697
        this.streams[streamName] = { index, matcher };
948✔
698
        this.emit('stream-created', streamName);
948✔
699
        return new EventStream(streamName, this);
948✔
700
    }
964✔
701

4✔
702
    /**
4✔
703
     * Delete an event stream. Will do nothing if the stream with the name doesn't exist.
4✔
704
     *
4✔
705
     * Note that you can delete a write stream, but that will not delete the events written to it.
4✔
706
     * Also, on next write, that stream will be rebuilt from all existing events, which might take some time.
4✔
707
     *
4✔
708
     * @api
4✔
709
     * @param {string} streamName The name of the stream to delete.
4✔
710
     * @returns void
4✔
711
     */
4✔
712
    deleteEventStream(streamName) {
4✔
713
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not delete a stream on it.');
12✔
714

12✔
715
        if (!(streamName in this.streams)) {
12✔
716
            return;
4✔
717
        }
4✔
718
        this.streams[streamName].index.destroy();
4✔
719
        delete this.streams[streamName];
4✔
720
        this.emit('stream-deleted', streamName);
4✔
721
    }
12✔
722

4✔
723
    /**
4✔
724
     * Close a stream so that no new events are indexed into it.
4✔
725
     * The stream will still be readable, but any attempt to write to it will throw an error.
4✔
726
     * A closed stream is persisted by renaming its index file to include a `.closed` marker
4✔
727
     * (e.g. `stream-X.closed.index`), so it will be recognized as closed when the store is reopened.
4✔
728
     *
4✔
729
     * @api
4✔
730
     * @param {string} streamName The name of the stream to close.
4✔
731
     * @returns void
4✔
732
     * @throws {Error} If the storage is read-only.
4✔
733
     * @throws {Error} If the stream does not exist.
4✔
734
     * @throws {Error} If the stream is already closed.
4✔
735
     */
4✔
736
    closeEventStream(streamName) {
4✔
737
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not close a stream on it.');
48✔
738
        assert(streamName in this.streams, `Stream "${streamName}" does not exist.`);
48✔
739
        assert(!this.streams[streamName].closed, `Stream "${streamName}" is already closed.`);
48✔
740

48✔
741
        const indexName = 'stream-' + streamName;
48✔
742
        const { index } = this.streams[streamName];
48✔
743

48✔
744
        // Flush and close the index before renaming the file
48✔
745
        index.close();
48✔
746

48✔
747
        // Rename the index file to mark it as closed (e.g. stream-foo.index -> stream-foo.closed.index)
48✔
748
        const closedFileName = index.fileName.replace(/\.index$/, '.closed.index');
48✔
749
        fs.renameSync(index.fileName, closedFileName);
48✔
750

48✔
751
        // Remove from secondary indexes so that new writes are no longer indexed into this stream
48✔
752
        this.storage.removeSecondaryIndex(indexName);
48✔
753

48✔
754
        // Reopen the renamed index for read access, outside the secondary indexes write path
48✔
755
        const closedIndexName = indexName + '.closed';
48✔
756
        const closedIndex = this.storage.openReadonlyIndex(closedIndexName);
48✔
757

48✔
758
        // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
48✔
759
        this.streams[streamName] = { index: closedIndex, closed: true };
48✔
760
        this.emit('stream-closed', streamName);
48✔
761
    }
48✔
762

4✔
763
    /**
4✔
764
     * Get a durable consumer for the given stream, or look up an existing consumer by identifier.
4✔
765
     *
4✔
766
     * When called with a single argument, returns the running consumer registered under that
4✔
767
     * identifier, or `null` if none is found — useful for read endpoints that need the live
4✔
768
     * in-memory instance without creating a new one.
4✔
769
     *
4✔
770
     * When called with two or more arguments, creates (or re-uses) a Consumer for the given
4✔
771
     * stream and identifier, registers it in `this.consumers`, and returns it.
4✔
772
     *
4✔
773
     * @param {string} streamNameOrIdentifier The stream name, or the consumer identifier when used as a registry lookup.
4✔
774
     * @param {string} [identifier] The unique identifying name of this consumer. Omit for registry-only lookup.
4✔
775
     * @param {object} [initialState] The initial state of the consumer.
4✔
776
     * @param {number} [since] The stream revision to start consuming from.
4✔
777
     * @returns {Consumer|null} A durable consumer, or `null` when looking up by identifier and none is registered.
4✔
778
     */
4✔
779
    getConsumer(streamNameOrIdentifier, identifier, initialState = {}, since = 0) {
4✔
780
        if (identifier === undefined) {
28!
781
            return this.consumers.get(streamNameOrIdentifier) ?? null;
×
782
        }
×
783
        const streamName = streamNameOrIdentifier;
28✔
784
        if (this.consumers.has(identifier)) {
28✔
785
            const existingConsumer = this.consumers.get(identifier);
4✔
786
            if (existingConsumer.streamName === streamName) {
4!
787
                return existingConsumer;
×
788
            }
×
789
            // Rebind identifier to the requested stream when a consumer with the same
4✔
790
            // identifier already exists for another stream.
4✔
791
            existingConsumer.stop();
4✔
792
        }
4✔
793
        const consumer = new Consumer(this.storage, streamName === '_all' ? '_all' : 'stream-' + streamName, identifier, initialState, since);
28✔
794
        consumer.streamName = streamName;
28✔
795
        this.consumers.set(identifier, consumer);
28✔
796
        return consumer;
28✔
797
    }
28✔
798

4✔
799
    /**
4✔
800
     * Scan the existing consumers on this EventStore and asynchronously invoke a callback with the parsed list.
4✔
801
     *
4✔
802
     * Each consumer entry provides `{ name, stream, identifier }` parsed from the on-disk filename.
4✔
803
     * Pass `autoStart = true` to eagerly open every discovered consumer and register it in
4✔
804
     * `this.consumers` so that it is immediately available for registry lookups.
4✔
805
     *
4✔
806
     * @param {function(error: Error|null, consumers: Array<{name: string, stream: string, identifier: string}>)} callback
4✔
807
     * @param {boolean} [autoStart=false] When true, calls `getConsumer(stream, identifier)` for each discovered consumer.
4✔
808
     */
4✔
809
    scanConsumers(callback, autoStart = false) {
4✔
810
        const consumersPath = path.join(this.storage.indexDirectory, 'consumers');
8✔
811
        if (!fs.existsSync(consumersPath)) {
8✔
812
            callback(null, []);
4✔
813
            return;
4✔
814
        }
4✔
815
        const regex = new RegExp(`^${this.storage.storageFile}\\.([^.]*\\..*)$`);
4✔
816
        const consumerNames = [];
4✔
817
        scanForFiles(consumersPath, regex, consumerNames.push.bind(consumerNames), /* istanbul ignore next */ (err) => {
4✔
818
            if (err) {
4!
819
                return callback(err, []);
×
820
            }
×
821
            const consumers = consumerNames.map(name => {
4✔
822
                const splitIndex = name.lastIndexOf('.');
8✔
823
                const indexName = name.slice(0, splitIndex);
8✔
824
                const identifier = name.slice(splitIndex + 1);
8✔
825
                const stream = parseStreamFromIndexName(indexName);
8✔
826
                return { name, stream, identifier };
8✔
827
            });
4✔
828
            if (autoStart) {
4!
829
                for (const { stream, identifier } of consumers) {
×
830
                    this.getConsumer(stream, identifier);
×
831
                }
×
832
            }
×
833
            callback(null, consumers);
4✔
834
        });
4✔
835
    }
8✔
836
}
4✔
837

4✔
838
function parseStreamFromIndexName(indexName) {
8✔
839
    if (indexName === '_all') {
8✔
840
        return '_all';
4✔
841
    }
4✔
842
    if (indexName.startsWith('stream-')) {
4✔
843
        return indexName.slice(7);
4✔
844
    }
4✔
845
    return indexName;
×
846
}
8✔
847

4✔
848
function normalizePredicateRaw(predicate, raw) {
428✔
849
    if (typeof predicate === 'boolean' && raw === false) {
428✔
850
        return { predicate: null, raw: predicate };
16✔
851
    }
16✔
852
    return { predicate, raw };
412✔
853
}
428✔
854

4✔
855
EventStore.Storage = Storage;
4✔
856
EventStore.Index = Index;
4✔
857

4✔
858
export default EventStore;
4✔
859
export { ExpectedVersion, OptimisticConcurrencyError, CommitCondition, LOCK_THROW, LOCK_RECLAIM };
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc