• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 23356312261

20 Mar 2026 06:08PM UTC coverage: 97.811% (-0.02%) from 97.826%
23356312261

Pull #250

github

web-flow
Merge 498e58610 into 28d4e34f0
Pull Request #250: Add configurable preCommit and preRead hooks at the stream and partition level

606 of 641 branches covered (94.54%)

Branch coverage included in aggregate %.

37 of 37 new or added lines in 3 files covered. (100.0%)

1 existing line in 1 file now uncovered.

1449 of 1460 relevant lines covered (99.25%)

1170.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.45
/src/Storage/ReadableStorage.js
1
const fs = require('fs');
4✔
2
const path = require('path');
4✔
3
const events = require('events');
4✔
4
const Partition = require('../Partition');
4✔
5
const Index = require('../Index');
4✔
6
const { assert, createHmac, matches, wrapAndCheck, buildMetadataForMatcher } = require('../util');
4✔
7

8
// Fallback for the `readBufferSize` option (in bytes, 4 KiB) that is merged into the partition configuration
// whenever the storage config does not provide its own value.
const DEFAULT_READ_BUFFER_SIZE = 4 * 1024;
4✔
9

10
/**
 * Reverses the items of an iterable.
 * The iterable is consumed completely and buffered in an array before the first item is yielded,
 * so it must be finite.
 *
 * @param {Generator|Iterable} iterator The iterable whose items should be yielded last-to-first.
 * @returns {Generator<*>} A generator yielding the items in reverse order.
 */
function *reverse(iterator) {
    const items = Array.from(iterator);
    for (let i = items.length - 1; i >= 0; i--) {
        yield items[i];
    }
}
21

22
/**
 * @typedef {object|function(object):boolean} Matcher
 */
25

26
/**
 * An append-only storage with highly performant positional range scans.
 * It's highly optimized for an event-store and hence does not support compaction or data-rewrite, nor any querying.
 *
 * This class covers the reading side only: it creates the primary index, registers every partition file found in
 * the data directory and resolves index entries into deserialized documents.
 * It emits an 'opened' event at the end of open() and a 'closed' event at the end of close().
 */
class ReadableStorage extends events.EventEmitter {

    /**
     * @param {string} [storageName] The name of the storage.
     * @param {object} [config] An object with storage parameters.
     * @param {object} [config.serializer] A serializer object with methods serialize(document) and deserialize(data).
     * @param {function(object): string} config.serializer.serialize Default is JSON.stringify.
     * @param {function(string): object} config.serializer.deserialize Default is JSON.parse.
     * @param {string} [config.dataDirectory] The path where the storage data should reside. Default '.'.
     * @param {string} [config.indexDirectory] The path where the indexes should be stored. Defaults to dataDirectory.
     * @param {string} [config.indexFile] The name of the primary index. Default '{storageName}.index'.
     * @param {number} [config.readBufferSize] Size of the read buffer in bytes. Default 4096.
     * @param {object} [config.indexOptions] An options object that should be passed to all indexes on construction.
     * @param {string} [config.hmacSecret] A private key that is used to verify matchers retrieved from indexes.
     * @param {object} [config.metadata] A metadata object to be stored in all partitions belonging to this storage.
     */
    constructor(storageName = 'storage', config = {}) {
        super();
        // Support the short signature `new ReadableStorage(config)`: a non-string first argument is the config.
        if (typeof storageName !== 'string') {
            config = storageName;
            storageName = undefined;
        }

        this.storageFile = storageName || 'storage';
        const defaults = {
            serializer: { serialize: JSON.stringify, deserialize: JSON.parse },
            dataDirectory: '.',
            indexFile: this.storageFile + '.index',
            indexOptions: {},
            hmacSecret: '',
            metadata: {}
        };
        config = Object.assign(defaults, config);
        this.serializer = config.serializer;

        this.hmac = createHmac(config.hmacSecret);

        this.dataDirectory = path.resolve(config.dataDirectory);

        // No hook is registered until preRead() is called.
        this.preReadHook = null;

        this.initializeIndexes(config);
        this.scanPartitions(config);
    }

    /**
     * Factory for index instances; this implementation creates a read-only index.
     *
     * @protected
     * @param {string} name
     * @param {object} [options]
     * @returns {{ index: ReadableIndex, matcher?: Matcher }}
     */
    createIndex(name, options = {}) {
        /** @type ReadableIndex */
        const index = new Index.ReadOnly(name, options);
        return { index };
    }

    /**
     * Factory for partition instances; this implementation creates a read-only partition.
     *
     * @protected
     * @param {string} name
     * @param {object} [options]
     * @returns {ReadablePartition}
     */
    createPartition(name, options = {}) {
        return new Partition.ReadOnly(name, options);
    }

    /**
     * Create/open the primary index and build the base configuration for all secondary indexes.
     *
     * @private
     * @param {object} config The configuration object
     * @returns void
     */
    initializeIndexes(config) {
        this.indexDirectory = path.resolve(config.indexDirectory || this.dataDirectory);

        // Work on a private copy: the caller's `indexOptions` object must neither be mutated nor end up shared
        // between several storages that were configured from the same object.
        this.indexOptions = Object.assign({}, config.indexOptions, { dataDirectory: this.indexDirectory });
        // Safety precaution to prevent accidentally restricting main index
        delete this.indexOptions.matcher;
        const { index } = this.createIndex(config.indexFile, this.indexOptions);
        this.index = index;
        this.secondaryIndexes = {};
        this.readonlyIndexes = {};
    }

    /**
     * The amount of documents in the storage, i.e. the length of the primary index.
     * @returns {number}
     */
    get length() {
        return this.index.length;
    }

    /**
     * Scan the data directory for all existing partitions.
     * Every file beginning with the storageFile name is considered a partition, except files ending in
     * '.index', '.branch' or '.lock'.
     *
     * @private
     * @param {object} config The configuration object containing options for the partitions.
     * @returns void
     */
    scanPartitions(config) {
        const defaults = {
            readBufferSize: DEFAULT_READ_BUFFER_SIZE
        };
        this.partitionConfig = Object.assign(defaults, config);
        // Prototype-less map so that partition identifiers can never collide with inherited object properties.
        this.partitions = Object.create(null);

        const files = fs.readdirSync(this.dataDirectory);
        for (const file of files) {
            if (file.endsWith('.index')) continue;
            if (file.endsWith('.branch')) continue;
            if (file.endsWith('.lock')) continue;
            if (!file.startsWith(this.storageFile)) continue;

            const partition = this.createPartition(file, this.partitionConfig);
            this.partitions[partition.id] = partition;
        }
    }

    /**
     * Open the primary index and all registered secondary indexes.
     * Will emit an 'opened' event if finished.
     *
     * @api
     * @returns {boolean} Always true.
     */
    open() {
        this.index.open();

        this.forEachSecondaryIndex(index => index.open());

        this.emit('opened');
        return true;
    }

    /**
     * Close the storage and frees up all resources: the primary index, all secondary and readonly indexes
     * and all partitions are closed.
     * Will emit a 'closed' event when finished.
     *
     * @api
     * @returns void
     */
    close() {
        this.index.close();
        this.forEachSecondaryIndex(index => index.close());
        for (const index of Object.values(this.readonlyIndexes)) {
            index.close();
        }
        this.forEachPartition(partition => partition.close());
        this.emit('closed');
    }

    /**
     * Get a known partition by its identifier and make sure it is opened.
     * This read-only implementation never creates partitions; an unknown identifier fails the assertion.
     *
     * @protected
     * @param {string|number} partitionIdentifier The identifier the partition was registered under.
     * @returns {ReadablePartition}
     * @throws {Error} If no partition is registered under the given identifier.
     */
    getPartition(partitionIdentifier) {
        assert(partitionIdentifier in this.partitions, `Partition #${partitionIdentifier} does not exist.`);

        this.partitions[partitionIdentifier].open();
        return this.partitions[partitionIdentifier];
    }

    /**
     * Register a hook that is called before a document is read from a partition.
     * The hook receives the position and the partition metadata and may throw to abort the read.
     * Only a single hook is kept; registering another one replaces the previous hook.
     *
     * @api
     * @param {function(number, object): void} hook A function receiving (position, partitionMetadata).
     */
    preRead(hook) {
        this.preReadHook = hook;
    }

    /**
     * Read and deserialize the document stored at the given position of a partition.
     * A registered preRead hook runs first; anything it throws propagates to the caller and aborts the read.
     *
     * @protected
     * @param {number} partitionId The partition to read from.
     * @param {number} position The file position to read from.
     * @param {number} [size] The expected byte size of the document at the given position.
     * @returns {object} The document stored at the given position.
     * @throws {Error} if the document at the given position can not be deserialized.
     */
    readFrom(partitionId, position, size) {
        const partition = this.getPartition(partitionId);
        if (this.preReadHook) {
            this.preReadHook(position, partition.metadata);
        }
        const data = partition.readFrom(position, size);
        return this.serializer.deserialize(data);
    }

    /**
     * Read a single document from the given position, in the full index or in the provided index.
     * The index is opened on demand if it is not open yet.
     *
     * @api
     * @param {number} number The 1-based document number (inside the given index) to read.
     * @param {ReadableIndex} [index] The index to use for finding the document position.
     * @returns {object|boolean} The document at the given position inside the index, or false if the index has no such entry.
     */
    read(number, index) {
        index = index || this.index;

        if (!index.isOpen()) {
            index.open();
        }

        const entry = index.get(number);
        if (entry === false) {
            return false;
        }

        return this.readFrom(entry.partition, entry.position, entry.size);
    }

    /**
     * Read a range of documents from the given position range, in the full index or in the provided index.
     * Returns a generator in order to reduce memory usage and be able to read lots of documents with little latency.
     * If `from` resolves to a higher number than `until`, the documents are yielded in descending order.
     *
     * @api
     * @param {number} from The 1-based document number (inclusive) to start reading from.
     * @param {number} [until] The 1-based document number (inclusive) to read until. Defaults to index.length.
     * @param {ReadableIndex} [index] The index to use for finding the documents in the range.
     * @returns {Generator<object>} A generator that will read each document in the range one by one.
     * @throws {Error} If either boundary does not resolve to a positive document number.
     */
    *readRange(from, until = -1, index = null) {
        index = index || this.index;
        index.open();

        const readFrom = wrapAndCheck(from, index.length);
        const readUntil = wrapAndCheck(until, index.length);
        assert(readFrom > 0 && readUntil > 0, `Range scan error for range ${from} - ${until}.`);

        if (readFrom > readUntil) {
            // Reverse scan: walk downwards in small forward batches and reverse each batch, so that only
            // a handful of documents need to be buffered at any time.
            const batchSize = 10;
            let batchUntil = readFrom;
            // `>=` is required here: when the previous batch ended exactly one above `readUntil`, the
            // document at `readUntil` itself is still pending and must not be dropped.
            while (batchUntil >= readUntil) {
                const batchFrom = Math.max(readUntil, batchUntil - batchSize);
                yield* reverse(this.iterateRange(batchFrom, batchUntil, index));
                batchUntil = batchFrom - 1;
            }
            return undefined;
        }

        yield* this.iterateRange(readFrom, readUntil, index);
    }

    /**
     * Iterate all documents in this storage in range from to until inside the index.
     * @private
     * @param {number} from
     * @param {number} until
     * @param {ReadableIndex} index
     * @returns {Generator<object>}
     */
    *iterateRange(from, until, index) {
        const entries = index.range(from, until);
        for (const entry of entries) {
            const document = this.readFrom(entry.partition, entry.position, entry.size);
            yield document;
        }
    }

    /**
     * Open an existing readonly index for reading, without registering it in the secondary indexes write path.
     * Use this for indexes whose files carry a status marker (e.g. `stream-foo.closed.index`).
     * Instances are cached by name, a repeated call returns the instance created by the first call.
     *
     * @api
     * @param {string} name The readonly index name (e.g. 'stream-foo.closed').
     * @returns {ReadableIndex}
     * @throws {Error} if the readonly index does not exist.
     */
    openReadonlyIndex(name) {
        // Own-property check: names like 'constructor' or 'toString' must not resolve to inherited members.
        if (Object.prototype.hasOwnProperty.call(this.readonlyIndexes, name)) {
            return this.readonlyIndexes[name];
        }
        const indexName = this.storageFile + '.' + name + '.index';
        assert(fs.existsSync(path.join(this.indexDirectory, indexName)), `Index "${name}" does not exist.`);
        const { index } = this.createIndex(indexName, Object.assign({}, this.indexOptions));
        index.open();
        this.readonlyIndexes[name] = index;
        return index;
    }

    /**
     * Open an existing index.
     * The name '_all' refers to the primary index. Already registered secondary indexes are returned as they are.
     *
     * @api
     * @param {string} name The index name.
     * @param {Matcher} [matcher] The matcher object or function that the index needs to have been defined with. If not given it will not be validated.
     * @returns {ReadableIndex}
     * @throws {Error} if the index with that name does not exist.
     * @throws {Error} if the HMAC for the matcher does not match.
     */
    openIndex(name, matcher) {
        if (name === '_all') {
            return this.index;
        }
        // Own-property check: names like 'constructor' or 'toString' must not resolve to inherited members.
        if (Object.prototype.hasOwnProperty.call(this.secondaryIndexes, name)) {
            return this.secondaryIndexes[name].index;
        }

        const indexName = this.storageFile + '.' + name + '.index';
        assert(fs.existsSync(path.join(this.indexDirectory, indexName)), `Index "${name}" does not exist.`);

        const metadata = buildMetadataForMatcher(matcher, this.hmac);
        // The entry is only registered once createIndex() succeeded; it is registered before being opened.
        const entry = this.createIndex(indexName, Object.assign({}, this.indexOptions, { metadata }));
        this.secondaryIndexes[name] = entry;

        entry.index.open();
        return entry.index;
    }

    /**
     * Helper method to iterate over all documents.
     * Does nothing if the handler is not a function.
     *
     * @protected
     * @param {function(object, EntryInterface)} iterationHandler Called with each document and its index entry.
     */
    forEachDocument(iterationHandler) {
        /* istanbul ignore if  */
        if (typeof iterationHandler !== 'function') {
            return;
        }

        const entries = this.index.all();

        for (const entry of entries) {
            const document = this.readFrom(entry.partition, entry.position, entry.size);
            iterationHandler(document, entry);
        }
    }

    /**
     * Helper method to iterate over all secondary indexes.
     * Does nothing if the handler is not a function.
     *
     * @protected
     * @param {function(ReadableIndex, string)} iterationHandler Called with each index and its name.
     * @param {object} [matchDocument] If supplied, only indexes the document matches on will be iterated.
     */
    forEachSecondaryIndex(iterationHandler, matchDocument) {
        /* istanbul ignore if  */
        if (typeof iterationHandler !== 'function') {
            return;
        }

        for (const indexName of Object.keys(this.secondaryIndexes)) {
            if (!matchDocument || matches(matchDocument, this.secondaryIndexes[indexName].matcher)) {
                iterationHandler(this.secondaryIndexes[indexName].index, indexName);
            }
        }
    }

    /**
     * Helper method to iterate over all partitions.
     * Does nothing if the handler is not a function.
     *
     * @protected
     * @param {function(ReadablePartition)} iterationHandler
     */
    forEachPartition(iterationHandler) {
        /* istanbul ignore if  */
        if (typeof iterationHandler !== 'function') {
            return;
        }

        for (const partition of Object.values(this.partitions)) {
            iterationHandler(partition);
        }
    }

}
408

409
// The class itself is the module's export.
module.exports = ReadableStorage;
// Additionally expose the `matches` helper imported from ../util as a property of the export.
module.exports.matches = matches;
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc