• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 26192646756

20 May 2026 10:04PM UTC coverage: 98.111% (+0.005%) from 98.106%
26192646756

Pull #316

github

web-flow
Merge a50d2c2e1 into 03ae7bb7f
Pull Request #316: Add `event-storage-http` REST API layer with NDJSON reads, consumer endpoints, and raw buffer streaming

1093 of 1141 branches covered (95.79%)

Branch coverage included in aggregate %.

352 of 359 new or added lines in 10 files covered. (98.05%)

11 existing lines in 2 files now uncovered.

5554 of 5634 relevant lines covered (98.58%)

830.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.72
/src/EventStore.js
1
import EventStream from './EventStream.js';
4✔
2
import JoinEventStream from './JoinEventStream.js';
4✔
3
import fs from 'fs';
4✔
4
import path from 'path';
4✔
5
import events from 'events';
4✔
6
import Storage, { ReadOnly as ReadOnlyStorage, LOCK_THROW, LOCK_RECLAIM } from './Storage.js';
4✔
7
import Index from './Index.js';
4✔
8
import Consumer from './Consumer.js';
4✔
9
import { assert, getPropertyAtPath } from './util.js';
4✔
10
import { ensureDirectory, scanForFiles } from './fsUtil.js';
4✔
11
import { buildTypeMatcherFn } from './metadataUtil.js';
4✔
12

4✔
13
/**
 * Sentinel values accepted as the `expectedVersion` argument of `EventStore#commit`.
 */
const ExpectedVersion = {
    // Skip the stream version check entirely.
    Any: -1,
    // Require the stream to contain no events yet (its index length must be 0).
    EmptyStream: 0
};
4✔
17

4✔
18
/**
4✔
19
 * Default matcher property paths mirroring the Storage default, used for index optimization.
4✔
20
 */
4✔
21
const DEFAULT_MATCHER_PROPERTIES = ['stream', 'payload.type'];

/**
 * Shape of a valid stream name: one or more segments separated by a single `/`, `.` or `-`.
 * Each segment starts with an ASCII letter or digit and continues with ASCII letters,
 * digits or underscores. Used to validate the type names returned by `typeAccessor`.
 */
const STREAM_NAME_PATTERN = /^[A-Za-z0-9][A-Za-z0-9_]*(?:[\/.-][A-Za-z0-9][A-Za-z0-9_]*)*$/;
4✔
23

4✔
24
/**
 * Thrown when a commit is rejected by an optimistic concurrency check, i.e. the stream is not at
 * the expected version or a commit condition detected a conflicting event.
 */
class OptimisticConcurrencyError extends Error {
    /**
     * @param {...*} args The regular `Error` constructor arguments (message, options).
     */
    constructor(...args) {
        super(...args);
        // Without this the inherited name stays 'Error', which makes the error indistinguishable
        // from a generic one in logs, stack traces and `err.name` checks.
        this.name = 'OptimisticConcurrencyError';
    }
}
4✔
25

4✔
26
/**
4✔
27
 * An accept condition that captures the global event-log position at the time a {@link EventStore#query}
4✔
28
 * call was made.  Pass it as the `expectedVersion` argument to {@link EventStore#commit} to enforce
4✔
29
 * DCB-style (Dynamic Consistency Boundary) optimistic concurrency: the commit is rejected only when
4✔
30
 * one or more events that match the original query (types + optional matcher) have been appended to
4✔
31
 * the store between the `query` call and the `commit` call.
4✔
32
 *
4✔
33
 * @property {string[]} types   The event types included in the query.
4✔
34
 * @property {function(object, object): boolean|null} matcher An optional function `(payload, metadata) => boolean`
4✔
35
 *   used to narrow the conflict check.  When `null`, any new event of a listed type causes a conflict.
4✔
36
 * @property {number}   noneMatchAfter The global store length (total event count) at the time the query was made.
4✔
37
 */
4✔
38
class CommitCondition {
    /**
     * Capture the parameters of a query so a later commit can be checked against them.
     *
     * @param {string[]} types The event types the condition covers.
     * @param {function(object, object): boolean|null} [matcher] Optional `(payload, metadata) => boolean`
     *   filter; defaults to `null` when omitted.
     * @param {number}   noneMatchAfter The global position after which no matching event may exist.
     */
    constructor(types, matcher = null, noneMatchAfter) {
        Object.assign(this, { types, matcher, noneMatchAfter });
    }
}
4✔
50

4✔
51
/**
4✔
52
 * An event store optimized for working with many streams.
4✔
53
 * An event stream is implemented as an iterator over an index on the storage, therefore indexes need to be lightweight
4✔
54
 * and highly performant in read-only mode.
4✔
55
 */
4✔
56
class EventStore extends events.EventEmitter {
4✔
57

4✔
58
    /**
4✔
59
     * @param {string} [storeName] The name of the store which will be used as storage prefix. Default 'eventstore'.
4✔
60
     * @param {object} [config] An object with config options.
4✔
61
     * @param {string} [config.storageDirectory] The directory where the data should be stored. Default './data'.
4✔
62
     * @param {string} [config.streamsDirectory] The directory where the streams should be stored. Default '{storageDirectory}/streams'.
4✔
63
     * @param {object} [config.storageConfig] Additional config options given to the storage backend. See `Storage`.
4✔
64
     * @param {boolean} [config.readOnly] If the storage should be mounted in read-only mode.
4✔
65
     * @param {string|function(object): string} [config.typeAccessor] Dot-notation path (e.g. `'type'`) or
4✔
66
     *   function `(event) => string` identifying the event type. Enables type-based queries via
4✔
67
     *   {@link EventStore#query} and ensures proper index routing for those queries.
4✔
68
     * @param {object|function(string): object} [config.streamMetadata] A metadata object or a function `(streamName) => object`
4✔
69
     *   that is called whenever a new stream partition is created. The returned object is stored once in the partition
4✔
70
     *   file header and surfaced to `preCommit` / `preRead` hooks. Takes precedence only when
4✔
71
     *   `config.storageConfig.metadata` is not also set.
4✔
72
     */
4✔
73
    constructor(storeName = 'eventstore', config = {}) {
4✔
74
        super();
612✔
75
        if (typeof storeName !== 'string') {
612✔
76
            config = storeName;
608✔
77
            storeName = 'eventstore';
608✔
78
        }
608✔
79

612✔
80
        if (typeof config.typeAccessor === 'string' && config.typeAccessor) {
612✔
81
            const accessorPath = config.typeAccessor;
24✔
82
            this.typeAccessor = (event) => getPropertyAtPath(event, accessorPath);
24✔
83
            this.typeMatcherFn = buildTypeMatcherFn(accessorPath);
24✔
84
        } else {
612✔
85
            this.typeAccessor = typeof config.typeAccessor === 'function' ? config.typeAccessor : null;
588✔
86
            this.typeMatcherFn = null;
588✔
87
        }
588✔
88

612✔
89
        this.storageDirectory = path.resolve(config.storageDirectory || /* istanbul ignore next */ './data');
612!
90
        let defaults = {
612✔
91
            dataDirectory: this.storageDirectory,
612✔
92
            indexDirectory: config.streamsDirectory || path.join(this.storageDirectory, 'streams'),
612✔
93
            partitioner: (event) => event.stream,
612✔
94
            readOnly: config.readOnly || false
612✔
95
        };
612✔
96
        const storageConfig = Object.assign(defaults, config.storageConfig);
612✔
97

612✔
98
        // When typeAccessor is a string path, ensure the corresponding full document path
612✔
99
        // (payload.<path>) is present in matcherProperties so the IndexMatcher discriminant
612✔
100
        // table can route type-stream lookups in O(1) on every write.
612✔
101
        if (this.typeMatcherFn) {
612✔
102
            const fullPath = `payload.${config.typeAccessor}`;
24✔
103
            const currentProps = storageConfig.matcherProperties || DEFAULT_MATCHER_PROPERTIES;
24✔
104
            if (!currentProps.includes(fullPath)) {
24✔
105
                storageConfig.matcherProperties = [...currentProps, fullPath];
8✔
106
            }
8✔
107
        }
24✔
108

612✔
109
        // Translate the high-level streamMetadata option into the storage-level metadata function,
612✔
110
        // but only when the caller has not already provided a lower-level storageConfig.metadata.
612✔
111
        if (config.streamMetadata !== undefined && storageConfig.metadata === undefined) {
612✔
112
            if (typeof config.streamMetadata === 'function') {
76✔
113
                storageConfig.metadata = config.streamMetadata;
4✔
114
            } else {
76✔
115
                storageConfig.metadata = (streamName) => config.streamMetadata[streamName] || {};
72✔
116
            }
72✔
117
        }
76✔
118

612✔
119
        this.initialize(storeName, storageConfig);
612✔
120
    }
612✔
121

4✔
122
    /**
4✔
123
     * @private
4✔
124
     * @param {string} storeName
4✔
125
     * @param {object} storageConfig
4✔
126
     */
4✔
127
    initialize(storeName, storageConfig) {
4✔
128
        this.streamsDirectory = path.resolve(storageConfig.indexDirectory);
612✔
129

612✔
130
        this.storeName = storeName;
612✔
131
        this.storage = (storageConfig.readOnly === true) ?
612✔
132
                        new ReadOnlyStorage(storeName, storageConfig)
44✔
133
                        : new Storage(storeName, storageConfig);
612✔
134
        this.streams = Object.create(null);
612✔
135
        this.streams._all = { index: this.storage.index };
612✔
136

612✔
137
        this.storage.on('index-created', this.registerStream.bind(this));
612✔
138

612✔
139
        this.storage.on('opened', () => {
612✔
140
            this.checkUnfinishedCommits();
126✔
141
            this.emit('ready');
126✔
142
        });
612✔
143

612✔
144
        this.storage.open();
612✔
145
    }
612✔
146

4✔
147
    /**
4✔
148
     * Check if the last commit in the store was unfinished, which is the case if not all events of the commit have been written.
4✔
149
     * Torn writes are handled at the storage level, so this method only deals with unfinished commits.
4✔
150
     * @private
4✔
151
     */
4✔
152
    checkUnfinishedCommits() {
4✔
153
        let position = this.storage.length;
126✔
154
        let lastEvent;
126✔
155
        let truncateIndex = false;
126✔
156
        while (position > 0) {
126✔
157
            try {
78✔
158
                lastEvent = this.storage.read(position);
78✔
159
            } catch (e) {
78!
UNCOV
160
                // A preRead hook may throw (e.g. access control). Stop repair check.
×
UNCOV
161
                return;
×
162
            }
×
163
            if (lastEvent !== false) break;
78✔
164
            truncateIndex = true;
16✔
165
            position--;
16✔
166
        }
16✔
167

126✔
168
        if (lastEvent && lastEvent.metadata.commitSize && lastEvent.metadata.commitVersion !== lastEvent.metadata.commitSize - 1) {
126✔
169
            this.emit('unfinished-commit', lastEvent);
8✔
170
            // commitId = global sequence number at which the commit started
8✔
171
            this.storage.truncate(lastEvent.metadata.commitId);
8✔
172
        } else if (truncateIndex) {
126✔
173
            // The index contained items that are not in the storage file; truncate everything
4✔
174
            // after `position`, the last sequence number that was successfully read.
4✔
175
            this.storage.truncate(position);
4✔
176
        }
4✔
177
    }
126✔
178

4✔
179
    /**
4✔
180
     * @private
4✔
181
     * @param {string} name The full stream name, including the `stream-` prefix (and optional `.closed` suffix).
4✔
182
     */
4✔
183
    registerStream(name) {
4✔
184
        /* istanbul ignore if */
992✔
185
        if (!name.startsWith('stream-')) {
992!
UNCOV
186
            return;
×
UNCOV
187
        }
×
188
        let streamName = name.slice(7);
992✔
189
        // Detect the `.closed` suffix — present both in the initial scan and when the directory
992✔
190
        // watcher emits 'index-created' after a writer renames the file (e.g. 'stream-foo-bar.closed').
992✔
191
        let isClosed = false;
992✔
192
        if (streamName.endsWith('.closed')) {
992✔
193
            streamName = streamName.slice(0, -7);
8✔
194
            isClosed = true;
8✔
195
        }
8✔
196
        if (streamName in this.streams) {
992✔
197
            if (isClosed && !this.streams[streamName].closed) {
44✔
198
                // The stream was renamed to .closed while this instance had it open.
4✔
199
                // The old ReadOnlyIndex was already closed via onRename, so we open the new one.
4✔
200
                const closedIndexName = 'stream-' + streamName + '.closed';
4✔
201
                const closedIndex = this.storage.openReadonlyIndex(closedIndexName);
4✔
202
                // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
4✔
203
                this.streams[streamName] = { index: closedIndex, closed: true };
4✔
204
                this.emit('stream-closed', streamName);
4✔
205
            }
4✔
206
            return;
44✔
207
        }
44✔
208
        const index = isClosed
948✔
209
            ? this.storage.openReadonlyIndex(name)
992✔
210
            : this.storage.openIndex(name);
992✔
211
        // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
992✔
212
        this.streams[streamName] = { index, closed: isClosed };
992✔
213
        this.emit('stream-available', streamName);
992✔
214
    }
992✔
215

4✔
216
    /**
4✔
217
     * Close the event store and free up all resources.
4✔
218
     *
4✔
219
     * @api
4✔
220
     */
4✔
221
    close() {
4✔
222
        this.storage.close();
604✔
223
    }
604✔
224

4✔
225
    /**
4✔
226
     * Override EventEmitter.on() to delegate 'preCommit' and 'preRead' event registrations
4✔
227
     * to the underlying storage, so that `eventstore.on('preCommit', handler)` works naturally.
4✔
228
     * All other events are handled by the default EventEmitter.
4✔
229
     *
4✔
230
     * @param {string} event
4✔
231
     * @param {function} listener
4✔
232
     * @returns {this}
4✔
233
     */
4✔
234
    on(event, listener) {
4✔
235
        if (event === 'preCommit' || event === 'preRead') {
208✔
236
            if (event === 'preCommit') {
76✔
237
                assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.');
40✔
238
            }
40✔
239
            this.storage.on(event, listener);
68✔
240
            return this;
68✔
241
        }
68✔
242
        return super.on(event, listener);
132✔
243
    }
208✔
244

4✔
245
    /**
4✔
246
     * @inheritDoc
4✔
247
     */
4✔
248
    addListener(event, listener) {
4✔
249
        return this.on(event, listener);
4✔
250
    }
4✔
251

4✔
252
    /**
4✔
253
     * Override EventEmitter.once() to delegate 'preCommit' and 'preRead' to the underlying storage.
4✔
254
     *
4✔
255
     * @param {string} event
4✔
256
     * @param {function} listener
4✔
257
     * @returns {this}
4✔
258
     */
4✔
259
    once(event, listener) {
4✔
260
        if (event === 'preCommit' || event === 'preRead') {
12✔
261
            if (event === 'preCommit') {
8✔
262
                assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.');
4✔
263
            }
4✔
264
            this.storage.once(event, listener);
8✔
265
            return this;
8✔
266
        }
8✔
267
        return super.once(event, listener);
4✔
268
    }
12✔
269

4✔
270
    /**
4✔
271
     * Override EventEmitter.off() / removeListener() to delegate 'preCommit' and 'preRead'
4✔
272
     * to the underlying storage.
4✔
273
     *
4✔
274
     * @param {string} event
4✔
275
     * @param {function} listener
4✔
276
     * @returns {this}
4✔
277
     */
4✔
278
    off(event, listener) {
4✔
279
        if (event === 'preCommit' || event === 'preRead') {
24✔
280
            this.storage.off(event, listener);
12✔
281
            return this;
12✔
282
        }
12✔
283
        return super.off(event, listener);
12✔
284
    }
24✔
285

4✔
286
    /**
4✔
287
     * @inheritDoc
4✔
288
     */
4✔
289
    removeListener(event, listener) {
4✔
290
        return this.off(event, listener);
12✔
291
    }
12✔
292

4✔
293
    /**
4✔
294
     * Convenience method to register a handler called before an event is committed to storage.
4✔
295
     * Equivalent to `eventstore.on('preCommit', hook)`.
4✔
296
     * The handler receives `(event, partitionMetadata)` and may throw to abort the write.
4✔
297
     * Multiple handlers can be registered; all run on every write in registration order.
4✔
298
     * The handler is invoked on every write, so its logic should be cheap, fast, and synchronous.
4✔
299
     *
4✔
300
     * @api
4✔
301
     * @param {function(object, object): void} hook A function receiving (event, partitionMetadata).
4✔
302
     * @throws {Error} If the storage was opened in read-only mode.
4✔
303
     */
4✔
304
    preCommit(hook) {
4✔
305
        this.on('preCommit', hook);
20✔
306
    }
20✔
307

4✔
308
    /**
4✔
309
     * Convenience method to register a handler called before an event is read from storage.
4✔
310
     * Equivalent to `eventstore.on('preRead', hook)`.
4✔
311
     * The handler receives `(position, partitionMetadata)` and may throw to abort the read.
4✔
312
     * Multiple handlers can be registered; all run on every read in registration order.
4✔
313
     * The handler is invoked on every read, so its logic should be cheap, fast, and synchronous.
4✔
314
     *
4✔
315
     * @api
4✔
316
     * @param {function(number, object): void} hook A function receiving (position, partitionMetadata).
4✔
317
     */
4✔
318
    preRead(hook) {
4✔
319
        this.on('preRead', hook);
12✔
320
    }
12✔
321

4✔
322
    /**
4✔
323
     * Get the number of events stored.
4✔
324
     *
4✔
325
     * @api
4✔
326
     * @returns {number}
4✔
327
     */
4✔
328
    get length() {
4✔
329
        return this.storage.length;
1,360✔
330
    }
1,360✔
331

4✔
332
    /**
4✔
333
     * This method makes it so the last three arguments can be given either as:
4✔
334
     *  - expectedVersion, metadata, callback
4✔
335
     *  - expectedVersion, callback
4✔
336
     *  - metadata, callback
4✔
337
     *  - callback
4✔
338
     *
4✔
339
     * @private
4✔
340
     * @param {Array<object>|object} events
4✔
341
     * @param {number|CommitCondition} [expectedVersion]
4✔
342
     * @param {object|function} [metadata]
4✔
343
     * @param {function} [callback]
4✔
344
     * @returns {{events: Array<object>, metadata: object, callback: function, expectedVersion: number|CommitCondition}}
4✔
345
     */
4✔
346
    static fixArgumentTypes(events, expectedVersion, metadata, callback) {
4✔
347
        if (!(events instanceof Array)) {
1,164✔
348
            events = [events];
92✔
349
        }
92✔
350
        if (typeof expectedVersion !== 'number' && !(expectedVersion instanceof CommitCondition)) {
1,164✔
351
            callback = metadata;
296✔
352
            metadata = expectedVersion;
296✔
353
            expectedVersion = ExpectedVersion.Any;
296✔
354
        }
296✔
355
        if (typeof metadata !== 'object') {
1,164✔
356
            callback = metadata;
304✔
357
            metadata = {};
304✔
358
        }
304✔
359
        if (typeof callback !== 'function') {
1,164✔
360
            callback = () => {};
848✔
361
        }
848✔
362
        return { events, expectedVersion, metadata, callback };
1,164✔
363
    }
1,164✔
364

4✔
365
    /**
4✔
366
     * Check a {@link CommitCondition} against the current state of the store.
4✔
367
     * Iterates a join stream over all condition type streams starting from
4✔
368
     * `condition.noneMatchAfter` (the global position captured at query time), and throws an
4✔
369
     * {@link OptimisticConcurrencyError} when a new event of a listed type satisfies
4✔
370
     * `condition.matcher(payload, metadata)` (or any such event when no matcher is provided).
4✔
371
     *
4✔
372
     * @param {CommitCondition} condition
4✔
373
     * @throws {OptimisticConcurrencyError}
4✔
374
     */
4✔
375
    checkCondition(condition) {
4✔
376
        if (this.storage.length <= condition.noneMatchAfter) return; // no new events since condition was obtained
28✔
377

16✔
378
        const existingTypes = condition.types.filter(t => t in this.streams);
16✔
379
        if (existingTypes.length === 0) return;
28!
380

16✔
381
        // Only events after condition.noneMatchAfter can be conflicts.
16✔
382
        const stream = this.fromStreams(
16✔
383
            '_check_' + condition.types.join('_'),
16✔
384
            existingTypes,
16✔
385
            condition.noneMatchAfter + 1
16✔
386
        );
16✔
387

16✔
388
        let next;
16✔
389
        while ((next = stream.next()) !== false) {
16✔
390
            if (!condition.matcher || condition.matcher(next.payload, next.metadata)) {
16✔
391
                throw new OptimisticConcurrencyError(
12✔
392
                    `Optimistic Concurrency error. A conflicting event was committed since the condition was obtained.`
12✔
393
                );
12✔
394
            }
12✔
395
        }
16✔
396
    }
28✔
397

4✔
398
    /**
4✔
399
     * Ensure a dedicated type stream exists for each event's type, creating it if needed.
4✔
400
     * Must be called before the entity stream is created to guarantee correct index routing.
4✔
401
     *
4✔
402
     * @param {Array<object>} events The events to process.
4✔
403
     */
4✔
404
    ensureTypeStreams(events) {
4✔
405
        if (!this.typeAccessor) return;
1,152✔
406
        for (const event of events) {
1,152✔
407
            const type = this.resolveValidatedTypeStreamName(event);
132✔
408
            if (type && !(type in this.streams)) {
132✔
409
                const matcher = this.typeMatcherFn
104✔
410
                    ? this.typeMatcherFn(type)
104✔
411
                    : (doc) => this.typeAccessor(doc.payload) === type;
104✔
412
                this.createEventStream(type, matcher, false);
104✔
413
            }
104✔
414
        }
132✔
415
    }
1,152✔
416

4✔
417
    resolveValidatedTypeStreamName(event) {
4✔
418
        const type = this.typeAccessor(event);
132✔
419
        if (type === undefined || type === null || type === '') {
132✔
420
            return null;
8✔
421
        }
8✔
422
        assert(typeof type === 'string', 'typeAccessor must return a string.');
124✔
423
        assert(STREAM_NAME_PATTERN.test(type), 'typeAccessor must return a valid stream name.');
124✔
424
        return type;
124✔
425
    }
132✔
426

4✔
427
    /**
4✔
428
     * Commit a list of events for the given stream name, which is expected to be at the given version.
4✔
429
     * Note that the events committed may still appear in other streams too - the given stream name is only
4✔
430
     * relevant for optimistic concurrency checks with the given expected version.
4✔
431
     *
4✔
432
     * @api
4✔
433
     * @param {string} streamName The name of the stream to commit the events to.
4✔
434
     * @param {Array<object>|object} events The events to commit or a single event.
4✔
435
     * @param {number|CommitCondition} [expectedVersion] One of the `ExpectedVersion` constants, a positive
4✔
436
     *   stream version number, or a {@link CommitCondition} obtained from {@link EventStore#query}.
4✔
437
     * @param {object} [metadata] The commit metadata to use as base. Useful for replication and adding storage metadata.
4✔
438
     * @param {function} [callback] A function that will be executed when all events have been committed.
4✔
439
     * @throws {OptimisticConcurrencyError} if the stream is not at the expected version, or if a
4✔
440
     *   {@link CommitCondition} was provided and conflicting events have been committed since it was obtained.
4✔
441
     */
4✔
442
    commit(streamName, events, expectedVersion = ExpectedVersion.Any, metadata = {}, callback = null) {
4✔
443
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not commit to it.');
1,176✔
444
        assert(typeof streamName === 'string' && streamName !== '', 'Must specify a stream name for commit.');
1,176✔
445
        assert(typeof events !== 'undefined' && events !== null, 'No events specified for commit.');
1,176✔
446

1,176✔
447
        ({ events, expectedVersion, metadata, callback } = EventStore.fixArgumentTypes(events, expectedVersion, metadata, callback));
1,176✔
448

1,176✔
449
        // Perform DCB-style concurrency check when a CommitCondition is provided.
1,176✔
450
        if (expectedVersion instanceof CommitCondition) {
1,176✔
451
            this.checkCondition(expectedVersion);
28✔
452
            expectedVersion = ExpectedVersion.Any;
28✔
453
        }
28✔
454

1,152✔
455
        // When typeAccessor is configured, ensure a dedicated type stream exists for each event
1,152✔
456
        // before the entity stream write so the type stream index is never incomplete.
1,152✔
457
        this.ensureTypeStreams(events);
1,152✔
458

1,152✔
459
        if (!(streamName in this.streams)) {
1,176✔
460
            this.createEventStream(streamName, { stream: streamName }, false);
696✔
461
        }
696✔
462
        assert(!this.streams[streamName].closed, `Stream "${streamName}" is closed and cannot be written to.`);
1,144✔
463
        let streamVersion = this.streams[streamName].index.length;
1,144✔
464
        if (expectedVersion !== ExpectedVersion.Any && streamVersion !== expectedVersion) {
1,176✔
465
            throw new OptimisticConcurrencyError(`Optimistic Concurrency error. Expected stream "${streamName}" at version ${expectedVersion} but is at version ${streamVersion}.`);
12✔
466
        }
12✔
467

1,124✔
468
        if (events.length > 1) {
1,176✔
469
            delete metadata.commitVersion;
60✔
470
        }
60✔
471

1,124✔
472
        const commitId = this.length;
1,124✔
473
        let commitVersion = 0;
1,124✔
474
        const commitSize = events.length;
1,124✔
475
        const committedAt = Date.now();
1,124✔
476
        const commit = Object.assign({
1,124✔
477
            commitId,
1,124✔
478
            committedAt
1,124✔
479
        }, metadata, {
1,124✔
480
            streamName,
1,124✔
481
            streamVersion,
1,124✔
482
            events: []
1,124✔
483
        });
1,124✔
484
        const commitCallback = () => {
1,124✔
485
            this.emit('commit', commit);
1,120✔
486
            callback(commit);
1,120✔
487
        };
1,124✔
488
        for (let event of events) {
1,176✔
489
            const eventMetadata = Object.assign({ commitId, committedAt, commitVersion, commitSize }, metadata, { streamVersion });
1,212✔
490
            const storedEvent = { stream: streamName, payload: event, metadata: eventMetadata };
1,212✔
491
            commitVersion++;
1,212✔
492
            streamVersion++;
1,212✔
493
            commit.events.push(event);
1,212✔
494
            this.storage.write(storedEvent, commitVersion !== events.length ? undefined : commitCallback);
1,212✔
495
        }
1,212✔
496
    }
1,176✔
497

4✔
498
    /**
4✔
499
     * @api
4✔
500
     * @param {string} streamName The name of the stream to get the version for.
4✔
501
     * @returns {number} The version that the given stream is at currently, or -1 if the stream does not exist.
4✔
502
     */
4✔
503
    getStreamVersion(streamName) {
4✔
504
        if (!(streamName in this.streams)) {
116✔
505
            return -1;
20✔
506
        }
20✔
507
        return this.streams[streamName].index.length;
96✔
508
    }
116✔
509

4✔
510
    /**
4✔
511
     * Query the event store for events matching a set of event types and an optional filter function.
4✔
512
     * Returns a pre-filtered event stream and a {@link CommitCondition} that can be passed to
4✔
513
     * {@link EventStore#commit} to enforce optimistic concurrency.
4✔
514
     *
4✔
515
     * A conflict occurs when at least one event appended between the `query` call and the `commit` call
4✔
516
     * belongs to one of the listed types and (when `matcher` is provided) also satisfies
4✔
517
     * `matcher(payload, metadata)`.  Events written before the `query` call are never treated as conflicts.
4✔
518
     *
4✔
519
     * **Behaviour when a type stream does not exist:**
4✔
520
     * - Without `typeAccessor` configured: throws an error, because the store cannot guarantee that no
4✔
521
     *   events of that type exist (the stream was never created).  Create the stream explicitly first,
4✔
522
     *   or configure `typeAccessor` to have streams created automatically on commit.
4✔
523
     * - With `typeAccessor` configured: treats the missing stream as empty (0-length).  The stream will
4✔
524
     *   be created automatically the first time an event of that type is committed.
4✔
525
     *
4✔
526
     * @api
4✔
527
     * @param {string[]} types A non-empty array of event-type names to query.
4✔
528
     * @param {function(object, object): boolean|null} [matcher] An optional filter function `(payload, metadata) => boolean`
4✔
529
     *   passed to the returned {@link CommitCondition}.
4✔
530
     * @param {number} [minRevision=1] The 1-based minimum global revision to include in the returned stream (inclusive).
4✔
531
     * @returns {{ condition: CommitCondition, stream: EventStream }} An object with:
4✔
532
     *   - `condition` — the {@link CommitCondition} to pass to {@link EventStore#commit}.
4✔
533
     *   - `stream` — a read-only event stream containing all matching events.
4✔
534
     * @throws {Error} if `types` is not a non-empty array.
4✔
535
     * @throws {Error} if `typeAccessor` is not configured and any of the listed type streams do not exist.
4✔
536
     */
4✔
537
    query(types, matcher = null, minRevision = 1) {
4✔
538
        assert(Array.isArray(types) && types.length > 0, 'Must specify a non-empty array of event types for query.');
100✔
539

100✔
540
        const queryTypes = [];
100✔
541
        for (const type of types) {
100✔
542
            if (!(type in this.streams)) {
104✔
543
                if (this.typeAccessor) {
52✔
544
                    // typeAccessor is configured: type streams are created on commit, so a missing
48✔
545
                    // stream simply means no event of this type has been committed yet — treat as empty.
48✔
546
                    continue;
48✔
547
                }
48✔
548
                // No typeAccessor: the stream was never created; we cannot know whether events of
4✔
549
                // this type exist in the store, so throw to avoid an unintentional full-store scan.
4✔
550
                throw new Error(`Type stream "${type}" does not exist. Create it with createEventStream() first, or configure typeAccessor to have type streams created automatically on commit.`);
4✔
551
            }
4✔
552
            queryTypes.push(type);
52✔
553
        }
52✔
554

88✔
555
        const condition = new CommitCondition(types, matcher, this.storage.length);
88✔
556
        const stream = this.fromStreams('_query_' + types.join('_'), queryTypes, minRevision, -1, matcher);
88✔
557
        return { stream, condition };
88✔
558
    }
100✔
559

4✔
560
    /**
4✔
561
     * Get an event stream for the given stream name within the revision boundaries.
4✔
562
     *
4✔
563
     * @api
4✔
564
     * @param {string} streamName The name of the stream to get.
4✔
565
     * @param {number} [minRevision=1] The 1-based minimum revision to include in the events (inclusive).
4✔
566
     * @param {number} [maxRevision=-1] The 1-based maximum revision to include in the events (inclusive).
4✔
567
     * @param {(function(object, object): boolean)|true|null} [predicate] An optional filter function
4✔
568
     *   `(payload, metadata) => boolean`. Pass `true` to activate raw-buffer mode to receive NDJSON buffers
4✔
569
     * @returns {EventStream|boolean} The event stream or false if a stream with the name doesn't exist.
4✔
570
     */
4✔
571
    getEventStream(streamName, minRevision = 1, maxRevision = -1, predicate = null) {
4✔
572
        if (!(streamName in this.streams)) {
184✔
573
            return false;
4✔
574
        }
4✔
575
        return new EventStream(streamName, this, minRevision, maxRevision, predicate);
180✔
576
    }
184✔
577

4✔
578
    /**
4✔
579
     * Get a stream for all events within the revision boundaries.
4✔
580
     * This is the same as `getEventStream('_all', ...)`.
4✔
581
     *
4✔
582
     * @api
4✔
583
     * @param {number} [minRevision=1] The 1-based minimum revision to include in the events (inclusive).
4✔
584
     * @param {number} [maxRevision=-1] The 1-based maximum revision to include in the events (inclusive).
4✔
585
     * @param {(function(object, object): boolean)|true|null} [predicate] An optional filter function
4✔
586
     *   `(payload, metadata) => boolean`. Only events for which this returns truthy are yielded. Pass `true` to activate raw-buffer mode to receive NDJSON buffers
4✔
587
     * @returns {EventStream} The event stream.
4✔
588
     */
4✔
589
    getAllEvents(minRevision = 1, maxRevision = -1, predicate = null) {
4✔
590
        return this.getEventStream('_all', minRevision, maxRevision, predicate);
4✔
591
    }
4✔
592

4✔
593
    /**
4✔
594
     * Create a virtual event stream from existing streams by joining them.
4✔
595
     *
4✔
596
     * @param {string} streamName The (transient) name of the joined stream.
4✔
597
     * @param {Array<string>} streamNames An array of the stream names to join.
4✔
598
     * @param {number} [minRevision=1] The 1-based minimum revision to include in the events (inclusive).
4✔
599
     * @param {number} [maxRevision=-1] The 1-based maximum revision to include in the events (inclusive).
4✔
600
     * @param {(function(object, object): boolean)|true|null} [predicate] An optional filter function
4✔
601
     *   `(payload, metadata) => boolean`. Only events for which this returns truthy are yielded. Pass `true` to activate raw-buffer mode to receive NDJSON buffers
4✔
602
     * @returns {EventStream} The joined event stream.
4✔
603
     * @throws {Error} if any of the streams doesn't exist.
4✔
604
     */
4✔
605
    fromStreams(streamName, streamNames, minRevision = 1, maxRevision = -1, predicate = null) {
4✔
606
        assert(streamNames instanceof Array, 'Must specify an array of stream names.');
152✔
607

152✔
608
        if (streamNames.length === 0) {
152✔
609
            return new EventStream(streamName, this);
44✔
610
        }
44✔
611

100✔
612
        for (let stream of streamNames) {
152✔
613
            assert(stream in this.streams, `Stream "${stream}" does not exist.`);
256✔
614
        }
256✔
615

96✔
616
        if (streamNames.length === 1) {
152✔
617
            const stream = new EventStream(streamNames[0], this, minRevision, maxRevision, predicate);
52✔
618
            stream.name = streamName;
52✔
619
            return stream;
52✔
620
        }
52✔
621

44✔
622
        return new JoinEventStream(streamName, streamNames, this, minRevision, maxRevision, predicate);
44✔
623
    }
152✔
624

4✔
625
    /**
4✔
626
     * Get a stream for a category of streams. This will effectively return a joined stream of all streams that start
4✔
627
     * with the given `categoryName` followed by a dash (flat layout, e.g. `users-123`) or a slash (hierarchical
4✔
628
     * layout, e.g. `users/123`).
4✔
629
     * If you frequently use this for a category consisting of a lot of streams (e.g. `users`), consider creating a
4✔
630
     * dedicated physical stream for the category:
4✔
631
     *
4✔
632
     *    `eventstore.createEventStream('users', e => e.stream.startsWith('users-') || e.stream.startsWith('users/'))`
4✔
633
     *
4✔
634
     * @api
4✔
635
     * @param {string} categoryName The name of the category to get a stream for. A category is a stream name prefix.
4✔
636
     * @param {number} [minRevision=1] The 1-based minimum revision to include in the events (inclusive).
4✔
637
     * @param {number} [maxRevision=-1] The 1-based maximum revision to include in the events (inclusive).
4✔
638
     * @returns {EventStream} The joined event stream for all streams of the given category.
4✔
639
     * @throws {Error} If no stream for this category exists.
4✔
640
     */
4✔
641
    getEventStreamForCategory(categoryName, minRevision = 1, maxRevision = -1, raw = false) {
4✔
642
        if (categoryName in this.streams) {
36✔
643
            return this.getEventStream(categoryName, minRevision, maxRevision, raw);
4✔
644
        }
4✔
645
        const categoryStreams = Object.keys(this.streams).filter(streamName =>
32✔
646
            streamName.startsWith(categoryName + '-') ||
212✔
647
            streamName.startsWith(categoryName + '/')
124✔
648
        );
32✔
649

32✔
650
        if (categoryStreams.length === 0) {
36✔
651
            throw new Error(`No streams for category '${categoryName}' exist.`);
4✔
652
        }
4✔
653
        return this.fromStreams(categoryName, categoryStreams, minRevision, maxRevision, null, raw);
28✔
654
    }
36✔
655

4✔
656
    /**
4✔
657
     * Create a new stream with the given matcher.
4✔
658
     *
4✔
659
     * @api
4✔
660
     * @param {string} streamName The name of the stream to create.
4✔
661
     * @param {object|function(event)} matcher A matcher object, denoting the properties that need to match on an event a function that takes the event and returns true if the event should be added.
4✔
662
     * @param {boolean} [reindex=true] Whether to scan existing documents and populate the new index. Set to false when it is known that no existing documents can match the matcher (e.g. when creating a brand-new write stream).
4✔
663
     * @returns {EventStream} The EventStream with all existing events matching the matcher.
4✔
664
     * @throws {Error} If a stream with that name already exists.
4✔
665
     * @throws {Error} If the stream could not be created.
4✔
666
     */
4✔
667
    createEventStream(streamName, matcher, reindex = true) {
4✔
668
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not create new stream on it.');
880✔
669
        assert(!(streamName in this.streams), 'Can not recreate stream!');
880✔
670

880✔
671
        const streamIndexName = 'stream-' + streamName;
880✔
672
        if (streamName.includes('/')) {
880✔
673
            const subDir = path.join(this.streamsDirectory, this.storeName + '.stream-' + path.dirname(streamName));
88✔
674
            ensureDirectory(subDir);
88✔
675
        }
88✔
676
        const index = this.storage.ensureIndex(streamIndexName, matcher, reindex);
864✔
677
        assert(index !== null, `Error creating stream index ${streamName}.`);
864✔
678

864✔
679
        // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
864✔
680
        this.streams[streamName] = { index, matcher };
864✔
681
        this.emit('stream-created', streamName);
864✔
682
        return new EventStream(streamName, this);
864✔
683
    }
880✔
684

4✔
685
    /**
4✔
686
     * Delete an event stream. Will do nothing if the stream with the name doesn't exist.
4✔
687
     *
4✔
688
     * Note that you can delete a write stream, but that will not delete the events written to it.
4✔
689
     * Also, on next write, that stream will be rebuilt from all existing events, which might take some time.
4✔
690
     *
4✔
691
     * @api
4✔
692
     * @param {string} streamName The name of the stream to delete.
4✔
693
     * @returns void
4✔
694
     */
4✔
695
    deleteEventStream(streamName) {
4✔
696
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not delete a stream on it.');
12✔
697

12✔
698
        if (!(streamName in this.streams)) {
12✔
699
            return;
4✔
700
        }
4✔
701
        this.streams[streamName].index.destroy();
4✔
702
        delete this.streams[streamName];
4✔
703
        this.emit('stream-deleted', streamName);
4✔
704
    }
12✔
705

4✔
706
    /**
4✔
707
     * Close a stream so that no new events are indexed into it.
4✔
708
     * The stream will still be readable, but any attempt to write to it will throw an error.
4✔
709
     * A closed stream is persisted by renaming its index file to include a `.closed` marker
4✔
710
     * (e.g. `stream-X.closed.index`), so it will be recognized as closed when the store is reopened.
4✔
711
     *
4✔
712
     * @api
4✔
713
     * @param {string} streamName The name of the stream to close.
4✔
714
     * @returns void
4✔
715
     * @throws {Error} If the storage is read-only.
4✔
716
     * @throws {Error} If the stream does not exist.
4✔
717
     * @throws {Error} If the stream is already closed.
4✔
718
     */
4✔
719
    closeEventStream(streamName) {
4✔
720
        assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not close a stream on it.');
48✔
721
        assert(streamName in this.streams, `Stream "${streamName}" does not exist.`);
48✔
722
        assert(!this.streams[streamName].closed, `Stream "${streamName}" is already closed.`);
48✔
723

48✔
724
        const indexName = 'stream-' + streamName;
48✔
725
        const { index } = this.streams[streamName];
48✔
726

48✔
727
        // Flush and close the index before renaming the file
48✔
728
        index.close();
48✔
729

48✔
730
        // Rename the index file to mark it as closed (e.g. stream-foo.index -> stream-foo.closed.index)
48✔
731
        const closedFileName = index.fileName.replace(/\.index$/, '.closed.index');
48✔
732
        fs.renameSync(index.fileName, closedFileName);
48✔
733

48✔
734
        // Remove from secondary indexes so that new writes are no longer indexed into this stream
48✔
735
        this.storage.removeSecondaryIndex(indexName);
48✔
736

48✔
737
        // Reopen the renamed index for read access, outside the secondary indexes write path
48✔
738
        const closedIndexName = indexName + '.closed';
48✔
739
        const closedIndex = this.storage.openReadonlyIndex(closedIndexName);
48✔
740

48✔
741
        // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
48✔
742
        this.streams[streamName] = { index: closedIndex, closed: true };
48✔
743
        this.emit('stream-closed', streamName);
48✔
744
    }
48✔
745

4✔
746
    /**
4✔
747
     * Get a durable consumer for the given stream that will keep receiving events from the last position.
4✔
748
     *
4✔
749
     * @param {string} streamName The name of the stream to consume.
4✔
750
     * @param {string} identifier The unique identifying name of this consumer.
4✔
751
     * @param {object} [initialState] The initial state of the consumer.
4✔
752
     * @param {number} [since] The stream revision to start consuming from.
4✔
753
     * @returns {Consumer} A durable consumer for the given stream.
4✔
754
     */
4✔
755
    getConsumer(streamName, identifier, initialState = {}, since = 0) {
4✔
756
        const consumer = new Consumer(this.storage, streamName === '_all' ? '_all' : 'stream-' + streamName, identifier, initialState, since);
20✔
757
        consumer.streamName = streamName;
20✔
758
        return consumer;
20✔
759
    }
20✔
760

4✔
761
    /**
4✔
762
     * Scan the existing consumers on this EventStore and asynchronously return a list of their names.
4✔
763
     * @param {function(error: Error, consumers: array)} callback A callback that will receive an error as first and the list of consumers as second argument.
4✔
764
     */
4✔
765
    scanConsumers(callback) {
4✔
766
        const consumersPath = path.join(this.storage.indexDirectory, 'consumers');
8✔
767
        if (!fs.existsSync(consumersPath)) {
8✔
768
            callback(null, []);
4✔
769
            return;
4✔
770
        }
4✔
771
        const regex = new RegExp(`^${this.storage.storageFile}\\.([^.]*\\..*)$`);
4✔
772
        const consumers = [];
4✔
773
        scanForFiles(consumersPath, regex, consumers.push.bind(consumers), /* istanbul ignore next */ (err) => {
4✔
774
            if (err) {
4!
UNCOV
775
                return callback(err, []);
×
UNCOV
776
            }
×
777
            callback(null, consumers);
4✔
778
        });
4✔
779
    }
8✔
780
}
4✔
781

4✔
782
// Expose the Storage and Index classes as static members of EventStore.
Object.assign(EventStore, { Storage, Index });
4✔
784

4✔
785
export default EventStore;
4✔
786
export { ExpectedVersion, OptimisticConcurrencyError, CommitCondition, LOCK_THROW, LOCK_RECLAIM };
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc