• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 23383240597

21 Mar 2026 03:55PM UTC coverage: 97.927% (+0.1%) from 97.826%
23383240597

Pull #107

github

web-flow
Merge 2c94d3694 into eaa8ee102
Pull Request #107: Start implementing auto-repair

639 of 672 branches covered (95.09%)

Branch coverage included in aggregate %.

32 of 33 new or added lines in 5 files covered. (96.97%)

2 existing lines in 2 files now uncovered.

1487 of 1499 relevant lines covered (99.2%)

1295.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.36
/src/Storage/WritableStorage.js
1
const fs = require('fs');
4✔
2
const path = require('path');
4✔
3
const WritablePartition = require('../Partition/WritablePartition');
4✔
4
const WritableIndex = require('../Index/WritableIndex');
4✔
5
const ReadableStorage = require('./ReadableStorage');
4✔
6
const { assert, matches, buildMetadataForMatcher, buildMatcherFromMetadata, ensureDirectory } = require('../util');
4✔
7

8
const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024;
4✔
9

10
const LOCK_RECLAIM = 0x1;
4✔
11
const LOCK_THROW = 0x2;
4✔
12

13
class StorageLockedError extends Error {}
14

15
/**
16
 * @typedef {object|function(object):boolean} Matcher
17
 */
18

19
/**
20
 * An append-only storage with highly performant positional range scans.
21
 * It's highly optimized for an event-store and hence does not support compaction or data-rewrite, nor any querying
22
 */
23
class WritableStorage extends ReadableStorage {
24

25
    /**
26
     * @param {string} [storageName] The name of the storage.
27
     * @param {object} [config] An object with storage parameters.
28
     * @param {object} [config.serializer] A serializer object with methods serialize(document) and deserialize(data).
29
     * @param {function(object): string} config.serializer.serialize Default is JSON.stringify.
30
     * @param {function(string): object} config.serializer.deserialize Default is JSON.parse.
31
     * @param {string} [config.dataDirectory] The path where the storage data should reside. Default '.'.
32
     * @param {string} [config.indexDirectory] The path where the indexes should be stored. Defaults to dataDirectory.
33
     * @param {string} [config.indexFile] The name of the primary index. Default '{storageName}.index'.
34
     * @param {number} [config.readBufferSize] Size of the read buffer in bytes. Default 4096.
35
     * @param {number} [config.writeBufferSize] Size of the write buffer in bytes. Default 16384.
36
     * @param {number} [config.maxWriteBufferDocuments] How many documents to have in the write buffer at max. 0 means as much as possible. Default 0.
37
     * @param {boolean} [config.syncOnFlush] If fsync should be called on write buffer flush. Set this if you need strict durability. Defaults to false.
38
     * @param {boolean} [config.dirtyReads] If dirty reads should be allowed. This means that writes that are in write buffer but not yet flushed can be read. Defaults to true.
39
     * @param {function(object, number): string} [config.partitioner] A function that takes a document and sequence number and returns a partition name that the document should be stored in. Defaults to write all documents to the primary partition.
40
     * @param {object} [config.indexOptions] An options object that should be passed to all indexes on construction.
41
     * @param {string} [config.hmacSecret] A private key that is used to verify matchers retrieved from indexes.
42
     * @param {number} [config.lock] One of LOCK_* constants that defines how an existing lock should be handled.
43
     */
44
    constructor(storageName = 'storage', config = {}) {
448!
45
        if (typeof storageName !== 'string') {
820✔
46
            config = storageName;
448✔
47
            storageName = undefined;
448✔
48
        }
49
        const defaults = {
820✔
50
            partitioner: (document, number) => '',
1,500✔
51
            writeBufferSize: DEFAULT_WRITE_BUFFER_SIZE,
52
            maxWriteBufferDocuments: 0,
53
            syncOnFlush: false,
54
            dirtyReads: true,
55
            dataDirectory: '.'
56
        };
57
        config = Object.assign(defaults, config);
820✔
58
        config.indexOptions = Object.assign({ syncOnFlush: config.syncOnFlush }, config.indexOptions);
820✔
59
        ensureDirectory(config.dataDirectory);
820✔
60
        super(storageName, config);
820✔
61

62
        this.lockFile = path.resolve(this.dataDirectory, this.storageFile + '.lock');
820✔
63
        if (config.lock === LOCK_RECLAIM) {
820✔
64
            this.unlock();
8✔
65
        }
66
        this.partitioner = config.partitioner;
820✔
67
    }
68

69
    /**
70
     * @inheritDoc
71
     * @returns {boolean}
72
     * @throws {StorageLockedError} If this storage is locked by another process.
73
     */
74
    open() {
75
        if (!this.lock()) {
756✔
76
            return true;
4✔
77
        }
78
        return super.open();
748✔
79
    }
80

81
    /**
82
     * Check all partitions torn writes and truncate the storage to the position before the first torn write.
83
     * This might delete correctly written events in partitions, if their sequence number is higher than the
84
     * torn write in another partition.
85
     */
86
    checkTornWrites() {
87
        let lastValidSequenceNumber = Number.MAX_SAFE_INTEGER;
8✔
88
        this.forEachPartition(partition => {
8✔
89
            partition.open();
12✔
90
            const tornSequenceNumber = partition.checkTornWrite();
12✔
91
            if (tornSequenceNumber >= 0) {
12✔
92
                lastValidSequenceNumber = Math.min(lastValidSequenceNumber, tornSequenceNumber);
8✔
93
            }
94
        });
95
        if (lastValidSequenceNumber < Number.MAX_SAFE_INTEGER) {
8!
96
            this.truncate(lastValidSequenceNumber);
8✔
97
        }
98
        this.forEachPartition(partition => partition.close());
12✔
99
    }
100

101
    /**
102
     * Attempt to lock this storage by means of a lock directory.
103
     * @returns {boolean} True if the lock was created or false if the lock is already in place.
104
     * @throws {StorageLockedError} If this storage is already locked by another process.
105
     * @throws {Error} If the lock could not be created.
106
     */
107
    lock() {
108
        if (this.locked) {
756✔
109
            return false;
4✔
110
        }
111
        try {
752✔
112
            fs.mkdirSync(this.lockFile);
752✔
113
            this.locked = true;
748✔
114
        } catch (e) {
115
            /* istanbul ignore if */
116
            if (e.code !== 'EEXIST') {
4✔
117
                throw new Error(`Error creating lock for storage ${this.storageFile}: ` + e.message);
118
            }
119
            throw new StorageLockedError(`Storage ${this.storageFile} is locked by another process`);
4✔
120
        }
121
        return true;
748✔
122
    }
123

124
    /**
125
     * Unlock this storage, no matter if it was previously locked by this writer.
126
     * Only use this if you are sure there is no other process still having a writer open.
127
     * Current implementation just deletes a lock file that is named like the storage.
128
     */
129
    unlock() {
130
        if (fs.existsSync(this.lockFile)) {
756✔
131
            if (!this.locked) {
748✔
132
                this.checkTornWrites();
8✔
133
            }
134
            fs.rmdirSync(this.lockFile);
748✔
135
        }
136
        this.locked = false;
756✔
137
    }
138

139
    /**
140
     * @inheritDoc
141
     */
142
    close() {
143
        if (this.locked) {
1,236✔
144
            this.unlock();
748✔
145
        }
146
        super.close();
1,236✔
147
    }
148

149
    /**
150
     * Add an index entry for the given document at the position and size.
151
     *
152
     * @private
153
     * @param {number} partitionId The partition where the document is stored.
154
     * @param {number} position The file offset where the document is stored.
155
     * @param {number} size The size of the stored document.
156
     * @param {object} document The document to add to the index.
157
     * @param {function} [callback] The callback to call when the index is written to disk.
158
     * @returns {EntryInterface} The index entry item.
159
     */
160
    addIndex(partitionId, position, size, document, callback) {
161
        if (!this.index.isOpen()) {
2,524✔
162
            this.index.open();
4✔
163
        }
164

165
        /*if (this.index.lastEntry.position + this.index.lastEntry.size !== position) {
166
         this.emit('index-corrupted');
167
         throw new Error('Corrupted index, needs to be rebuilt!');
168
         }*/
169

170
        const entry = new WritableIndex.Entry(this.index.length + 1, position, size, partitionId);
2,524✔
171
        this.index.add(entry, (indexPosition) => {
2,524✔
172
            this.emit('wrote', document, entry, indexPosition);
2,524✔
173
            /* istanbul ignore if  */
174
            if (typeof callback === 'function') {
2,524✔
175
                return callback(indexPosition);
176
            }
177
        });
178
        return entry;
2,524✔
179
    }
180

181
    /**
182
     * Register a handler that is called before a document is written to storage.
183
     * The handler receives the document and the partition metadata and may throw to abort the write.
184
     * Multiple handlers can be registered; all run on every write in registration order.
185
     * Equivalent to `storage.on('preCommit', hook)`.
186
     *
187
     * @api
188
     * @param {function(object, object): void} hook A function receiving (document, partitionMetadata).
189
     */
190
    preCommit(hook) {
191
        this.on('preCommit', hook);
16✔
192
    }
193

194
    /**
195
     * Get a partition either by name or by id.
196
     * If a partition with the given name does not exist, a new one will be created.
197
     * If a partition with the given id does not exist, an error is thrown.
198
     *
199
     * @protected
200
     * @param {string|number} partitionIdentifier The partition name or the partition Id
201
     * @returns {ReadablePartition}
202
     * @throws {Error} If an id is given and no such partition exists.
203
     */
204
    getPartition(partitionIdentifier) {
205
        if (typeof partitionIdentifier === 'string') {
6,536✔
206
            const partitionShortName = partitionIdentifier;
2,548✔
207
            const partitionName = this.storageFile + (partitionIdentifier.length ? '.' + partitionIdentifier : '');
2,548✔
208
            partitionIdentifier = WritablePartition.idFor(partitionName);
2,548✔
209
            if (!this.partitions[partitionIdentifier]) {
2,548✔
210
                const partitionConfig = typeof this.partitionConfig.metadata === 'function'
796✔
211
                    ? { ...this.partitionConfig, metadata: this.partitionConfig.metadata(partitionShortName) }
212
                    : this.partitionConfig;
213
                this.partitions[partitionIdentifier] = this.createPartition(partitionName, partitionConfig);
796✔
214
                this.emit('partition-created', partitionIdentifier);
796✔
215
            }
216
            this.partitions[partitionIdentifier].open();
2,548✔
217
            return this.partitions[partitionIdentifier];
2,548✔
218
        }
219
        return super.getPartition(partitionIdentifier);
3,988✔
220
    }
221

222
    /**
223
     * @api
224
     * @param {object} document The document to write to storage.
225
     * @param {function} [callback] A function that will be called when the document is written to disk.
226
     * @returns {number} The 1-based document sequence number in the storage.
227
     */
228
    write(document, callback) {
229
        const data = this.serializer.serialize(document).toString();
2,532✔
230
        const dataSize = Buffer.byteLength(data, 'utf8');
2,532✔
231

232
        const partitionName = this.partitioner(document, this.index.length + 1);
2,532✔
233
        const partition = this.getPartition(partitionName);
2,532✔
234
        this.emit('preCommit', document, partition.metadata);
2,532✔
235
        const position = partition.write(data, this.length, callback);
2,524✔
236

237
        assert(position !== false, 'Error writing document.');
2,524✔
238

239
        const indexEntry = this.addIndex(partition.id, position, dataSize, document);
2,524✔
240
        this.forEachSecondaryIndex((index, name) => {
2,524✔
241
            if (!index.isOpen()) {
1,480✔
242
                index.open();
4✔
243
            }
244
            index.add(indexEntry);
1,480✔
245
            this.emit('index-add', name, index.length, document);
1,480✔
246
        }, document);
247

248
        return this.index.length;
2,524✔
249
    }
250

251
    /**
252
     * Ensure that an index with the given name and document matcher exists.
253
     * Will create the index if it doesn't exist, otherwise return the existing index.
254
     *
255
     * @api
256
     * @param {string} name The index name.
257
     * @param {Matcher} [matcher] An object that describes the document properties that need to match to add it this index or a function that receives a document and returns true if the document should be indexed.
258
     * @returns {ReadableIndex} The index containing all documents that match the query.
259
     * @throws {Error} if the index doesn't exist yet and no matcher was specified.
260
     */
261
    ensureIndex(name, matcher) {
262
        if (name === '_all') {
860✔
263
            return this.index;
4✔
264
        }
265
        if (name in this.secondaryIndexes) {
856✔
266
            return this.secondaryIndexes[name].index;
4✔
267
        }
268

269
        const indexName = this.storageFile + '.' + name + '.index';
852✔
270
        if (fs.existsSync(path.join(this.indexDirectory, indexName))) {
852✔
271
            return this.openIndex(name, matcher);
16✔
272
        }
273

274
        assert((typeof matcher === 'object' || typeof matcher === 'function') && matcher !== null, 'Need to specify a matcher.');
836✔
275

276
        const metadata = buildMetadataForMatcher(matcher, this.hmac);
832✔
277
        const { index } = this.createIndex(indexName, Object.assign({}, this.indexOptions, { metadata }));
832✔
278
        try {
832✔
279
            this.forEachDocument((document, indexEntry) => {
832✔
280
                if (matches(document, matcher)) {
1,752✔
281
                    index.add(indexEntry);
16✔
282
                }
283
            });
284
        } catch (e) {
285
            index.destroy();
4✔
286
            throw e;
4✔
287
        }
288

289
        this.secondaryIndexes[name] = { index, matcher };
828✔
290
        this.emit('index-created', name);
828✔
291
        return index;
828✔
292
    }
293

294
    /**
295
     * Flush all write buffers to disk.
296
     * This is a sync method and will invoke all previously registered flush callbacks.
297
     *
298
     * @api
299
     * @returns {boolean} Returns true if a flush on any partition or the main index was executed.
300
     */
301
    flush() {
302
        let result = this.index.flush();
60✔
303
        this.forEachPartition(partition => result = result | partition.flush());
60✔
304
        this.forEachSecondaryIndex(index => index.flush());
60✔
305
        return result;
60✔
306
    }
307

308
    /**
309
     * Iterate all distinct partitions in which the given iterable list of entries are stored.
310
     * @param {Iterable<Index.Entry>} entries
311
     * @param {function(Index.Entry)} iterationHandler
312
     */
313
    forEachDistinctPartitionOf(entries, iterationHandler) {
314
        const partitions = [];
36✔
315
        const numPartitions = Object.keys(this.partitions).length;
36✔
316
        for (let entry of entries) {
36✔
317
            if (partitions.indexOf(entry.partition) >= 0) {
56✔
318
                continue;
16✔
319
            }
320
            partitions.push(entry.partition);
40✔
321
            iterationHandler(entry);
40✔
322
            if (partitions.length === numPartitions) {
40✔
323
                break;
20✔
324
            }
325
        }
326
    }
327

328
    /**
329
     * Truncate all partitions after the given (global) sequence number.
330
     *
331
     * @private
332
     * @param {number} after The document sequence number to truncate after.
333
     */
334
    truncatePartitions(after) {
335
        if (after === 0) {
52✔
336
            this.forEachPartition(partition => partition.truncate(0));
12✔
337
            return;
12✔
338
        }
339

340
        const entries = this.index.range(after + 1);  // We need the first entry that is cut off
40✔
341
        if (entries === false || entries.length === 0) {
40✔
342
            return;
4✔
343
        }
344

345
        this.forEachDistinctPartitionOf(entries, entry => this.getPartition(entry.partition).truncate(entry.position));
40✔
346
    }
347

348
    /**
349
     * Truncate the storage after the given sequence number.
350
     *
351
     * @param {number} after The document sequence number to truncate after.
352
     */
353
    truncate(after) {
354
        /*
355
         To truncate the store following steps need to be done:
356

357
         1) find all partition positions after which their files should be truncated
358
         2) truncate all partitions accordingly
359
         3) truncate/rewrite all indexes
360
         */
361
        if (!this.index.isOpen()) {
52✔
362
            this.index.open();
4✔
363
        }
364
        if (after < 0) {
52!
UNCOV
365
            after += this.index.length;
×
366
        }
367

368
        this.truncatePartitions(after);
52✔
369

370
        this.index.truncate(after);
52✔
371
        this.forEachSecondaryIndex(index => {
52✔
372
            /* istanbul ignore if */
373
            if (!(index instanceof WritableIndex)) {
32✔
374
                return;
375
            }
376
            let closeIndex = false;
32✔
377
            if (!index.isOpen()) {
32✔
378
                index.open();
4✔
379
                closeIndex = true;
4✔
380
            }
381
            index.truncate(index.find(after));
32✔
382
            if (closeIndex) {
32✔
383
                index.close();
4✔
384
            }
385
        });
386
    }
387

388
    /**
389
     * @inheritDoc
390
     * Open an existing secondary index and repair any stale entries beyond the current primary
391
     * index length. Stale entries can be present when checkTornWrites() truncated the primary
392
     * index before this secondary index was loaded into memory.
393
     */
394
    openIndex(name, matcher) {
395
        const index = super.openIndex(name, matcher);
705✔
396
        const lastEntry = index.lastEntry;
689✔
397
        if (lastEntry !== false && lastEntry.number > this.index.length) {
689✔
398
            index.truncate(index.find(this.index.length));
8✔
399
        }
400
        return index;
689✔
401
    }
402

403
    /**
404
     * @protected
405
     * @param {string} name
406
     * @param {object} [options]
407
     * @returns {{ index: WritableIndex, matcher: Matcher }}
408
     */
409
    createIndex(name, options = {}) {
×
410
        const index = new WritableIndex(name, options);
1,756✔
411
        let matcher;
412

413
        // If the index contains a matcher (possibly a serialized function) we check HMAC
414
        // to prevent evaluating unknown code.
415
        if (index.metadata.matcher) {
1,748✔
416
            try {
928✔
417
                matcher = buildMatcherFromMetadata(index.metadata, this.hmac);
928✔
418
            } catch (e) {
419
                index.destroy();
4✔
420
                throw e;
4✔
421
            }
422
        }
423

424
        return { index, matcher };
1,744✔
425
    }
426

427
    /**
428
     * @protected
429
     * @param {string} name
430
     * @param {object} [config]
431
     * @returns {WritablePartition}
432
     */
433
    createPartition(name, config = {}) {
×
434
        return new WritablePartition(name, config);
876✔
435
    }
436

437
}
438

439
module.exports = WritableStorage;
4✔
440
module.exports.StorageLockedError = StorageLockedError;
4✔
441
module.exports.CorruptFileError = ReadableStorage.CorruptFileError;
4✔
442
module.exports.LOCK_THROW = LOCK_THROW;
4✔
443
module.exports.LOCK_RECLAIM = LOCK_RECLAIM;
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc