• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 23897068196

02 Apr 2026 10:56AM UTC coverage: 98.151% (+0.1%) from 98.054%
23897068196

Pull #257

github

web-flow
Merge b52671064 into 50d3642f2
Pull Request #257: Move to ES Modules (ESM)

890 of 929 branches covered (95.8%)

Branch coverage included in aggregate %.

118 of 118 new or added lines in 21 files covered. (100.0%)

55 existing lines in 12 files now uncovered.

4684 of 4750 relevant lines covered (98.61%)

787.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.91
/src/Storage/ReadableStorage.js
1
import fs from 'fs';
4✔
2
import path from 'path';
4✔
3
import events from 'events';
4✔
4
import Partition, { ReadOnly as ReadOnlyPartition } from '../Partition.js';
4✔
5
import Index, { ReadOnly as ReadOnlyIndex } from '../Index.js';
4✔
6
import { assert, wrapAndCheck, kWayMerge } from '../util.js';
4✔
7
import { createHmac, matches, buildMetadataForMatcher } from '../metadataUtil.js';
4✔
8

4✔
9
// Fallback for the `readBufferSize` partition option, in bytes (4 KiB).
const DEFAULT_READ_BUFFER_SIZE = 4096;
4✔
10

4✔
11
/**
 * Yield the items of an iterable in reverse order.
 * The input is consumed completely into an array before the first item is yielded.
 * @param {Generator|Iterable} iterator
 * @returns {Generator<*>}
 */
function *reverse(iterator) {
    const buffered = Array.from(iterator);
    let remaining = buffered.length;
    while (remaining > 0) {
        remaining -= 1;
        yield buffered[remaining];
    }
}
68✔
22

4✔
23
/**
4✔
24
 * @typedef {object|function(object):boolean} Matcher
4✔
25
 */
4✔
26

4✔
27
/**
4✔
28
 * An append-only storage with highly performant positional range scans.
4✔
29
 * It's highly optimized for an event-store and hence does not support compaction or data-rewrite, nor any querying
4✔
30
 */
4✔
31
class ReadableStorage extends events.EventEmitter {
4✔
32

4✔
33
    /**
4✔
34
     * @param {string} [storageName] The name of the storage.
4✔
35
     * @param {object} [config] An object with storage parameters.
4✔
36
     * @param {object} [config.serializer] A serializer object with methods serialize(document) and deserialize(data).
4✔
37
     * @param {function(object): string} config.serializer.serialize Default is JSON.stringify.
4✔
38
     * @param {function(string): object} config.serializer.deserialize Default is JSON.parse.
4✔
39
     * @param {string} [config.dataDirectory] The path where the storage data should reside. Default '.'.
4✔
40
     * @param {string} [config.indexDirectory] The path where the indexes should be stored. Defaults to dataDirectory.
4✔
41
     * @param {string} [config.indexFile] The name of the primary index. Default '{storageName}.index'.
4✔
42
     * @param {number} [config.readBufferSize] Size of the read buffer in bytes. Default 4096.
4✔
43
     * @param {object} [config.indexOptions] An options object that should be passed to all indexes on construction.
4✔
44
     * @param {string} [config.hmacSecret] A private key that is used to verify matchers retrieved from indexes.
4✔
45
     * @param {object} [config.metadata] A metadata object to be stored in all partitions belonging to this storage.
4✔
46
     */
4✔
47
    constructor(storageName = 'storage', config = {}) {
4✔
48
        super();
1,004✔
49
        if (typeof storageName !== 'string') {
1,004✔
50
            config = storageName;
52✔
51
            storageName = undefined;
52✔
52
        }
52✔
53

1,004✔
54
        this.storageFile = storageName || 'storage';
1,004✔
55
        const defaults = {
1,004✔
56
            serializer: { serialize: JSON.stringify, deserialize: JSON.parse },
1,004✔
57
            dataDirectory: '.',
1,004✔
58
            indexFile: this.storageFile + '.index',
1,004✔
59
            indexOptions: {},
1,004✔
60
            hmacSecret: '',
1,004✔
61
            metadata: {}
1,004✔
62
        };
1,004✔
63
        config = Object.assign(defaults, config);
1,004✔
64
        this.serializer = config.serializer;
1,004✔
65

1,004✔
66
        this.hmac = createHmac(config.hmacSecret);
1,004✔
67

1,004✔
68
        this.dataDirectory = path.resolve(config.dataDirectory);
1,004✔
69

1,004✔
70
        this.scanPartitions(config);
1,004✔
71
        this.initializeIndexes(config);
1,004✔
72
    }
1,004✔
73

4✔
74
    /**
4✔
75
     * @protected
4✔
76
     * @param {string} name
4✔
77
     * @param {object} [options]
4✔
78
     * @returns {{ index: ReadableIndex, matcher?: Matcher }}
4✔
79
     */
4✔
80
    createIndex(name, options = {}) {
4✔
81
        /** @type ReadableIndex */
136✔
82
        const index = new ReadOnlyIndex(name, options);
136✔
83
        return { index };
136✔
84
    }
136✔
85

4✔
86
    /**
4✔
87
     * @protected
4✔
88
     * @param {string} name
4✔
89
     * @param {object} [options]
4✔
90
     * @returns {ReadablePartition}
4✔
91
     */
4✔
92
    createPartition(name, options = {}) {
4✔
93
        return new ReadOnlyPartition(name, options);
52✔
94
    }
52✔
95

4✔
96
    /**
4✔
97
     * Create/open the primary index and build the base configuration for all secondary indexes.
4✔
98
     *
4✔
99
     * @private
4✔
100
     * @param {object} config The configuration object
4✔
101
     * @returns void
4✔
102
     */
4✔
103
    initializeIndexes(config) {
4✔
104
        this.indexDirectory = path.resolve(config.indexDirectory || this.dataDirectory);
1,004✔
105

1,004✔
106
        this.indexOptions = config.indexOptions;
1,004✔
107
        this.indexOptions.dataDirectory = this.indexDirectory;
1,004✔
108
        // Safety precaution to prevent accidentally restricting main index
1,004✔
109
        delete this.indexOptions.matcher;
1,004✔
110
        const { index } = this.createIndex(config.indexFile, this.indexOptions);
1,004✔
111
        this.index = index;
1,004✔
112
        this.secondaryIndexes = {};
1,004✔
113
        this.readonlyIndexes = {};
1,004✔
114
    }
1,004✔
115

4✔
116
    /**
4✔
117
     * The amount of documents in the storage.
4✔
118
     * @returns {number}
4✔
119
     */
4✔
120
    get length() {
4✔
121
        return this.index.length;
4,472✔
122
    }
4,472✔
123

4✔
124
    /**
4✔
125
     * Scan the data directory for all existing partitions.
4✔
126
     * Every file beginning with the storageFile name is considered a partition.
4✔
127
     *
4✔
128
     * @private
4✔
129
     * @param {object} config The configuration object containing options for the partitions.
4✔
130
     * @returns void
4✔
131
     */
4✔
132
    scanPartitions(config) {
4✔
133
        const defaults = {
1,004✔
134
            readBufferSize: DEFAULT_READ_BUFFER_SIZE
1,004✔
135
        };
1,004✔
136
        this.partitionConfig = Object.assign(defaults, config);
1,004✔
137
        this.partitions = Object.create(null);
1,004✔
138

1,004✔
139
        const files = fs.readdirSync(this.dataDirectory);
1,004✔
140
        for (let file of files) {
1,004✔
141
            if (file.substr(-6) === '.index') continue;
492✔
142
            if (file.substr(-7) === '.branch') continue;
492✔
143
            if (file.substr(-5) === '.lock') continue;
492✔
144
            if (file.substr(0, this.storageFile.length) !== this.storageFile) continue;
492✔
145

144✔
146
            const partition = this.createPartition(file, this.partitionConfig);
144✔
147
            this.partitions[partition.id] = partition;
144✔
148
        }
144✔
149
    }
1,004✔
150

4✔
151
    /**
4✔
152
     * Open the storage and indexes and create read and write buffers eagerly.
4✔
153
     * Will emit an 'opened' event if finished.
4✔
154
     *
4✔
155
     * @api
4✔
156
     * @returns {boolean}
4✔
157
     */
4✔
158
    open() {
4✔
159
        this.index.open();
952✔
160

952✔
161
        this.forEachSecondaryIndex(index => index.open());
952✔
162

952✔
163
        this.emit('opened');
952✔
164
        return true;
952✔
165
    }
952✔
166

4✔
167
    /**
4✔
168
     * Close the storage and frees up all resources.
4✔
169
     * Will emit a 'closed' event when finished.
4✔
170
     *
4✔
171
     * @api
4✔
172
     * @returns void
4✔
173
     */
4✔
174
    close() {
4✔
175
        this.index.close();
1,568✔
176
        this.forEachSecondaryIndex(index => index.close());
1,568✔
177
        for (let index of Object.values(this.readonlyIndexes)) {
1,568✔
178
            index.close();
44✔
179
        }
44✔
180
        this.forEachPartition(partition => partition.close());
1,568✔
181
        this.emit('closed');
1,568✔
182
    }
1,568✔
183

4✔
184
    /**
4✔
185
     * Get a partition either by name or by id.
4✔
186
     * If a partition with the given name does not exist, a new one will be created.
4✔
187
     * If a partition with the given id does not exist, an error is thrown.
4✔
188
     *
4✔
189
     * @protected
4✔
190
     * @param {string|number} partitionIdentifier The partition name or the partition Id
4✔
191
     * @returns {ReadablePartition}
4✔
192
     * @throws {Error} If an id is given and no such partition exists.
4✔
193
     */
4✔
194
    getPartition(partitionIdentifier) {
4✔
195
        assert(partitionIdentifier in this.partitions, `Partition #${partitionIdentifier} does not exist.`);
4,184✔
196

4,184✔
197
        this.partitions[partitionIdentifier].open();
4,184✔
198
        return this.partitions[partitionIdentifier];
4,184✔
199
    }
4,184✔
200

4✔
201
    /**
4✔
202
     * Register a handler that is called before a document is read from a partition.
4✔
203
     * The handler receives the position and the partition metadata and may throw to abort the read.
4✔
204
     * Multiple handlers can be registered; all run on every read in registration order.
4✔
205
     * Equivalent to `storage.on('preRead', hook)`.
4✔
206
     *
4✔
207
     * @api
4✔
208
     * @param {function(number, object): void} hook A function receiving (position, partitionMetadata).
4✔
209
     */
4✔
210
    preRead(hook) {
4✔
211
        this.on('preRead', hook);
12✔
212
    }
12✔
213

4✔
214
    /**
4✔
215
     * @protected
4✔
216
     * @param {number} partitionId The partition to read from.
4✔
217
     * @param {number} position The file position to read from.
4✔
218
     * @param {number} [size] The expected byte size of the document at the given position.
4✔
219
     * @returns {object} The document stored at the given position.
4✔
220
     * @throws {Error} if the document at the given position can not be deserialized.
4✔
221
     */
4✔
222
    readFrom(partitionId, position, size) {
4✔
223
        const partition = this.getPartition(partitionId);
4,120✔
224
        if (this.listenerCount('preRead') > 0) {
4,120✔
225
            this.emit('preRead', position, partition.metadata);
84✔
226
        }
84✔
227
        const data = partition.readFrom(position, size);
4,108✔
228
        return this.serializer.deserialize(data);
4,108✔
229
    }
4,120✔
230

4✔
231
    /**
4✔
232
     * Read a single document from the given position, in the full index or in the provided index.
4✔
233
     *
4✔
234
     * @api
4✔
235
     * @param {number} number The 1-based document number (inside the given index) to read.
4✔
236
     * @param {ReadableIndex} [index] The index to use for finding the document position.
4✔
237
     * @returns {object} The document at the given position inside the index.
4✔
238
     */
4✔
239
    read(number, index) {
4✔
240
        index = index || this.index;
580✔
241

580✔
242
        if (!index.isOpen()) {
580✔
243
            index.open();
4✔
244
        }
4✔
245

580✔
246
        const entry = index.get(number);
580✔
247
        if (entry === false) {
580✔
248
            return false;
4✔
249
        }
4✔
250

576✔
251
        return this.readFrom(entry.partition, entry.position, entry.size);
576✔
252
    }
580✔
253

4✔
254
    /**
4✔
255
     * Read a range of documents from the given position range, in the full index or in the provided index.
4✔
256
     * Returns a generator in order to reduce memory usage and be able to read lots of documents with little latency.
4✔
257
     *
4✔
258
     * @api
4✔
259
     * @param {number} from The 1-based document number (inclusive) to start reading from.
4✔
260
     * @param {number} [until] The 1-based document number (inclusive) to read until. Defaults to index.length.
4✔
261
     * @param {ReadableIndex|false} [index] The index to use for finding the documents in the range.
4✔
262
     *   Pass `false` to skip the global index and iterate all partitions directly in sequenceNumber order
4✔
263
     *   (useful when the global index is unavailable or corrupted).
4✔
264
     * @returns {Generator<object>} A generator that will read each document in the range one by one.
4✔
265
     */
4✔
266
    *readRange(from, until = -1, index = null) {
4✔
267
        const lengthSource = index || this.index;
536✔
268
        if (!lengthSource.isOpen()) {
536✔
269
            lengthSource.open();
4✔
270
        }
4✔
271

536✔
272
        const readFrom = wrapAndCheck(from, lengthSource.length);
536✔
273
        const readUntil = wrapAndCheck(until, lengthSource.length);
536✔
274
        assert(readFrom > 0 && readUntil > 0, `Range scan error for range ${from} - ${until}.`);
536✔
275

536✔
276
        if (readFrom > readUntil) {
536✔
277
            const batchSize = 10;
48✔
278
            let batchUntil = readFrom;
48✔
279
            while (batchUntil >= readUntil) {
48✔
280
                const batchFrom = Math.max(readUntil, batchUntil - batchSize);
68✔
281
                yield* reverse(this.iterateRange(batchFrom, batchUntil, index));
68✔
282
                batchUntil = batchFrom - 1;
68✔
283
            }
68✔
284
            return undefined;
48✔
285
        }
48✔
286

472✔
287
        yield* this.iterateRange(readFrom, readUntil, index);
472✔
288
    }
536✔
289

4✔
290
    /**
4✔
291
     * Iterate all documents in this storage in range from to until inside the index.
4✔
292
     * If index is false, iterates all partitions directly in sequenceNumber order.
4✔
293
     * @private
4✔
294
     * @param {number} from
4✔
295
     * @param {number} until
4✔
296
     * @param {ReadableIndex|false|null} index
4✔
297
     * @returns {Generator<object>}
4✔
298
     */
4✔
299
    *iterateRange(from, until, index) {
4✔
300
        if (index === false) {
540✔
301
            // Explicitly disabled index: iterate all partitions and merge by sequenceNumber.
12✔
302
            // Document header sequenceNumber is 0-based; from/until are 1-based index positions.
12✔
303
            for (const entry of this.iterateDocumentsNoIndex(from - 1, until - 1)) {
12✔
304
                yield entry.document;
92✔
305
            }
92✔
306
            return;
12✔
307
        }
12✔
308

528✔
309
        const idx = index || this.index;
540✔
310
        const entries = idx.range(from, until);
540✔
311
        for (let entry of entries) {
540✔
312
            const document = this.readFrom(entry.partition, entry.position, entry.size);
1,780✔
313
            yield document;
1,780✔
314
        }
1,768✔
315
    }
540✔
316

4✔
317
    /**
4✔
318
     * Open an existing readonly index for reading, without registering it in the secondary indexes write path.
4✔
319
     * Use this for indexes whose files carry a status marker (e.g. `stream-foo.closed.index`).
4✔
320
     *
4✔
321
     * @api
4✔
322
     * @param {string} name The readonly index name (e.g. 'stream-foo.closed').
4✔
323
     * @returns {ReadableIndex}
4✔
324
     * @throws {Error} if the readonly index does not exist.
4✔
325
     */
4✔
326
    openReadonlyIndex(name) {
4✔
327
        if (name in this.readonlyIndexes) {
44!
328
            return this.readonlyIndexes[name];
×
UNCOV
329
        }
×
330
        const indexName = this.storageFile + '.' + name + '.index';
44✔
331
        assert(fs.existsSync(path.join(this.indexDirectory, indexName)), `Index "${name}" does not exist.`);
44✔
332
        const { index } = this.createIndex(indexName, Object.assign({}, this.indexOptions));
44✔
333
        index.open();
44✔
334
        this.readonlyIndexes[name] = index;
44✔
335
        return index;
44✔
336
    }
44✔
337

4✔
338
    /**
4✔
339
     * Open an existing index.
4✔
340
     *
4✔
341
     * @api
4✔
342
     * @param {string} name The index name.
4✔
343
     * @param {Matcher} [matcher] The matcher object or function that the index needs to have been defined with. If not given it will not be validated.
4✔
344
     * @returns {ReadableIndex}
4✔
345
     * @throws {Error} if the index with that name does not exist.
4✔
346
     * @throws {Error} if the HMAC for the matcher does not match.
4✔
347
     */
4✔
348
    openIndex(name, matcher) {
4✔
349
        if (name === '_all') {
740✔
350
            return this.index;
8✔
351
        }
8✔
352
        if (name in this.secondaryIndexes) {
740✔
353
            return this.secondaryIndexes[name].index;
628✔
354
        }
628✔
355

104✔
356
        const indexName = this.storageFile + '.' + name + '.index';
104✔
357
        assert(fs.existsSync(path.join(this.indexDirectory, indexName)), `Index "${name}" does not exist.`);
104✔
358

104✔
359
        const metadata = buildMetadataForMatcher(matcher, this.hmac);
104✔
360
        let { index } = this.secondaryIndexes[name] = this.createIndex(indexName, Object.assign({}, this.indexOptions, { metadata }));
104✔
361

104✔
362
        index.open();
104✔
363
        return index;
104✔
364
    }
740✔
365

4✔
366
    /**
4✔
367
     * Iterate documents across all partitions in sequenceNumber order using a k-way merge.
4✔
368
     * Opens any closed partition automatically.
4✔
369
     *
4✔
370
     * @protected
4✔
371
     * @param {number} [from=0] The 0-based sequenceNumber to start from (inclusive).
4✔
372
     * @param {number} [until=Number.MAX_SAFE_INTEGER] The 0-based sequenceNumber to read until (inclusive).
4✔
373
     * @returns {Generator<{document: object, sequenceNumber: number, partitionName: string, position: number, size: number, partition: number}>}
4✔
374
     */
4✔
375
    *iterateDocumentsNoIndex(from = 0, until = Number.MAX_SAFE_INTEGER) {
4✔
376
        const streams = [];
64✔
377

64✔
378
        this.forEachPartition(partition => {
64✔
379
            if (!partition.isOpen()) {
108✔
380
                partition.open();
52✔
381
            }
52✔
382

108✔
383
            const found = partition.findDocument(from);
108✔
384
            if (found && found.headerOut.sequenceNumber <= until) {
108✔
385
                const nextPosition = found.headerOut.position + partition.documentWriteSize(found.headerOut.dataSize);
92✔
386
                const reader = partition.readAll(nextPosition, found.headerOut);
92✔
387
                streams.push({ ...found, reader, partition: partition.id, partitionName: partition.name });
92✔
388
            }
92✔
389
        });
64✔
390

64✔
391
        const items = [];
64✔
392
        kWayMerge(
64✔
393
            streams,
64✔
394
            stream => stream.headerOut.sequenceNumber,
64✔
395
            stream => {
64✔
396
                const next = stream.reader.next();
228✔
397
                if (!next.done && stream.headerOut.sequenceNumber <= until) {
228✔
398
                    stream.data = next.value;
136✔
399
                    return true;
136✔
400
                }
136✔
401
                return false;
92✔
402
            },
64✔
403
            stream => items.push({
64✔
404
                document: this.serializer.deserialize(stream.data),
228✔
405
                sequenceNumber: stream.headerOut.sequenceNumber,
228✔
406
                partitionName: stream.partitionName,
228✔
407
                position: stream.headerOut.position,
228✔
408
                size: stream.headerOut.dataSize,
228✔
409
                partition: stream.partition,
228✔
410
            })
228✔
411
        );
64✔
412

64✔
413
        yield* items;
64✔
414
    }
64✔
415

4✔
416
    /**
4✔
417
     * Helper method to iterate over all documents, invoking a callback for each one.
4✔
418
     * Pass `noIndex = true` to iterate all partitions directly in sequenceNumber order
4✔
419
     * (useful when the global index is unavailable or corrupted).
4✔
420
     * When `noIndex` is false the second callback argument is the raw index `EntryInterface`.
4✔
421
     * When `noIndex` is true the second callback argument has `{ partition, position, size, sequenceNumber, partitionName }`.
4✔
422
     *
4✔
423
     * @protected
4✔
424
     * @param {function(object, object): void} iterationHandler
4✔
425
     * @param {boolean} [noIndex=false] When true, bypasses the index and iterates partitions directly.
4✔
426
     */
4✔
427
    forEachDocument(iterationHandler, noIndex = false) {
4✔
428
        /* istanbul ignore if  */
848✔
429
        if (typeof iterationHandler !== 'function') {
848!
UNCOV
430
            return;
×
UNCOV
431
        }
×
432

848✔
433
        if (noIndex) {
848✔
434
            for (const { document, ...entryInfo } of this.iterateDocumentsNoIndex()) {
4✔
435
                iterationHandler(document, entryInfo);
24✔
436
            }
24✔
437
            return;
4✔
438
        }
4✔
439

844✔
440
        const entries = this.index.all();
844✔
441

844✔
442
        for (let entry of entries) {
848✔
443
            const document = this.readFrom(entry.partition, entry.position, entry.size);
1,752✔
444
            iterationHandler(document, entry);
1,752✔
445
        }
1,752✔
446
    }
848✔
447

4✔
448
    /**
4✔
449
     * Helper method to iterate over all secondary indexes.
4✔
450
     *
4✔
451
     * @protected
4✔
452
     * @param {function(ReadableIndex, string)} iterationHandler
4✔
453
     * @param {object} [matchDocument] If supplied, only indexes the document matches on will be iterated.
4✔
454
     */
4✔
455
    forEachSecondaryIndex(iterationHandler, matchDocument) {
4✔
456
        /* istanbul ignore if  */
5,836✔
457
        if (typeof iterationHandler !== 'function') {
5,836!
UNCOV
458
            return;
×
UNCOV
459
        }
×
460

5,836✔
461
        for (let indexName of Object.keys(this.secondaryIndexes)) {
5,836✔
462
            if (!matchDocument || matches(matchDocument, this.secondaryIndexes[indexName].matcher)) {
4,988✔
463
                iterationHandler(this.secondaryIndexes[indexName].index, indexName);
2,668✔
464
            }
2,668✔
465
        }
4,988✔
466
    }
5,836✔
467

4✔
468
    /**
4✔
469
     * Helper method to iterate over all partitions.
4✔
470
     *
4✔
471
     * @protected
4✔
472
     * @param {function(ReadablePartition)} iterationHandler
4✔
473
     */
4✔
474
    forEachPartition(iterationHandler) {
4✔
475
        /* istanbul ignore if  */
1,868✔
476
        if (typeof iterationHandler !== 'function') {
1,868!
UNCOV
477
            return;
×
UNCOV
478
        }
×
479

1,868✔
480
        for (let partition of Object.keys(this.partitions)) {
1,868✔
481
            iterationHandler(this.partitions[partition]);
2,008✔
482
        }
2,008✔
483
    }
1,868✔
484

4✔
485
}
4✔
486

4✔
487
export default ReadableStorage;
4✔
488
export { matches };
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc