• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 23402128479

22 Mar 2026 11:31AM UTC coverage: 97.958% (+0.1%) from 97.826%
23402128479

Pull #107

github

web-flow
Merge 967ba2d7d into ea855be31
Pull Request #107: Implement auto-repair

658 of 691 branches covered (95.22%)

Branch coverage included in aggregate %.

54 of 55 new or added lines in 6 files covered. (98.18%)

2 existing lines in 2 files now uncovered.

1501 of 1513 relevant lines covered (99.21%)

1302.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.57
/src/Storage/WritableStorage.js
1
const fs = require('fs');
4✔
2
const path = require('path');
4✔
3
const WritablePartition = require('../Partition/WritablePartition');
4✔
4
const WritableIndex = require('../Index/WritableIndex');
4✔
5
const ReadableStorage = require('./ReadableStorage');
4✔
6
const { assert, matches, buildMetadataForMatcher, buildMatcherFromMetadata, ensureDirectory } = require('../util');
4✔
7

8
/** Default size of the write buffer in bytes (16 KiB). */
const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024;

/**
 * Lock handling mode: when passed as `config.lock`, the constructor removes an existing lock
 * (after checking for torn writes) instead of leaving it in place.
 */
const LOCK_RECLAIM = 0x1;

/**
 * Lock handling mode: exported for callers, but not referenced by the logic in this file.
 * NOTE(review): presumably the explicit name for the default "throw if locked" behavior - confirm.
 */
const LOCK_THROW = 0x2;

/**
 * Error thrown when the storage lock is already in place.
 * Sets `name` so that logs and stack traces show `StorageLockedError` instead of a generic `Error`.
 */
class StorageLockedError extends Error {

    /**
     * @param {string} [message] The error message.
     * @param {object} [options] Standard Error options, e.g. `{ cause }`.
     */
    constructor(message, options) {
        super(message, options);
        this.name = 'StorageLockedError';
    }

}
14

15
/**
16
 * @typedef {object|function(object):boolean} Matcher
17
 */
18

19
/**
20
 * An append-only storage with highly performant positional range scans.
21
 * It's highly optimized for an event-store and hence does not support compaction or data-rewrite, nor any querying.
22
 */
23
class WritableStorage extends ReadableStorage {
24

25
    /**
26
     * @param {string} [storageName] The name of the storage.
27
     * @param {object} [config] An object with storage parameters.
28
     * @param {object} [config.serializer] A serializer object with methods serialize(document) and deserialize(data).
29
     * @param {function(object): string} config.serializer.serialize Default is JSON.stringify.
30
     * @param {function(string): object} config.serializer.deserialize Default is JSON.parse.
31
     * @param {string} [config.dataDirectory] The path where the storage data should reside. Default '.'.
32
     * @param {string} [config.indexDirectory] The path where the indexes should be stored. Defaults to dataDirectory.
33
     * @param {string} [config.indexFile] The name of the primary index. Default '{storageName}.index'.
34
     * @param {number} [config.readBufferSize] Size of the read buffer in bytes. Default 4096.
35
     * @param {number} [config.writeBufferSize] Size of the write buffer in bytes. Default 16384.
36
     * @param {number} [config.maxWriteBufferDocuments] How many documents to have in the write buffer at max. 0 means as much as possible. Default 0.
37
     * @param {boolean} [config.syncOnFlush] If fsync should be called on write buffer flush. Set this if you need strict durability. Defaults to false.
38
     * @param {boolean} [config.dirtyReads] If dirty reads should be allowed. This means that writes that are in write buffer but not yet flushed can be read. Defaults to true.
39
     * @param {function(object, number): string} [config.partitioner] A function that takes a document and sequence number and returns a partition name that the document should be stored in. Defaults to write all documents to the primary partition.
40
     * @param {object} [config.indexOptions] An options object that should be passed to all indexes on construction.
41
     * @param {string} [config.hmacSecret] A private key that is used to verify matchers retrieved from indexes.
42
     * @param {number} [config.lock] One of LOCK_* constants that defines how an existing lock should be handled.
43
     */
44
    constructor(storageName = 'storage', config = {}) {
476!
45
        if (typeof storageName !== 'string') {
848✔
46
            config = storageName;
476✔
47
            storageName = undefined;
476✔
48
        }
49
        const defaults = {
848✔
50
            partitioner: (document, number) => '',
1,580✔
51
            writeBufferSize: DEFAULT_WRITE_BUFFER_SIZE,
52
            maxWriteBufferDocuments: 0,
53
            syncOnFlush: false,
54
            dirtyReads: true,
55
            dataDirectory: '.'
56
        };
57
        config = Object.assign(defaults, config);
848✔
58
        config.indexOptions = Object.assign({ syncOnFlush: config.syncOnFlush }, config.indexOptions);
848✔
59
        ensureDirectory(config.dataDirectory);
848✔
60
        super(storageName, config);
848✔
61

62
        this.lockFile = path.resolve(this.dataDirectory, this.storageFile + '.lock');
848✔
63
        if (config.lock === LOCK_RECLAIM) {
848✔
64
            this.unlock();
8✔
65
        }
66
        this.partitioner = config.partitioner;
848✔
67
    }
68

69
    /**
70
     * @inheritDoc
71
     * @returns {boolean}
72
     * @throws {StorageLockedError} If this storage is locked by another process.
73
     */
74
    open() {
75
        if (!this.lock()) {
784✔
76
            return true;
4✔
77
        }
78
        return super.open();
776✔
79
    }
80

81
    /**
82
     * Check all partitions torn writes and truncate the storage to the position before the first torn write.
83
     * This might delete correctly written events in partitions, if their sequence number is higher than the
84
     * torn write in another partition.
85
     * Also detects when the primary index is lagging behind the actual partition data and emits a
86
     * 'primary-index-lagging' event in that case.
87
     */
88
    checkTornWrites() {
89
        let lastValidSequenceNumber = Number.MAX_SAFE_INTEGER;
24✔
90
        let maxPartitionSequenceNumber = -1;
24✔
91
        this.forEachPartition(partition => {
24✔
92
            partition.open();
28✔
93
            const result = partition.checkTornWrite();
28✔
94
            if (result < 0) {
28✔
95
                // Torn write: result encodes -(tornSeqnum + 1), so torn seqnum = -result - 1.
96
                const tornSeqnum = -result - 1;
16✔
97
                lastValidSequenceNumber = Math.min(lastValidSequenceNumber, tornSeqnum);
16✔
98
                // Any complete documents before the torn one contribute to the lagging check.
99
                // Their last seqnum is tornSeqnum - 1 (if > 0; otherwise no complete docs).
100
                if (tornSeqnum > 0) {
16✔
101
                    maxPartitionSequenceNumber = Math.max(maxPartitionSequenceNumber, tornSeqnum - 1);
12✔
102
                }
103
            } else if (result > 0) {
12!
104
                // No torn write: result encodes (lastCompleteSeqnum + 1), so seqnum = result - 1.
105
                maxPartitionSequenceNumber = Math.max(maxPartitionSequenceNumber, result - 1);
12✔
106
            }
107
            // result === 0: empty partition, no action needed.
108
        });
109
        if (lastValidSequenceNumber < Number.MAX_SAFE_INTEGER) {
24✔
110
            this.truncate(lastValidSequenceNumber);
16✔
111
            // After truncation, account for documents beyond the truncation point being removed.
112
            // truncate(N) keeps index entries 1..N, so the last kept partition seqnum is N-1.
113
            maxPartitionSequenceNumber = Math.min(maxPartitionSequenceNumber, lastValidSequenceNumber - 1);
16✔
114
        }
115
        // A partition seqnum of N means the document was written when the index had N entries,
116
        // so the index should contain at least N+1 entries to be consistent.
117
        if (maxPartitionSequenceNumber >= 0 && maxPartitionSequenceNumber + 1 > this.index.length) {
24✔
118
            this.emit('primary-index-lagging', maxPartitionSequenceNumber + 1, this.index.length);
8✔
119
        }
120
        this.forEachPartition(partition => partition.close());
28✔
121
    }
122

123
    /**
124
     * Attempt to lock this storage by means of a lock directory.
125
     * @returns {boolean} True if the lock was created or false if the lock is already in place.
126
     * @throws {StorageLockedError} If this storage is already locked by another process.
127
     * @throws {Error} If the lock could not be created.
128
     */
129
    lock() {
130
        if (this.locked) {
784✔
131
            return false;
4✔
132
        }
133
        try {
780✔
134
            fs.mkdirSync(this.lockFile);
780✔
135
            this.locked = true;
776✔
136
        } catch (e) {
137
            /* istanbul ignore if */
138
            if (e.code !== 'EEXIST') {
4✔
139
                throw new Error(`Error creating lock for storage ${this.storageFile}: ` + e.message);
140
            }
141
            throw new StorageLockedError(`Storage ${this.storageFile} is locked by another process`);
4✔
142
        }
143
        return true;
776✔
144
    }
145

146
    /**
147
     * Unlock this storage, no matter if it was previously locked by this writer.
148
     * Only use this if you are sure there is no other process still having a writer open.
149
     * Current implementation just deletes a lock file that is named like the storage.
150
     */
151
    unlock() {
152
        if (fs.existsSync(this.lockFile)) {
784✔
153
            if (!this.locked) {
776✔
154
                this.checkTornWrites();
8✔
155
            }
156
            fs.rmdirSync(this.lockFile);
776✔
157
        }
158
        this.locked = false;
784✔
159
    }
160

161
    /**
162
     * @inheritDoc
163
     */
164
    close() {
165
        if (this.locked) {
1,292✔
166
            this.unlock();
776✔
167
        }
168
        super.close();
1,292✔
169
    }
170

171
    /**
172
     * Add an index entry for the given document at the position and size.
173
     *
174
     * @private
175
     * @param {number} partitionId The partition where the document is stored.
176
     * @param {number} position The file offset where the document is stored.
177
     * @param {number} size The size of the stored document.
178
     * @param {object} document The document to add to the index.
179
     * @param {function} [callback] The callback to call when the index is written to disk.
180
     * @returns {EntryInterface} The index entry item.
181
     */
182
    addIndex(partitionId, position, size, document, callback) {
183
        if (!this.index.isOpen()) {
2,604✔
184
            this.index.open();
4✔
185
        }
186

187
        /*if (this.index.lastEntry.position + this.index.lastEntry.size !== position) {
188
         this.emit('index-corrupted');
189
         throw new Error('Corrupted index, needs to be rebuilt!');
190
         }*/
191

192
        const entry = new WritableIndex.Entry(this.index.length + 1, position, size, partitionId);
2,604✔
193
        this.index.add(entry, (indexPosition) => {
2,604✔
194
            this.emit('wrote', document, entry, indexPosition);
2,604✔
195
            /* istanbul ignore if  */
196
            if (typeof callback === 'function') {
2,604✔
197
                return callback(indexPosition);
198
            }
199
        });
200
        return entry;
2,604✔
201
    }
202

203
    /**
204
     * Register a handler that is called before a document is written to storage.
205
     * The handler receives the document and the partition metadata and may throw to abort the write.
206
     * Multiple handlers can be registered; all run on every write in registration order.
207
     * Equivalent to `storage.on('preCommit', hook)`.
208
     *
209
     * @api
210
     * @param {function(object, object): void} hook A function receiving (document, partitionMetadata).
211
     */
212
    preCommit(hook) {
213
        this.on('preCommit', hook);
16✔
214
    }
215

216
    /**
217
     * Get a partition either by name or by id.
218
     * If a partition with the given name does not exist, a new one will be created.
219
     * If a partition with the given id does not exist, an error is thrown.
220
     *
221
     * @protected
222
     * @param {string|number} partitionIdentifier The partition name or the partition Id
223
     * @returns {ReadablePartition}
224
     * @throws {Error} If an id is given and no such partition exists.
225
     */
226
    getPartition(partitionIdentifier) {
227
        if (typeof partitionIdentifier === 'string') {
6,620✔
228
            const partitionShortName = partitionIdentifier;
2,628✔
229
            const partitionName = this.storageFile + (partitionIdentifier.length ? '.' + partitionIdentifier : '');
2,628✔
230
            partitionIdentifier = WritablePartition.idFor(partitionName);
2,628✔
231
            if (!this.partitions[partitionIdentifier]) {
2,628✔
232
                const partitionConfig = typeof this.partitionConfig.metadata === 'function'
812✔
233
                    ? { ...this.partitionConfig, metadata: this.partitionConfig.metadata(partitionShortName) }
234
                    : this.partitionConfig;
235
                this.partitions[partitionIdentifier] = this.createPartition(partitionName, partitionConfig);
812✔
236
                this.emit('partition-created', partitionIdentifier);
812✔
237
            }
238
            this.partitions[partitionIdentifier].open();
2,628✔
239
            return this.partitions[partitionIdentifier];
2,628✔
240
        }
241
        return super.getPartition(partitionIdentifier);
3,992✔
242
    }
243

244
    /**
245
     * @api
246
     * @param {object} document The document to write to storage.
247
     * @param {function} [callback] A function that will be called when the document is written to disk.
248
     * @returns {number} The 1-based document sequence number in the storage.
249
     */
250
    write(document, callback) {
251
        const data = this.serializer.serialize(document).toString();
2,612✔
252
        const dataSize = Buffer.byteLength(data, 'utf8');
2,612✔
253

254
        const partitionName = this.partitioner(document, this.index.length + 1);
2,612✔
255
        const partition = this.getPartition(partitionName);
2,612✔
256
        if (this.listenerCount('preCommit') > 0) {
2,612✔
257
            this.emit('preCommit', document, partition.metadata);
56✔
258
        }
259
        const position = partition.write(data, this.length, callback);
2,604✔
260

261
        assert(position !== false, 'Error writing document.');
2,604✔
262

263
        const indexEntry = this.addIndex(partition.id, position, dataSize, document);
2,604✔
264
        this.forEachSecondaryIndex((index, name) => {
2,604✔
265
            if (!index.isOpen()) {
1,480✔
266
                index.open();
4✔
267
            }
268
            index.add(indexEntry);
1,480✔
269
            this.emit('index-add', name, index.length, document);
1,480✔
270
        }, document);
271

272
        return this.index.length;
2,604✔
273
    }
274

275
    /**
276
     * Ensure that an index with the given name and document matcher exists.
277
     * Will create the index if it doesn't exist, otherwise return the existing index.
278
     *
279
     * @api
280
     * @param {string} name The index name.
281
     * @param {Matcher} [matcher] An object that describes the document properties that need to match to add it this index or a function that receives a document and returns true if the document should be indexed.
282
     * @returns {ReadableIndex} The index containing all documents that match the query.
283
     * @throws {Error} if the index doesn't exist yet and no matcher was specified.
284
     */
285
    ensureIndex(name, matcher) {
286
        if (name === '_all') {
860✔
287
            return this.index;
4✔
288
        }
289
        if (name in this.secondaryIndexes) {
856✔
290
            return this.secondaryIndexes[name].index;
4✔
291
        }
292

293
        const indexName = this.storageFile + '.' + name + '.index';
852✔
294
        if (fs.existsSync(path.join(this.indexDirectory, indexName))) {
852✔
295
            return this.openIndex(name, matcher);
16✔
296
        }
297

298
        assert((typeof matcher === 'object' || typeof matcher === 'function') && matcher !== null, 'Need to specify a matcher.');
836✔
299

300
        const metadata = buildMetadataForMatcher(matcher, this.hmac);
832✔
301
        const { index } = this.createIndex(indexName, Object.assign({}, this.indexOptions, { metadata }));
832✔
302
        try {
832✔
303
            this.forEachDocument((document, indexEntry) => {
832✔
304
                if (matches(document, matcher)) {
1,752✔
305
                    index.add(indexEntry);
16✔
306
                }
307
            });
308
        } catch (e) {
309
            index.destroy();
4✔
310
            throw e;
4✔
311
        }
312

313
        this.secondaryIndexes[name] = { index, matcher };
828✔
314
        this.emit('index-created', name);
828✔
315
        return index;
828✔
316
    }
317

318
    /**
319
     * Flush all write buffers to disk.
320
     * This is a sync method and will invoke all previously registered flush callbacks.
321
     *
322
     * @api
323
     * @returns {boolean} Returns true if a flush on any partition or the main index was executed.
324
     */
325
    flush() {
326
        let result = this.index.flush();
76✔
327
        this.forEachPartition(partition => result = result | partition.flush());
76✔
328
        this.forEachSecondaryIndex(index => index.flush());
76✔
329
        return result;
76✔
330
    }
331

332
    /**
333
     * Iterate all distinct partitions in which the given iterable list of entries are stored.
334
     * @param {Iterable<Index.Entry>} entries
335
     * @param {function(Index.Entry)} iterationHandler
336
     */
337
    forEachDistinctPartitionOf(entries, iterationHandler) {
338
        const partitions = [];
40✔
339
        const numPartitions = Object.keys(this.partitions).length;
40✔
340
        for (let entry of entries) {
40✔
341
            if (partitions.indexOf(entry.partition) >= 0) {
60✔
342
                continue;
16✔
343
            }
344
            partitions.push(entry.partition);
44✔
345
            iterationHandler(entry);
44✔
346
            if (partitions.length === numPartitions) {
44✔
347
                break;
24✔
348
            }
349
        }
350
    }
351

352
    /**
353
     * Truncate all partitions after the given (global) sequence number.
354
     *
355
     * @private
356
     * @param {number} after The document sequence number to truncate after.
357
     */
358
    truncatePartitions(after) {
359
        if (after === 0) {
60✔
360
            this.forEachPartition(partition => partition.truncate(0));
12✔
361
            return;
12✔
362
        }
363

364
        const entries = this.index.range(after + 1);  // We need the first entry that is cut off
48✔
365
        if (entries === false || entries.length === 0) {
48✔
366
            return;
8✔
367
        }
368

369
        this.forEachDistinctPartitionOf(entries, entry => this.getPartition(entry.partition).truncate(entry.position));
44✔
370
    }
371

372
    /**
373
     * Truncate the storage after the given sequence number.
374
     *
375
     * @param {number} after The document sequence number to truncate after.
376
     */
377
    truncate(after) {
378
        /*
379
         To truncate the store following steps need to be done:
380

381
         1) find all partition positions after which their files should be truncated
382
         2) truncate all partitions accordingly
383
         3) truncate/rewrite all indexes
384
         */
385
        if (!this.index.isOpen()) {
60✔
386
            this.index.open();
4✔
387
        }
388
        if (after < 0) {
60!
UNCOV
389
            after += this.index.length;
×
390
        }
391

392
        this.truncatePartitions(after);
60✔
393

394
        this.index.truncate(after);
60✔
395
        this.forEachSecondaryIndex(index => {
60✔
396
            /* istanbul ignore if */
397
            if (!(index instanceof WritableIndex)) {
32✔
398
                return;
399
            }
400
            let closeIndex = false;
32✔
401
            if (!index.isOpen()) {
32✔
402
                index.open();
4✔
403
                closeIndex = true;
4✔
404
            }
405
            index.truncate(index.find(after));
32✔
406
            if (closeIndex) {
32✔
407
                index.close();
4✔
408
            }
409
        });
410
    }
411

412
    /**
413
     * @inheritDoc
414
     * Open an existing secondary index and repair any stale entries beyond the current primary
415
     * index length. Stale entries can be present when checkTornWrites() truncated the primary
416
     * index before this secondary index was loaded into memory.
417
     */
418
    openIndex(name, matcher) {
419
        const index = super.openIndex(name, matcher);
705✔
420
        const lastEntry = index.lastEntry;
689✔
421
        if (lastEntry !== false && lastEntry.number > this.index.length) {
689✔
422
            index.truncate(index.find(this.index.length));
8✔
423
        }
424
        return index;
689✔
425
    }
426

427
    /**
428
     * @protected
429
     * @param {string} name
430
     * @param {object} [options]
431
     * @returns {{ index: WritableIndex, matcher: Matcher }}
432
     */
433
    createIndex(name, options = {}) {
×
434
        const index = new WritableIndex(name, options);
1,784✔
435
        let matcher;
436

437
        // If the index contains a matcher (possibly a serialized function) we check HMAC
438
        // to prevent evaluating unknown code.
439
        if (index.metadata.matcher) {
1,776✔
440
            try {
928✔
441
                matcher = buildMatcherFromMetadata(index.metadata, this.hmac);
928✔
442
            } catch (e) {
443
                index.destroy();
4✔
444
                throw e;
4✔
445
            }
446
        }
447

448
        return { index, matcher };
1,772✔
449
    }
450

451
    /**
452
     * @protected
453
     * @param {string} name
454
     * @param {object} [config]
455
     * @returns {WritablePartition}
456
     */
457
    createPartition(name, config = {}) {
×
458
        return new WritablePartition(name, config);
904✔
459
    }
460

461
}
462

463
// The class itself is the module export; the error types and lock constants are attached to it.
module.exports = WritableStorage;
Object.assign(module.exports, {
    StorageLockedError,
    CorruptFileError: ReadableStorage.CorruptFileError,
    LOCK_THROW,
    LOCK_RECLAIM
});
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc