• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 23503077312

24 Mar 2026 05:23PM UTC coverage: 98.028% (+0.1%) from 97.928%
23503077312

Pull #267

github

web-flow
Merge 3971ca999 into 04fbba608
Pull Request #267: Replace unbounded index data array with fixed-size ring buffer

719 of 755 branches covered (95.23%)

Branch coverage included in aggregate %.

90 of 90 new or added lines in 4 files covered. (100.0%)

1 existing line in 1 file now uncovered.

1617 of 1628 relevant lines covered (99.32%)

1406.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.27
/src/Storage/ReadableStorage.js
1
const fs = require('fs');
4✔
2
const path = require('path');
4✔
3
const events = require('events');
4✔
4
const Partition = require('../Partition');
4✔
5
const Index = require('../Index');
4✔
6
const { assert, createHmac, matches, wrapAndCheck, buildMetadataForMatcher } = require('../util');
4✔
7

8
// Default for the `readBufferSize` option, in bytes (4096); used when the config does not provide one.
const DEFAULT_READ_BUFFER_SIZE = 4 * 1024;
4✔
9

10
/**
 * Reverses the items of an iterable.
 * The iterable is consumed completely and buffered in an array before the first item is yielded,
 * so it must be finite.
 *
 * @param {Generator|Iterable} iterator The iterable whose items should be yielded back to front.
 * @returns {Generator<*>} A generator yielding the buffered items from last to first.
 */
function *reverse(iterator) {
    const items = Array.from(iterator);
    for (let i = items.length - 1; i >= 0; i--) {
        yield items[i];
    }
}
21

22
/**
23
 * @typedef {object|function(object):boolean} Matcher
24
 */
25

26
/**
 * An append-only storage with highly performant positional range scans.
 * It's highly optimized for an event-store and hence does not support compaction or data-rewrite, nor any querying.
 *
 * Emits 'opened' and 'closed' events, and a 'preRead' event before each document read
 * whenever at least one 'preRead' listener is registered.
 */
class ReadableStorage extends events.EventEmitter {

    /**
     * @param {string} [storageName] The name of the storage. May be omitted, in which case the first argument is the config.
     * @param {object} [config] An object with storage parameters.
     * @param {object} [config.serializer] A serializer object with methods serialize(document) and deserialize(data).
     * @param {function(object): string} config.serializer.serialize Default is JSON.stringify.
     * @param {function(string): object} config.serializer.deserialize Default is JSON.parse.
     * @param {string} [config.dataDirectory] The path where the storage data should reside. Default '.'.
     * @param {string} [config.indexDirectory] The path where the indexes should be stored. Defaults to dataDirectory.
     * @param {string} [config.indexFile] The name of the primary index. Default '{storageName}.index'.
     * @param {number} [config.readBufferSize] Size of the read buffer in bytes. Default 4096.
     * @param {object} [config.indexOptions] An options object that should be passed to all indexes on construction.
     * @param {string} [config.hmacSecret] A private key that is used to verify matchers retrieved from indexes.
     * @param {object} [config.metadata] A metadata object to be stored in all partitions belonging to this storage.
     */
    constructor(storageName = 'storage', config = {}) {
        super();
        if (typeof storageName !== 'string') {
            // Called as `new ReadableStorage(config)`: the first argument is the config object.
            config = storageName;
            storageName = undefined;
        }

        this.storageFile = storageName || 'storage';
        const defaults = {
            serializer: { serialize: JSON.stringify, deserialize: JSON.parse },
            dataDirectory: '.',
            indexFile: this.storageFile + '.index',
            indexOptions: {},
            hmacSecret: '',
            metadata: {}
        };
        config = Object.assign(defaults, config);
        this.serializer = config.serializer;

        this.hmac = createHmac(config.hmacSecret);

        this.dataDirectory = path.resolve(config.dataDirectory);

        this.scanPartitions(config);
        this.initializeIndexes(config);
    }

    /**
     * Factory for index instances; this implementation creates read-only indexes.
     *
     * @protected
     * @param {string} name The file name of the index.
     * @param {object} [options] The options passed to the index constructor.
     * @returns {{ index: ReadableIndex, matcher?: Matcher }}
     */
    createIndex(name, options = {}) {
        /** @type ReadableIndex */
        const index = new Index.ReadOnly(name, options);
        return { index };
    }

    /**
     * Factory for partition instances; this implementation creates read-only partitions.
     *
     * @protected
     * @param {string} name The file name of the partition.
     * @param {object} [options] The options passed to the partition constructor.
     * @returns {ReadablePartition}
     */
    createPartition(name, options = {}) {
        return new Partition.ReadOnly(name, options);
    }

    /**
     * Create the primary index and build the base configuration for all secondary indexes.
     *
     * @private
     * @param {object} config The configuration object
     * @returns void
     */
    initializeIndexes(config) {
        this.indexDirectory = path.resolve(config.indexDirectory || this.dataDirectory);

        // Work on a copy: the options object belongs to the caller and may be shared between
        // several storages, so it must not receive this storage's directory or lose its matcher.
        this.indexOptions = Object.assign({}, config.indexOptions, { dataDirectory: this.indexDirectory });
        // Safety precaution to prevent accidentally restricting main index
        delete this.indexOptions.matcher;
        const { index } = this.createIndex(config.indexFile, this.indexOptions);
        this.index = index;
        this.secondaryIndexes = {};
        this.readonlyIndexes = {};
    }

    /**
     * The amount of documents in the storage, as reported by the primary index.
     * @returns {number}
     */
    get length() {
        return this.index.length;
    }

    /**
     * Scan the data directory for all existing partitions.
     * Every file beginning with the storageFile name is considered a partition,
     * except index (`.index`), branch (`.branch`) and lock (`.lock`) files.
     *
     * @private
     * @param {object} config The configuration object containing options for the partitions.
     * @returns void
     */
    scanPartitions(config) {
        const defaults = {
            readBufferSize: DEFAULT_READ_BUFFER_SIZE
        };
        this.partitionConfig = Object.assign(defaults, config);
        // Prototype-less map, so partition ids can never collide with inherited object keys.
        this.partitions = Object.create(null);

        const ignoredSuffixes = ['.index', '.branch', '.lock'];
        for (const file of fs.readdirSync(this.dataDirectory)) {
            if (ignoredSuffixes.some(suffix => file.endsWith(suffix))) continue;
            if (!file.startsWith(this.storageFile)) continue;

            const partition = this.createPartition(file, this.partitionConfig);
            this.partitions[partition.id] = partition;
        }
    }

    /**
     * Open the primary index and all registered secondary indexes.
     * Partitions are opened on demand when they are first read from.
     * Will emit an 'opened' event when finished.
     *
     * @api
     * @returns {boolean} Always true.
     */
    open() {
        this.index.open();

        this.forEachSecondaryIndex(index => index.open());

        this.emit('opened');
        return true;
    }

    /**
     * Close the storage and free up all resources: the primary index, all secondary and
     * readonly indexes and all partitions.
     * Will emit a 'closed' event when finished.
     *
     * @api
     * @returns void
     */
    close() {
        this.index.close();
        this.forEachSecondaryIndex(index => index.close());
        for (const index of Object.values(this.readonlyIndexes)) {
            index.close();
        }
        this.forEachPartition(partition => partition.close());
        this.emit('closed');
    }

    /**
     * Get a known partition by its identifier and make sure it is opened.
     * This storage never creates partitions; only partitions found by scanPartitions are available.
     *
     * @protected
     * @param {string|number} partitionIdentifier The key under which the partition is registered (its id).
     * @returns {ReadablePartition}
     * @throws {Error} If no such partition exists.
     */
    getPartition(partitionIdentifier) {
        assert(partitionIdentifier in this.partitions, `Partition #${partitionIdentifier} does not exist.`);

        this.partitions[partitionIdentifier].open();
        return this.partitions[partitionIdentifier];
    }

    /**
     * Register a handler that is called before a document is read from a partition.
     * The handler receives the position and the partition metadata and may throw to abort the read.
     * Multiple handlers can be registered; all run on every read in registration order.
     * Equivalent to `storage.on('preRead', hook)`.
     *
     * @api
     * @param {function(number, object): void} hook A function receiving (position, partitionMetadata).
     */
    preRead(hook) {
        this.on('preRead', hook);
    }

    /**
     * Read and deserialize the document stored at a position inside a partition.
     *
     * @protected
     * @param {number} partitionId The partition to read from.
     * @param {number} position The file position to read from.
     * @param {number} [size] The expected byte size of the document at the given position.
     * @returns {object} The document stored at the given position.
     * @throws {Error} if the document at the given position can not be deserialized.
     */
    readFrom(partitionId, position, size) {
        const partition = this.getPartition(partitionId);
        // Only emit when somebody listens, to keep the common read path cheap.
        if (this.listenerCount('preRead') > 0) {
            this.emit('preRead', position, partition.metadata);
        }
        const data = partition.readFrom(position, size);
        return this.serializer.deserialize(data);
    }

    /**
     * Read a single document from the given position, in the full index or in the provided index.
     * The index is opened first if it is not open yet.
     *
     * @api
     * @param {number} number The 1-based document number (inside the given index) to read.
     * @param {ReadableIndex} [index] The index to use for finding the document position.
     * @returns {object|false} The document at the given position inside the index, or false if the index has no such entry.
     */
    read(number, index) {
        index = index || this.index;

        if (!index.isOpen()) {
            index.open();
        }

        const entry = index.get(number);
        if (entry === false) {
            return false;
        }

        return this.readFrom(entry.partition, entry.position, entry.size);
    }

    /**
     * Read a range of documents from the given position range, in the full index or in the provided index.
     * Returns a generator in order to reduce memory usage and be able to read lots of documents with little latency.
     * If `from` is greater than `until`, the documents are yielded in descending order.
     *
     * @api
     * @param {number} from The 1-based document number (inclusive) to start reading from.
     * @param {number} [until] The 1-based document number (inclusive) to read until. Defaults to index.length.
     * @param {ReadableIndex|false} [index] The index to use for finding the documents in the range.
     *   Pass `false` to skip the global index and iterate all partitions directly in sequenceNumber order
     *   (useful when the global index is unavailable or corrupted).
     * @returns {Generator<object>} A generator that will read each document in the range one by one.
     * @throws {Error} if the range bounds are not valid for the index length.
     */
    *readRange(from, until = -1, index = null) {
        // The range bounds are always validated against an index length; with `index === false`
        // the primary index still provides that length.
        const lengthSource = index || this.index;
        if (!lengthSource.isOpen()) {
            lengthSource.open();
        }

        const readFrom = wrapAndCheck(from, lengthSource.length);
        const readUntil = wrapAndCheck(until, lengthSource.length);
        assert(readFrom > 0 && readUntil > 0, `Range scan error for range ${from} - ${until}.`);

        if (readFrom > readUntil) {
            // Reverse read: walk backwards in ascending batches and reverse every batch in memory,
            // so at most `batchSize` documents are buffered at a time.
            const batchSize = 10;
            let batchUntil = readFrom;
            while (batchUntil >= readUntil) {
                // Both bounds are inclusive, hence the + 1 to get exactly batchSize documents.
                const batchFrom = Math.max(readUntil, batchUntil - batchSize + 1);
                yield* reverse(this.iterateRange(batchFrom, batchUntil, index));
                batchUntil = batchFrom - 1;
            }
            return undefined;
        }

        yield* this.iterateRange(readFrom, readUntil, index);
    }

    /**
     * Iterate all documents in this storage in range from to until inside the index.
     * If index is false, iterates all partitions directly in sequenceNumber order.
     *
     * @private
     * @param {number} from The 1-based document number (inclusive) to start from.
     * @param {number} until The 1-based document number (inclusive) to read until.
     * @param {ReadableIndex|false|null} index The index to use; falsy values other than `false` select the primary index.
     * @returns {Generator<object>}
     */
    *iterateRange(from, until, index) {
        if (index === false) {
            // Explicitly disabled index: iterate all partitions and merge by sequenceNumber.
            // Document header sequenceNumber is 0-based; from/until are 1-based index positions.
            for (const entry of this.iteratePartitionsBySequenceNumber(from - 1, until - 1)) {
                yield entry.document;
            }
            return;
        }

        const idx = index || this.index;
        const entries = idx.range(from, until);
        for (const entry of entries) {
            const document = this.readFrom(entry.partition, entry.position, entry.size);
            yield document;
        }
    }

    /**
     * Iterate documents across all partitions in sequenceNumber order using a k-way merge.
     * SequenceNumbers stored in document headers are 0-based.
     * Each yielded entry includes the deserialized document, its sequenceNumber, the partition name,
     * the byte position within the partition, the data size, and the partition id,
     * allowing callers to rebuild index entries.
     *
     * @api
     * @param {number} fromSeq The 0-based sequenceNumber to start from (inclusive).
     * @param {number} untilSeq The 0-based sequenceNumber to read until (inclusive).
     * @returns {Generator<{document: object, sequenceNumber: number, partitionName: string, position: number, size: number, partitionId: number}>}
     */
    *iteratePartitionsBySequenceNumber(fromSeq, untilSeq) {
        // One cursor per partition that has at least one document inside the requested range.
        const partitions = [];

        for (const partition of Object.values(this.partitions)) {
            if (!partition.isOpen()) {
                partition.open();
            }
            // The reader reports the header of the document it just yielded through this object.
            const headerOut = {};
            const reader = partition.readAll(0, headerOut);

            // Advance to the first document with sequenceNumber >= fromSeq
            let result = reader.next();
            while (!result.done && headerOut.sequenceNumber < fromSeq) {
                result = reader.next();
            }

            if (!result.done && headerOut.sequenceNumber <= untilSeq) {
                partitions.push({
                    reader,
                    headerOut,
                    data: result.value,
                    sequenceNumber: headerOut.sequenceNumber,
                    position: headerOut.position,
                    size: headerOut.dataSize,
                    partitionId: partition.id,
                    partitionName: partition.name
                });
            }
        }

        // K-way merge: at each step, yield the document with the smallest sequenceNumber
        while (partitions.length > 0) {
            let minIdx = 0;
            for (let i = 1; i < partitions.length; i++) {
                if (partitions[i].sequenceNumber < partitions[minIdx].sequenceNumber) {
                    minIdx = i;
                }
            }

            const cursor = partitions[minIdx];
            const { data, sequenceNumber, partitionName, position, size, partitionId } = cursor;
            yield { document: this.serializer.deserialize(data), sequenceNumber, partitionName, position, size, partitionId };

            // Advance the cursor that was just consumed; drop it once it is exhausted or out of range.
            const next = cursor.reader.next();
            if (!next.done && cursor.headerOut.sequenceNumber <= untilSeq) {
                cursor.data = next.value;
                cursor.sequenceNumber = cursor.headerOut.sequenceNumber;
                cursor.position = cursor.headerOut.position;
                cursor.size = cursor.headerOut.dataSize;
            } else {
                partitions.splice(minIdx, 1);
            }
        }
    }

    /**
     * Open an existing readonly index for reading, without registering it in the secondary indexes write path.
     * Use this for indexes whose files carry a status marker (e.g. `stream-foo.closed.index`).
     * An index that was already opened through this method is returned from the cache.
     *
     * @api
     * @param {string} name The readonly index name (e.g. 'stream-foo.closed').
     * @returns {ReadableIndex}
     * @throws {Error} if the readonly index does not exist.
     */
    openReadonlyIndex(name) {
        // Own-property check: `in` would also match inherited keys such as 'constructor' or 'toString'.
        if (Object.prototype.hasOwnProperty.call(this.readonlyIndexes, name)) {
            return this.readonlyIndexes[name];
        }
        const indexName = this.storageFile + '.' + name + '.index';
        assert(fs.existsSync(path.join(this.indexDirectory, indexName)), `Index "${name}" does not exist.`);
        const { index } = this.createIndex(indexName, Object.assign({}, this.indexOptions));
        index.open();
        this.readonlyIndexes[name] = index;
        return index;
    }

    /**
     * Open an existing index.
     * The name '_all' returns the primary index; an already registered secondary index is returned as is.
     *
     * @api
     * @param {string} name The index name.
     * @param {Matcher} [matcher] The matcher object or function that the index needs to have been defined with. If not given it will not be validated.
     * @returns {ReadableIndex}
     * @throws {Error} if the index with that name does not exist.
     * @throws {Error} if the HMAC for the matcher does not match.
     */
    openIndex(name, matcher) {
        if (name === '_all') {
            return this.index;
        }
        // Own-property check: `in` would also match inherited keys such as 'constructor' or 'toString'.
        if (Object.prototype.hasOwnProperty.call(this.secondaryIndexes, name)) {
            return this.secondaryIndexes[name].index;
        }

        const indexName = this.storageFile + '.' + name + '.index';
        assert(fs.existsSync(path.join(this.indexDirectory, indexName)), `Index "${name}" does not exist.`);

        const metadata = buildMetadataForMatcher(matcher, this.hmac);
        const { index } = this.secondaryIndexes[name] = this.createIndex(indexName, Object.assign({}, this.indexOptions, { metadata }));

        index.open();
        return index;
    }

    /**
     * Helper method to iterate over all documents, in the order of the primary index.
     *
     * @protected
     * @param {function(object, EntryInterface)} iterationHandler Called with each document and its index entry.
     */
    forEachDocument(iterationHandler) {
        /* istanbul ignore if  */
        if (typeof iterationHandler !== 'function') {
            return;
        }

        const entries = this.index.all();

        for (const entry of entries) {
            const document = this.readFrom(entry.partition, entry.position, entry.size);
            iterationHandler(document, entry);
        }
    }

    /**
     * Helper method to iterate over all secondary indexes.
     *
     * @protected
     * @param {function(ReadableIndex, string)} iterationHandler Called with each index and its name.
     * @param {object} [matchDocument] If supplied, only indexes the document matches on will be iterated.
     */
    forEachSecondaryIndex(iterationHandler, matchDocument) {
        /* istanbul ignore if  */
        if (typeof iterationHandler !== 'function') {
            return;
        }

        for (const indexName of Object.keys(this.secondaryIndexes)) {
            if (!matchDocument || matches(matchDocument, this.secondaryIndexes[indexName].matcher)) {
                iterationHandler(this.secondaryIndexes[indexName].index, indexName);
            }
        }
    }

    /**
     * Helper method to iterate over all partitions.
     *
     * @protected
     * @param {function(ReadablePartition)} iterationHandler Called with each known partition.
     */
    forEachPartition(iterationHandler) {
        /* istanbul ignore if  */
        if (typeof iterationHandler !== 'function') {
            return;
        }

        for (const partition of Object.keys(this.partitions)) {
            iterationHandler(this.partitions[partition]);
        }
    }

}
479

480
// The storage class itself is the module's export.
module.exports = ReadableStorage;
// Exposed as properties of the class export: `matches` comes from ../util,
// `CorruptFileError` from ../Partition.
module.exports.matches = matches;
module.exports.CorruptFileError = Partition.CorruptFileError;
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc