• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 26331781152

23 May 2026 11:43AM UTC coverage: 98.028% (-0.08%) from 98.106%
26331781152

Pull #316

github

web-flow
Merge 88893fee7 into 03ae7bb7f
Pull Request #316: Add `event-storage-http` REST API layer with NDJSON reads, consumer endpoints, and raw buffer streaming

1188 of 1244 branches covered (95.5%)

Branch coverage included in aggregate %.

582 of 604 new or added lines in 11 files covered. (96.36%)

10 existing lines in 2 files now uncovered.

5771 of 5855 relevant lines covered (98.57%)

841.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.69
/src/EventStore.js
1
import EventStream from './EventStream.js';
4✔
2
import JoinEventStream from './JoinEventStream.js';
4✔
3
import fs from 'fs';
4✔
4
import path from 'path';
4✔
5
import events from 'events';
4✔
6
import Storage, { ReadOnly as ReadOnlyStorage, LOCK_THROW, LOCK_RECLAIM } from './Storage.js';
4✔
7
import Index from './Index.js';
4✔
8
import Consumer from './Consumer.js';
4✔
9
import { assert, getPropertyAtPath } from './util.js';
4✔
10
import { ensureDirectory, scanForFiles } from './fsUtil.js';
4✔
11
import { buildTypeMatcherFn } from './metadataUtil.js';
4✔
12

4✔
13
const ExpectedVersion = {
4✔
14
    Any: -1,
4✔
15
    EmptyStream: 0
4✔
16
};
4✔
17

4✔
18
/**
4✔
19
 * Default matcher property paths mirroring the Storage default, used for index optimization.
4✔
20
 */
4✔
21
const DEFAULT_MATCHER_PROPERTIES = ['stream', 'payload.type'];
4✔
22
const STREAM_NAME_PATTERN = /^[A-Za-z0-9][A-Za-z0-9_]*(?:[\/.-][A-Za-z0-9][A-Za-z0-9_]*)*$/;
4✔
23

4✔
24
class OptimisticConcurrencyError extends Error {}
4✔
25

4✔
26
/**
4✔
27
 * An accept condition that captures the global event-log position at the time a {@link EventStore#query}
4✔
28
 * call was made.  Pass it as the `expectedVersion` argument to {@link EventStore#commit} to enforce
4✔
29
 * DCB-style (Dynamic Consistency Boundary) optimistic concurrency: the commit is rejected only when
4✔
30
 * one or more events that match the original query (types + optional matcher) have been appended to
4✔
31
 * the store between the `query` call and the `commit` call.
4✔
32
 *
4✔
33
 * @property {string[]} types   The event types included in the query.
4✔
34
 * @property {function(object, object): boolean|null} matcher An optional function `(payload, metadata) => boolean`
4✔
35
 *   used to narrow the conflict check.  When `null`, any new event of a listed type causes a conflict.
4✔
36
 * @property {number}   noneMatchAfter The global store length (total event count) at the time the query was made.
4✔
37
 */
4✔
38
class CommitCondition {
4✔
39
    /**
4✔
40
     * @param {string[]} types
4✔
41
     * @param {function(object, object): boolean|object|null} [matcher]
4✔
42
     * @param {number}   noneMatchAfter
4✔
43
     * @param {boolean}  [raw=false]
4✔
44
     */
4✔
45
    constructor(types, matcher = null, noneMatchAfter, raw = false) {
4✔
46
        this.types = types;
96✔
47
        this.matcher = matcher;
96✔
48
        this.raw = raw;
96✔
49
        this.noneMatchAfter = noneMatchAfter;
96✔
50
    }
96✔
51
}
4✔
52

4✔
53
/**
4✔
54
 * An event store optimized for working with many streams.
4✔
55
 * An event stream is implemented as an iterator over an index on the storage, therefore indexes need to be lightweight
4✔
56
 * and highly performant in read-only mode.
4✔
57
 */
4✔
58
class EventStore extends events.EventEmitter {
4✔
59

4✔
60
    /**
4✔
61
     * @param {string} [storeName] The name of the store which will be used as storage prefix. Default 'eventstore'.
4✔
62
     * @param {object} [config] An object with config options.
4✔
63
     * @param {string} [config.storageDirectory] The directory where the data should be stored. Default './data'.
4✔
64
     * @param {string} [config.streamsDirectory] The directory where the streams should be stored. Default '{storageDirectory}/streams'.
4✔
65
     * @param {object} [config.storageConfig] Additional config options given to the storage backend. See `Storage`.
4✔
66
     * @param {boolean} [config.readOnly] If the storage should be mounted in read-only mode.
4✔
67
     * @param {string|function(object): string} [config.typeAccessor] Dot-notation path (e.g. `'type'`) or
4✔
68
     *   function `(event) => string` identifying the event type. Enables type-based queries via
4✔
69
     *   {@link EventStore#query} and ensures proper index routing for those queries.
4✔
70
     * @param {object|function(string): object} [config.streamMetadata] A metadata object or a function `(streamName) => object`
4✔
71
     *   that is called whenever a new stream partition is created. The returned object is stored once in the partition
4✔
72
     *   file header and surfaced to `preCommit` / `preRead` hooks. Takes precedence only when
4✔
73
     *   `config.storageConfig.metadata` is not also set.
4✔
74
     */
4✔
75
    constructor(storeName = 'eventstore', config = {}) {
4✔
76
        super();
636✔
77
        if (typeof storeName !== 'string') {
636✔
78
            config = storeName;
632✔
79
            storeName = 'eventstore';
632✔
80
        }
632✔
81

636✔
82
        if (typeof config.typeAccessor === 'string' && config.typeAccessor) {
636✔
83
            const accessorPath = config.typeAccessor;
24✔
84
            this.typeAccessor = (event) => getPropertyAtPath(event, accessorPath);
24✔
85
            this.typeMatcherFn = buildTypeMatcherFn(accessorPath);
24✔
86
        } else {
636✔
87
            this.typeAccessor = typeof config.typeAccessor === 'function' ? config.typeAccessor : null;
612✔
88
            this.typeMatcherFn = null;
612✔
89
        }
612✔
90

636✔
91
        this.storageDirectory = path.resolve(config.storageDirectory || /* istanbul ignore next */ './data');
636!
92
        let defaults = {
636✔
93
            dataDirectory: this.storageDirectory,
636✔
94
            indexDirectory: config.streamsDirectory || path.join(this.storageDirectory, 'streams'),
636✔
95
            partitioner: (event) => event.stream,
636✔
96
            readOnly: config.readOnly || false
636✔
97
        };
636✔
98
        const storageConfig = Object.assign(defaults, config.storageConfig);
636✔
99

636✔
100
        // When typeAccessor is a string path, ensure the corresponding full document path
636✔
101
        // (payload.<path>) is present in matcherProperties so the IndexMatcher discriminant
636✔
102
        // table can route type-stream lookups in O(1) on every write.
636✔
103
        if (this.typeMatcherFn) {
636✔
104
            const fullPath = `payload.${config.typeAccessor}`;
24✔
105
            const currentProps = storageConfig.matcherProperties || DEFAULT_MATCHER_PROPERTIES;
24✔
106
            if (!currentProps.includes(fullPath)) {
24✔
107
                storageConfig.matcherProperties = [...currentProps, fullPath];
8✔
108
            }
8✔
109
        }
24✔
110

636✔
111
        // Translate the high-level streamMetadata option into the storage-level metadata function,
636✔
112
        // but only when the caller has not already provided a lower-level storageConfig.metadata.
636✔
113
        if (config.streamMetadata !== undefined && storageConfig.metadata === undefined) {
636✔
114
            if (typeof config.streamMetadata === 'function') {
76✔
115
                storageConfig.metadata = config.streamMetadata;
4✔
116
            } else {
76✔
117
                storageConfig.metadata = (streamName) => config.streamMetadata[streamName] || {};
72✔
118
            }
72✔
119
        }
76✔
120

636✔
121
        this.initialize(storeName, storageConfig);
636✔
122
    }
636✔
123

4✔
124
    /**
4✔
125
     * @private
4✔
126
     * @param {string} storeName
4✔
127
     * @param {object} storageConfig
4✔
128
     */
4✔
129
    initialize(storeName, storageConfig) {
4✔
130
        this.streamsDirectory = path.resolve(storageConfig.indexDirectory);
636✔
131

636✔
132
        this.storeName = storeName;
636✔
133
        this.storage = (storageConfig.readOnly === true) ?
636✔
134
                        new ReadOnlyStorage(storeName, storageConfig)
44✔
135
                        : new Storage(storeName, storageConfig);
636✔
136
        this.streams = Object.create(null);
636✔
137
        this.streams._all = { index: this.storage.index };
636✔
138

636✔
139
        this.storage.on('index-created', this.registerStream.bind(this));
636✔
140

636✔
141
        this.storage.on('opened', () => {
636✔
142
            this.checkUnfinishedCommits();
128✔
143
            this.emit('ready');
128✔
144
        });
636✔
145

636✔
146
        this.storage.open();
636✔
147
    }
636✔
148

4✔
149
    /**
4✔
150
     * Check if the last commit in the store was unfinished, which is the case if not all events of the commit have been written.
4✔
151
     * Torn writes are handled at the storage level, so this method only deals with unfinished commits.
4✔
152
     * @private
4✔
153
     */
4✔
154
    checkUnfinishedCommits() {
4✔
155
        let position = this.storage.length;
128✔
156
        let lastEvent;
128✔
157
        let truncateIndex = false;
128✔
158
        while (position > 0) {
128✔
159
            try {
80✔
160
                lastEvent = this.storage.read(position);
80✔
161
            } catch (e) {
80!
UNCOV
162
                // A preRead hook may throw (e.g. access control). Stop repair check.
×
163
                return;
×
164
            }
×
165
            if (lastEvent !== false) break;
80✔
166
            truncateIndex = true;
16✔
167
            position--;
16✔
168
        }
16✔
169

128✔
170
        if (lastEvent && lastEvent.metadata.commitSize && lastEvent.metadata.commitVersion !== lastEvent.metadata.commitSize - 1) {
128✔
171
            this.emit('unfinished-commit', lastEvent);
8✔
172
            // commitId = global sequence number at which the commit started
8✔
173
            this.storage.truncate(lastEvent.metadata.commitId);
8✔
174
        } else if (truncateIndex) {
128✔
175
            // The index contained items that are not in the storage file; truncate everything
4✔
176
            // after `position`, the last sequence number that was successfully read.
4✔
177
            this.storage.truncate(position);
4✔
178
        }
4✔
179
    }
128✔
180

4✔
181
    /**
4✔
182
     * @private
4✔
183
     * @param {string} name The full stream name, including the `stream-` prefix (and optional `.closed` suffix).
4✔
184
     */
4✔
185
    registerStream(name) {
4✔
186
        /* istanbul ignore if */
1,040✔
187
        if (!name.startsWith('stream-')) {
1,040!
UNCOV
188
            return;
×
189
        }
×
190
        let streamName = name.slice(7);
1,040✔
191
        // Detect the `.closed` suffix — present both in the initial scan and when the directory
1,040✔
192
        // watcher emits 'index-created' after a writer renames the file (e.g. 'stream-foo-bar.closed').
1,040✔
193
        let isClosed = false;
1,040✔
194
        if (streamName.endsWith('.closed')) {
1,040✔
195
            streamName = streamName.slice(0, -7);
8✔
196
            isClosed = true;
8✔
197
        }
8✔
198
        if (streamName in this.streams) {
1,040✔
199
            if (isClosed && !this.streams[streamName].closed) {
44✔
200
                // The stream was renamed to .closed while this instance had it open.
4✔
201
                // The old ReadOnlyIndex was already closed via onRename, so we open the new one.
4✔
202
                const closedIndexName = 'stream-' + streamName + '.closed';
4✔
203
                const closedIndex = this.storage.openReadonlyIndex(closedIndexName);
4✔
204
                // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
4✔
205
                this.streams[streamName] = { index: closedIndex, closed: true };
4✔
206
                this.emit('stream-closed', streamName);
4✔
207
            }
4✔
208
            return;
44✔
209
        }
44✔
210
        const index = isClosed
996✔
211
            ? this.storage.openReadonlyIndex(name)
1,040✔
212
            : this.storage.openIndex(name);
1,040✔
213
        // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
1,040✔
214
        this.streams[streamName] = { index, closed: isClosed };
1,040✔
215
        this.emit('stream-available', streamName);
1,040✔
216
    }
1,040✔
217

4✔
218
    /**
4✔
219
     * Close the event store and free up all resources.
4✔
220
     *
4✔
221
     * @api
4✔
222
     */
4✔
223
    close() {
4✔
224
        this.storage.close();
628✔
225
    }
628✔
226

4✔
227
    /**
4✔
228
     * Override EventEmitter.on() to delegate 'preCommit' and 'preRead' event registrations
4✔
229
     * to the underlying storage, so that `eventstore.on('preCommit', handler)` works naturally.
4✔
230
     * All other events are handled by the default EventEmitter.
4✔
231
     *
4✔
232
     * @param {string} event
4✔
233
     * @param {function} listener
4✔
234
     * @returns {this}
4✔
235
     */
4✔
236
    on(event, listener) {
4✔
237
        if (event === 'preCommit' || event === 'preRead') {
208✔
238
            if (event === 'preCommit') {
76✔
239
                assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.');
40✔
240
            }
40✔
241
            this.storage.on(event, listener);
68✔
242
            return this;
68✔
243
        }
68✔
244
        return super.on(event, listener);
132✔
245
    }
208✔
246

4✔
247
    /**
4✔
248
     * @inheritDoc
4✔
249
     */
4✔
250
    addListener(event, listener) {
4✔
251
        return this.on(event, listener);
4✔
252
    }
4✔
253

4✔
254
    /**
4✔
255
     * Override EventEmitter.once() to delegate 'preCommit' and 'preRead' to the underlying storage.
4✔
256
     *
4✔
257
     * @param {string} event
4✔
258
     * @param {function} listener
4✔
259
     * @returns {this}
4✔
260
     */
4✔
261
    once(event, listener) {
4✔
262
        if (event === 'preCommit' || event === 'preRead') {
12✔
263
            if (event === 'preCommit') {
8✔
264
                assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.');
4✔
265
            }
4✔
266
            this.storage.once(event, listener);
8✔
267
            return this;
8✔
268
        }
8✔
269
        return super.once(event, listener);
4✔
270
    }
12✔
271

4✔
272
    /**
4✔
273
     * Override EventEmitter.off() / removeListener() to delegate 'preCommit' and 'preRead'
4✔
274
     * to the underlying storage.
4✔
275
     *
4✔
276
     * @param {string} event
4✔
277
     * @param {function} listener
4✔
278
     * @returns {this}
4✔
279
     */
4✔
280
    off(event, listener) {
4✔
281
        if (event === 'preCommit' || event === 'preRead') {
24✔
282
            this.storage.off(event, listener);
12✔
283
            return this;
12✔
284
        }
12✔
285
        return super.off(event, listener);
12✔
286
    }
24✔
287

4✔
288
    /**
4✔
289
     * @inheritDoc
4✔
290
     */
4✔
291
    removeListener(event, listener) {
4✔
292
        return this.off(event, listener);
12✔
293
    }
12✔
294

4✔
295
    /**
4✔
296
     * Convenience method to register a handler called before an event is committed to storage.
4✔
297
     * Equivalent to `eventstore.on('preCommit', hook)`.
4✔
298
     * The handler receives `(event, partitionMetadata)` and may throw to abort the write.
4✔
299
     * Multiple handlers can be registered; all run on every write in registration order.
4✔
300
     * The handler is invoked on every write, so its logic should be cheap, fast, and synchronous.
4✔
301
     *
4✔
302
     * @api
4✔
303
     * @param {function(object, object): void} hook A function receiving (event, partitionMetadata).
4✔
304
     * @throws {Error} If the storage was opened in read-only mode.
4✔
305
     */
4✔
306
    preCommit(hook) {
4✔
307
        this.on('preCommit', hook);
20✔
308
    }
20✔
309

4✔
310
    /**
4✔
311
     * Convenience method to register a handler called before an event is read from storage.
4✔
312
     * Equivalent to `eventstore.on('preRead', hook)`.
4✔
313
     * The handler receives `(position, partitionMetadata)` and may throw to abort the read.
4✔
314
     * Multiple handlers can be registered; all run on every read in registration order.
4✔
315
     * The handler is invoked on every read, so its logic should be cheap, fast, and synchronous.
4✔
316
     *
4✔
317
     * @api
4✔
318
     * @param {function(number, object): void} hook A function receiving (position, partitionMetadata).
4✔
319
     */
4✔
320
    preRead(hook) {
4✔
321
        this.on('preRead', hook);
12✔
322
    }
12✔
323

4✔
324
    /**
4✔
325
     * Get the number of events stored.
4✔
326
     *
4✔
327
     * @api
4✔
328
     * @returns {number}
4✔
329
     */
4✔
330
    get length() {
4✔
331
        return this.storage.length;
1,424✔
332
    }
1,424✔
333

4✔
334
    /**
4✔
335
     * This method makes it so the last three arguments can be given either as:
4✔
336
     *  - expectedVersion, metadata, callback
4✔
337
     *  - expectedVersion, callback
4✔
338
     *  - metadata, callback
4✔
339
     *  - callback
4✔
340
     *
4✔
341
     * @private
4✔
342
     * @param {Array<object>|object} events
4✔
343
     * @param {number|CommitCondition} [expectedVersion]
4✔
344
     * @param {object|function} [metadata]
4✔
345
     * @param {function} [callback]
4✔
346
     * @returns {{events: Array<object>, metadata: object, callback: function, expectedVersion: number|CommitCondition}}
4✔
347
     */
4✔
348
    static fixArgumentTypes(events, expectedVersion, metadata, callback) {
4✔
349
        if (!(events instanceof Array)) {
1,204✔
350
            events = [events];
100✔
351
        }
100✔
352
        if (typeof expectedVersion !== 'number' && !(expectedVersion instanceof CommitCondition)) {
1,204✔
353
            callback = metadata;
336✔
354
            metadata = expectedVersion;
336✔
355
            expectedVersion = ExpectedVersion.Any;
336✔
356
        }
336✔
357
        if (typeof metadata !== 'object') {
1,204✔
358
            callback = metadata;
344✔
359
            metadata = {};
344✔
360
        }
344✔
361
        if (typeof callback !== 'function') {
1,204✔
362
            callback = () => {};
848✔
363
        }
848✔
364
        return { events, expectedVersion, metadata, callback };
1,204✔
365
    }
1,204✔
366

4✔
367
    /**
4✔
368
     * Check a {@link CommitCondition} against the current state of the store.
4✔
369
     * Iterates a join stream over all condition type streams starting from
4✔
370
     * `condition.noneMatchAfter` (the global position captured at query time), and throws an
4✔
371
     * {@link OptimisticConcurrencyError} when a new event of a listed type satisfies
4✔
372
     * `condition.matcher(payload, metadata)` (or any such event when no matcher is provided).
4✔
373
     *
4✔
374
     * @param {CommitCondition} condition
4✔
375
     * @throws {OptimisticConcurrencyError}
4✔
376
     */
4✔
377
    checkCondition(condition) {
4✔
378
        if (this.storage.length <= condition.noneMatchAfter) return; // no new events since condition was obtained
28✔
379

16✔
380
        const existingTypes = condition.types.filter(t => t in this.streams);
16✔
381
        if (existingTypes.length === 0) return;
28!
382

16✔
383
        // Only events after condition.noneMatchAfter can be conflicts.
16✔
384
        // Pass the original matcher and raw flag so the stream filters at the source.
16✔
385
        const stream = this.fromStreams(
16✔
386
            '_check_' + condition.types.join('_'),
16✔
387
            existingTypes,
16✔
388
            condition.noneMatchAfter + 1,
16✔
389
            -1,
16✔
390
            condition.matcher,
16✔
391
            condition.raw
16✔
392
        );
16✔
393

16✔
394
        if (stream.next() !== false) {
28✔
395
            throw new OptimisticConcurrencyError(
12✔
396
                `Optimistic Concurrency error. A conflicting event was committed since the condition was obtained.`
12✔
397
            );
12✔
398
        }
12✔
399
    }
28✔
400

4✔
401
    /**
4✔
402
     * Ensure a dedicated type stream exists for each event's type, creating it if needed.
4✔
403
     * Must be called before the entity stream is created to guarantee correct index routing.
4✔
404
     *
4✔
405
     * @param {Array<object>} events The events to process.
4✔
406
     */
4✔
407
    ensureTypeStreams(events) {
4✔
408
        if (!this.typeAccessor) return;
1,192✔
409
        for (const event of events) {
1,192✔
410
            const type = this.resolveValidatedTypeStreamName(event);
148✔
411
            if (type && !(type in this.streams)) {
148✔
412
                const matcher = this.typeMatcherFn
112✔
413
                    ? this.typeMatcherFn(type)
112✔
414
                    : (doc) => this.typeAccessor(doc.payload) === type;
112✔
415
                this.createEventStream(type, matcher, false);
112✔
416
            }
112✔
417
        }
148✔
418
    }
1,192✔
419

4✔
420
    resolveValidatedTypeStreamName(event) {
4✔
421
        const type = this.typeAccessor(event);
148✔
422
        if (type === undefined || type === null || type === '') {
148✔
423
            return null;
8✔
424
        }
8✔
425
        assert(typeof type === 'string', 'typeAccessor must return a string.');
140✔
426
        assert(STREAM_NAME_PATTERN.test(type), 'typeAccessor must return a valid stream name.');
140✔
427
        return type;
140✔
428
    }
148✔
429

4✔
430
    /**
4✔
431
     * Commit a list of events for the given stream name, which is expected to be at the given version.
4✔
432
     * Note that the events committed may still appear in other streams too - the given stream name is only
4✔
433
     * relevant for optimistic concurrency checks with the given expected version.
4✔
434
     *
4✔
435
     * @api
4✔
436
     * @param {string} streamName The name of the stream to commit the events to.
4✔
437
     * @param {Array<object>|object} events The events to commit or a single event.
4✔
438
     * @param {number|CommitCondition} [expectedVersion] One of the `ExpectedVersion` constants, a positive
4✔
439
     *   stream version number, or a {@link CommitCondition} obtained from {@link EventStore#query}.
4✔
440
     * @param {object} [metadata] The commit metadata to use as base. Useful for replication and adding storage metadata.
4✔
441
     * @param {function} [callback] A function that will be executed when all events have been committed.
4✔
442
     * @throws {OptimisticConcurrencyError} if the stream is not at the expected version, or if a
4✔
443
     *   {@link CommitCondition} was provided and conflicting events have been committed since it was obtained.
4✔
444
     */
4✔
445
    commit(streamName, events, expectedVersion = ExpectedVersion.Any, metadata = {}, callback = null) {
4✔
446
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not commit to it.');
1,216✔
447
        assert(typeof streamName === 'string' && streamName !== '', 'Must specify a stream name for commit.');
1,216✔
448
        assert(typeof events !== 'undefined' && events !== null, 'No events specified for commit.');
1,216✔
449

1,216✔
450
        ({ events, expectedVersion, metadata, callback } = EventStore.fixArgumentTypes(events, expectedVersion, metadata, callback));
1,216✔
451

1,216✔
452
        // Perform DCB-style concurrency check when a CommitCondition is provided.
1,216✔
453
        if (expectedVersion instanceof CommitCondition) {
1,216✔
454
            this.checkCondition(expectedVersion);
28✔
455
            expectedVersion = ExpectedVersion.Any;
28✔
456
        }
28✔
457

1,192✔
458
        // When typeAccessor is configured, ensure a dedicated type stream exists for each event
1,192✔
459
        // before the entity stream write so the type stream index is never incomplete.
1,192✔
460
        this.ensureTypeStreams(events);
1,192✔
461

1,192✔
462
        if (!(streamName in this.streams)) {
1,216✔
463
            this.createEventStream(streamName, { stream: streamName }, false);
736✔
464
        }
736✔
465
        assert(!this.streams[streamName].closed, `Stream "${streamName}" is closed and cannot be written to.`);
1,184✔
466
        let streamVersion = this.streams[streamName].index.length;
1,184✔
467
        if (expectedVersion !== ExpectedVersion.Any && streamVersion !== expectedVersion) {
1,216✔
468
            throw new OptimisticConcurrencyError(`Optimistic Concurrency error. Expected stream "${streamName}" at version ${expectedVersion} but is at version ${streamVersion}.`);
12✔
469
        }
12✔
470

1,164✔
471
        if (events.length > 1) {
1,216✔
472
            delete metadata.commitVersion;
68✔
473
        }
68✔
474

1,164✔
475
        const commitId = this.length;
1,164✔
476
        let commitVersion = 0;
1,164✔
477
        const commitSize = events.length;
1,164✔
478
        const committedAt = Date.now();
1,164✔
479
        const commit = Object.assign({
1,164✔
480
            commitId,
1,164✔
481
            committedAt
1,164✔
482
        }, metadata, {
1,164✔
483
            streamName,
1,164✔
484
            streamVersion,
1,164✔
485
            events: []
1,164✔
486
        });
1,164✔
487
        const commitCallback = () => {
1,164✔
488
            this.emit('commit', commit);
1,160✔
489
            callback(commit);
1,160✔
490
        };
1,164✔
491
        for (let event of events) {
1,216✔
492
            const eventMetadata = Object.assign({ commitId, committedAt, commitVersion, commitSize }, metadata, { streamVersion });
1,260✔
493
            const storedEvent = { stream: streamName, payload: event, metadata: eventMetadata };
1,260✔
494
            commitVersion++;
1,260✔
495
            streamVersion++;
1,260✔
496
            commit.events.push(event);
1,260✔
497
            this.storage.write(storedEvent, commitVersion !== events.length ? undefined : commitCallback);
1,260✔
498
        }
1,260✔
499
    }
1,216✔
500

4✔
501
    /**
4✔
502
     * @api
4✔
503
     * @param {string} streamName The name of the stream to get the version for.
4✔
504
     * @returns {number} The version that the given stream is at currently, or -1 if the stream does not exist.
4✔
505
     */
4✔
506
    getStreamVersion(streamName) {
4✔
507
        if (!(streamName in this.streams)) {
116✔
508
            return -1;
20✔
509
        }
20✔
510
        return this.streams[streamName].index.length;
96✔
511
    }
116✔
512

4✔
513
    /**
4✔
514
     * Query the event store for events matching a set of event types and an optional filter function.
4✔
515
     * Returns a pre-filtered event stream and a {@link CommitCondition} that can be passed to
4✔
516
     * {@link EventStore#commit} to enforce optimistic concurrency.
4✔
517
     *
4✔
518
     * A conflict occurs when at least one event appended between the `query` call and the `commit` call
4✔
519
     * belongs to one of the listed types and (when `matcher` is provided) also satisfies
4✔
520
     * `matcher(payload, metadata)`.  Events written before the `query` call are never treated as conflicts.
4✔
521
     *
4✔
522
     * **Behaviour when a type stream does not exist:**
4✔
523
     * - Without `typeAccessor` configured: throws an error, because the store cannot guarantee that no
4✔
524
     *   events of that type exist (the stream was never created).  Create the stream explicitly first,
4✔
525
     *   or configure `typeAccessor` to have streams created automatically on commit.
4✔
526
     * - With `typeAccessor` configured: treats the missing stream as empty (0-length).  The stream will
4✔
527
     *   be created automatically the first time an event of that type is committed.
4✔
528
     *
4✔
529
     * @api
4✔
530
     * @param {string[]} types A non-empty array of event-type names to query.
4✔
531
     * @param {function|object|null} [matcher] Optional matcher used for stream pre-filtering.
4✔
532
     *   In object mode, function predicates receive `(payload, metadata)`.
4✔
533
     * @param {number} [minRevision=1] The 1-based minimum global revision to include in the returned stream (inclusive).
4✔
534
     * @param {boolean} [raw=false] If true, return NDJSON buffers from the query stream.
4✔
535
     * @returns {{ condition: CommitCondition, stream: EventStream }} An object with:
4✔
536
     *   - `condition` — the {@link CommitCondition} to pass to {@link EventStore#commit}.
4✔
537
     *   - `stream` — a read-only event stream containing all matching events.
4✔
538
     * @throws {Error} if `types` is not a non-empty array.
4✔
539
     * @throws {Error} if `typeAccessor` is not configured and any of the listed type streams do not exist.
4✔
540
     */
4✔
541
    query(types, matcher = null, minRevision = 1, raw = false) {
4✔
542
        assert(Array.isArray(types) && types.length > 0, 'Must specify a non-empty array of event types for query.');
108✔
543

108✔
544
        const queryTypes = [];
108✔
545
        for (const type of types) {
108✔
546
            if (!(type in this.streams)) {
112✔
547
                if (this.typeAccessor) {
52✔
548
                    // typeAccessor is configured: type streams are created on commit, so a missing
48✔
549
                    // stream simply means no event of this type has been committed yet — treat as empty.
48✔
550
                    continue;
48✔
551
                }
48✔
552
                // No typeAccessor: the stream was never created; we cannot know whether events of
4✔
553
                // this type exist in the store, so throw to avoid an unintentional full-store scan.
4✔
554
                throw new Error(`Type stream "${type}" does not exist. Create it with createEventStream() first, or configure typeAccessor to have type streams created automatically on commit.`);
4✔
555
            }
4✔
556
            queryTypes.push(type);
60✔
557
        }
60✔
558

96✔
559
        const condition = new CommitCondition(types, matcher, this.storage.length, raw);
96✔
560
        const stream = this.fromStreams('_query_' + types.join('_'), queryTypes, minRevision, -1, matcher, raw);
96✔
561
        return { stream, condition };
96✔
562
    }
108✔
563

4✔
564
    /**
4✔
565
     * Get an event stream for the given stream name within the revision boundaries.
4✔
566
     *
4✔
567
     * @api
4✔
568
     * @param {string} streamName The name of the stream to get.
4✔
569
     * @param {number} [minRevision=1] The 1-based minimum revision to include in the events (inclusive).
4✔
570
     * @param {number} [maxRevision=-1] The 1-based maximum revision to include in the events (inclusive).
4✔
571
     * @param {function|object|null} [predicate] Optional matcher (see {@link EventStream}).
4✔
572
     * @param {boolean} [raw=false] If true, return NDJSON buffers.
4✔
573
     * @returns {EventStream|boolean} The event stream or false if a stream with the name doesn't exist.
4✔
574
     */
4✔
575
    getEventStream(streamName, minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
4✔
576
        if (typeof predicate === 'boolean' && raw === false) {
196!
NEW
577
            raw = predicate;
×
NEW
578
            predicate = null;
×
NEW
UNCOV
579
        }
×
580
        if (!(streamName in this.streams)) {
196✔
581
            return false;
4✔
582
        }
4✔
583
        return new EventStream(streamName, this, minRevision, maxRevision, predicate, raw);
192✔
584
    }
196✔
585

4✔
586
    /**
4✔
587
     * Get a stream for all events within the revision boundaries.
4✔
588
     * This is the same as `getEventStream('_all', ...)`.
4✔
589
     *
4✔
590
     * @api
4✔
591
     * @param {number} [minRevision=1] The 1-based minimum revision to include in the events (inclusive).
4✔
592
     * @param {number} [maxRevision=-1] The 1-based maximum revision to include in the events (inclusive).
4✔
593
     * @param {function|object|null} [predicate] Optional matcher (see {@link EventStream}).
4✔
594
     * @param {boolean} [raw=false] If true, return NDJSON buffers.
4✔
595
     * @returns {EventStream} The event stream.
4✔
596
     */
4✔
597
    getAllEvents(minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
4✔
598
        if (typeof predicate === 'boolean' && raw === false) {
8!
NEW
599
            raw = predicate;
×
NEW
600
            predicate = null;
×
NEW
UNCOV
601
        }
×
602
        return this.getEventStream('_all', minRevision, maxRevision, predicate, raw);
8✔
603
    }
8✔
604

4✔
605
    /**
4✔
606
     * Create a virtual event stream from existing streams by joining them.
4✔
607
     *
4✔
608
     * @param {string} streamName The (transient) name of the joined stream.
4✔
609
     * @param {Array<string>} streamNames An array of the stream names to join.
4✔
610
     * @param {number} [minRevision=1] The 1-based minimum revision to include in the events (inclusive).
4✔
611
     * @param {number} [maxRevision=-1] The 1-based maximum revision to include in the events (inclusive).
4✔
612
     * @param {function|object|null} [predicate] Optional matcher (see {@link EventStream}).
4✔
613
     * @param {boolean} [raw=false] If true, return NDJSON buffers.
4✔
614
     * @returns {EventStream} The joined event stream.
4✔
615
     * @throws {Error} if any of the streams doesn't exist.
4✔
616
     */
4✔
617
    fromStreams(streamName, streamNames, minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
4✔
618
        if (typeof predicate === 'boolean' && raw === false) {
164!
NEW
619
            raw = predicate;
×
NEW
UNCOV
620
            predicate = null;
×
NEW
UNCOV
621
        }
×
622
        assert(streamNames instanceof Array, 'Must specify an array of stream names.');
164✔
623

164✔
624
        if (streamNames.length === 0) {
164✔
625
            return new EventStream(streamName, this);
44✔
626
        }
44✔
627

112✔
628
        for (let stream of streamNames) {
164✔
629
            assert(stream in this.streams, `Stream "${stream}" does not exist.`);
272✔
630
        }
272✔
631

108✔
632
        if (streamNames.length === 1) {
164✔
633
            const stream = new EventStream(streamNames[0], this, minRevision, maxRevision, predicate, raw);
60✔
634
            stream.name = streamName;
60✔
635
            return stream;
60✔
636
        }
60✔
637

48✔
638
        return new JoinEventStream(streamName, streamNames, this, minRevision, maxRevision, predicate, raw);
48✔
639
    }
164✔
640

4✔
641
    /**
4✔
642
     * Get a stream for a category of streams. This will effectively return a joined stream of all streams that start
4✔
643
     * with the given `categoryName` followed by a dash (flat layout, e.g. `users-123`) or a slash (hierarchical
4✔
644
     * layout, e.g. `users/123`).
4✔
645
     * If you frequently use this for a category consisting of a lot of streams (e.g. `users`), consider creating a
4✔
646
     * dedicated physical stream for the category:
4✔
647
     *
4✔
648
     *    `eventstore.createEventStream('users', e => e.stream.startsWith('users-') || e.stream.startsWith('users/'))`
4✔
649
     *
4✔
650
     * @api
4✔
651
     * @param {string} categoryName The name of the category to get a stream for. A category is a stream name prefix.
4✔
652
     * @param {number} [minRevision=1] The 1-based minimum revision to include in the events (inclusive).
4✔
653
     * @param {number} [maxRevision=-1] The 1-based maximum revision to include in the events (inclusive).
4✔
654
     * @param {function|object|null} [predicate] Optional matcher (see {@link EventStream}).
4✔
655
     * @param {boolean} [raw=false] If true, return NDJSON buffers.
4✔
656
     * @returns {EventStream} The joined event stream for all streams of the given category.
4✔
657
     * @throws {Error} If no stream for this category exists.
4✔
658
     */
4✔
659
    getEventStreamForCategory(categoryName, minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
4✔
660
        if (typeof predicate === 'boolean' && raw === false) {
36!
NEW
661
            raw = predicate;
×
NEW
UNCOV
662
            predicate = null;
×
NEW
UNCOV
663
        }
×
664
        if (categoryName in this.streams) {
36✔
665
            return this.getEventStream(categoryName, minRevision, maxRevision, predicate, raw);
4✔
666
        }
4✔
667
        const categoryStreams = Object.keys(this.streams).filter(streamName =>
32✔
668
            streamName.startsWith(categoryName + '-') ||
212✔
669
            streamName.startsWith(categoryName + '/')
124✔
670
        );
32✔
671

32✔
672
        if (categoryStreams.length === 0) {
36✔
673
            throw new Error(`No streams for category '${categoryName}' exist.`);
4✔
674
        }
4✔
675
        return this.fromStreams(categoryName, categoryStreams, minRevision, maxRevision, predicate, raw);
28✔
676
    }
36✔
677

4✔
678
    /**
4✔
679
     * Create a new stream with the given matcher.
4✔
680
     *
4✔
681
     * @api
4✔
682
     * @param {string} streamName The name of the stream to create.
4✔
683
     * @param {object|function(event)} matcher A matcher object, denoting the properties that need to match on an event a function that takes the event and returns true if the event should be added.
4✔
684
     * @param {boolean} [reindex=true] Whether to scan existing documents and populate the new index. Set to false when it is known that no existing documents can match the matcher (e.g. when creating a brand-new write stream).
4✔
685
     * @returns {EventStream} The EventStream with all existing events matching the matcher.
4✔
686
     * @throws {Error} If a stream with that name already exists.
4✔
687
     * @throws {Error} If the stream could not be created.
4✔
688
     */
4✔
689
    createEventStream(streamName, matcher, reindex = true) {
4✔
690
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not create new stream on it.');
928✔
691
        assert(!(streamName in this.streams), 'Can not recreate stream!');
928✔
692

928✔
693
        const streamIndexName = 'stream-' + streamName;
928✔
694
        if (streamName.includes('/')) {
928✔
695
            const subDir = path.join(this.streamsDirectory, this.storeName + '.stream-' + path.dirname(streamName));
88✔
696
            ensureDirectory(subDir);
88✔
697
        }
88✔
698
        const index = this.storage.ensureIndex(streamIndexName, matcher, reindex);
912✔
699
        assert(index !== null, `Error creating stream index ${streamName}.`);
912✔
700

912✔
701
        // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
912✔
702
        this.streams[streamName] = { index, matcher };
912✔
703
        this.emit('stream-created', streamName);
912✔
704
        return new EventStream(streamName, this);
912✔
705
    }
928✔
706

4✔
707
    /**
4✔
708
     * Delete an event stream. Will do nothing if the stream with the name doesn't exist.
4✔
709
     *
4✔
710
     * Note that you can delete a write stream, but that will not delete the events written to it.
4✔
711
     * Also, on next write, that stream will be rebuilt from all existing events, which might take some time.
4✔
712
     *
4✔
713
     * @api
4✔
714
     * @param {string} streamName The name of the stream to delete.
4✔
715
     * @returns void
4✔
716
     */
4✔
717
    deleteEventStream(streamName) {
4✔
718
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not delete a stream on it.');
12✔
719

12✔
720
        if (!(streamName in this.streams)) {
12✔
721
            return;
4✔
722
        }
4✔
723
        this.streams[streamName].index.destroy();
4✔
724
        delete this.streams[streamName];
4✔
725
        this.emit('stream-deleted', streamName);
4✔
726
    }
12✔
727

4✔
728
    /**
4✔
729
     * Close a stream so that no new events are indexed into it.
4✔
730
     * The stream will still be readable, but any attempt to write to it will throw an error.
4✔
731
     * A closed stream is persisted by renaming its index file to include a `.closed` marker
4✔
732
     * (e.g. `stream-X.closed.index`), so it will be recognized as closed when the store is reopened.
4✔
733
     *
4✔
734
     * @api
4✔
735
     * @param {string} streamName The name of the stream to close.
4✔
736
     * @returns void
4✔
737
     * @throws {Error} If the storage is read-only.
4✔
738
     * @throws {Error} If the stream does not exist.
4✔
739
     * @throws {Error} If the stream is already closed.
4✔
740
     */
4✔
741
    closeEventStream(streamName) {
4✔
742
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not close a stream on it.');
48✔
743
        assert(streamName in this.streams, `Stream "${streamName}" does not exist.`);
48✔
744
        assert(!this.streams[streamName].closed, `Stream "${streamName}" is already closed.`);
48✔
745

48✔
746
        const indexName = 'stream-' + streamName;
48✔
747
        const { index } = this.streams[streamName];
48✔
748

48✔
749
        // Flush and close the index before renaming the file
48✔
750
        index.close();
48✔
751

48✔
752
        // Rename the index file to mark it as closed (e.g. stream-foo.index -> stream-foo.closed.index)
48✔
753
        const closedFileName = index.fileName.replace(/\.index$/, '.closed.index');
48✔
754
        fs.renameSync(index.fileName, closedFileName);
48✔
755

48✔
756
        // Remove from secondary indexes so that new writes are no longer indexed into this stream
48✔
757
        this.storage.removeSecondaryIndex(indexName);
48✔
758

48✔
759
        // Reopen the renamed index for read access, outside the secondary indexes write path
48✔
760
        const closedIndexName = indexName + '.closed';
48✔
761
        const closedIndex = this.storage.openReadonlyIndex(closedIndexName);
48✔
762

48✔
763
        // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
48✔
764
        this.streams[streamName] = { index: closedIndex, closed: true };
48✔
765
        this.emit('stream-closed', streamName);
48✔
766
    }
48✔
767

4✔
768
    /**
4✔
769
     * Get a durable consumer for the given stream that will keep receiving events from the last position.
4✔
770
     *
4✔
771
     * @param {string} streamName The name of the stream to consume.
4✔
772
     * @param {string} identifier The unique identifying name of this consumer.
4✔
773
     * @param {object} [initialState] The initial state of the consumer.
4✔
774
     * @param {number} [since] The stream revision to start consuming from.
4✔
775
     * @returns {Consumer} A durable consumer for the given stream.
4✔
776
     */
4✔
777
    getConsumer(streamName, identifier, initialState = {}, since = 0) {
4✔
778
        const consumer = new Consumer(this.storage, streamName === '_all' ? '_all' : 'stream-' + streamName, identifier, initialState, since);
20✔
779
        consumer.streamName = streamName;
20✔
780
        return consumer;
20✔
781
    }
20✔
782

4✔
783
    /**
4✔
784
     * Scan the existing consumers on this EventStore and asynchronously return a list of their names.
4✔
785
     * @param {function(error: Error, consumers: array)} callback A callback that will receive an error as first and the list of consumers as second argument.
4✔
786
     */
4✔
787
    scanConsumers(callback) {
4✔
788
        const consumersPath = path.join(this.storage.indexDirectory, 'consumers');
8✔
789
        if (!fs.existsSync(consumersPath)) {
8✔
790
            callback(null, []);
4✔
791
            return;
4✔
792
        }
4✔
793
        const regex = new RegExp(`^${this.storage.storageFile}\\.([^.]*\\..*)$`);
4✔
794
        const consumers = [];
4✔
795
        scanForFiles(consumersPath, regex, consumers.push.bind(consumers), /* istanbul ignore next */ (err) => {
4✔
796
            if (err) {
4!
797
                return callback(err, []);
×
UNCOV
798
            }
×
799
            callback(null, consumers);
4✔
800
        });
4✔
801
    }
8✔
802
}
4✔
803

4✔
804
EventStore.Storage = Storage;
4✔
805
EventStore.Index = Index;
4✔
806

4✔
807
export default EventStore;
4✔
808
export { ExpectedVersion, OptimisticConcurrencyError, CommitCondition, LOCK_THROW, LOCK_RECLAIM };
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc