• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 26133545023

20 May 2026 12:24AM UTC coverage: 97.956% (-0.2%) from 98.106%
26133545023

Pull #316

github

web-flow
Merge 515a2b27f into 03ae7bb7f
Pull Request #316: Add `event-storage-http` REST API layer with NDJSON reads, consumer endpoints, and raw buffer streaming

1078 of 1131 branches covered (95.31%)

Branch coverage included in aggregate %.

258 of 270 new or added lines in 10 files covered. (95.56%)

22 existing lines in 5 files now uncovered.

5536 of 5621 relevant lines covered (98.49%)

823.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.65
/src/Storage/WritableStorage.js
1
import fs from 'fs';
4✔
2
import path from 'path';
4✔
3
import WritablePartition from '../Partition/WritablePartition.js';
4✔
4
import WritableIndex, { Entry as WritableIndexEntry } from '../Index/WritableIndex.js';
4✔
5
import ReadableStorage from './ReadableStorage.js';
4✔
6
import { assert } from '../util.js';
4✔
7
import { ensureDirectory } from '../fsUtil.js';
4✔
8
import { matches, buildMetadataForMatcher, buildMatcherFromMetadata } from '../metadataUtil.js';
4✔
9

4✔
10
// Default size of the write buffer in bytes (16 KiB); used as the `writeBufferSize` default.
const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024;
4✔
11

4✔
12
// Lock handling mode for `config.lock`: open() removes an existing lock before acquiring its own.
const LOCK_RECLAIM = 0x1;
4✔
13
// Lock handling mode for `config.lock`.
// NOTE(review): never compared against in this file; presumably it selects the
// behavior of throwing when a lock already exists — confirm.
const LOCK_THROW = 0x2;
4✔
14

4✔
15
/**
 * Error thrown when the storage lock cannot be acquired because it already exists.
 * Sets `name` so the error is identifiable in logs and stack traces.
 */
class StorageLockedError extends Error {
    /**
     * @param {string} [message] Human readable description of the lock conflict.
     * @param {object} [options] Standard Error options, e.g. `{ cause }`.
     */
    constructor(message, options) {
        super(message, options);
        this.name = 'StorageLockedError';
    }
}
4✔
16

4✔
17
/**
4✔
18
 * @typedef {object|function(object):boolean} Matcher
4✔
19
 */
4✔
20

4✔
21
/**
4✔
22
 * An append-only storage with highly performant positional range scans.
4✔
23
 * It is highly optimized for use as an event store and hence supports neither compaction or data rewrites nor any querying.
4✔
24
 */
4✔
25
class WritableStorage extends ReadableStorage {
4✔
26

4✔
27
    /**
4✔
28
     * @param {string} [storageName] The name of the storage.
4✔
29
     * @param {object} [config] An object with storage parameters.
4✔
30
     * @param {object} [config.serializer] A serializer object with methods serialize(document) and deserialize(data).
4✔
31
     * @param {function(object): string} config.serializer.serialize Default is JSON.stringify.
4✔
32
     * @param {function(string): object} config.serializer.deserialize Default is JSON.parse.
4✔
33
     * @param {string} [config.dataDirectory] The path where the storage data should reside. Default '.'.
4✔
34
     * @param {string} [config.indexDirectory] The path where the indexes should be stored. Defaults to dataDirectory.
4✔
35
     * @param {string} [config.indexFile] The name of the primary index. Default '{storageName}.index'.
4✔
36
     * @param {number} [config.readBufferSize] Size of the read buffer in bytes. Default 4096.
4✔
37
     * @param {number} [config.writeBufferSize] Size of the write buffer in bytes. Default 16384.
4✔
38
     * @param {number} [config.maxWriteBufferDocuments] How many documents to have in the write buffer at max. 0 means as much as possible. Default 0.
4✔
39
     * @param {boolean} [config.syncOnFlush] If fsync should be called on write buffer flush. Set this if you need strict durability. Defaults to false.
4✔
40
     * @param {boolean} [config.dirtyReads] If dirty reads should be allowed. This means that writes that are in write buffer but not yet flushed can be read. Defaults to true.
4✔
41
     * @param {function(object, number): string} [config.partitioner] A function that takes a document and sequence number and returns a partition name that the document should be stored in. Defaults to write all documents to the primary partition.
4✔
42
     * @param {object} [config.indexOptions] An options object that should be passed to all indexes on construction.
4✔
43
     * @param {string} [config.hmacSecret] A private key that is used to verify matchers retrieved from indexes.
4✔
44
     * @param {number} [config.lock] One of LOCK_* constants that defines how an existing lock should be handled.
4✔
45
     */
4✔
46
    constructor(storageName = 'storage', config = {}) {
4✔
47
        if (typeof storageName !== 'string') {
1,156✔
48
            config = storageName;
584✔
49
            storageName = undefined;
584✔
50
        }
584✔
51
        const defaults = {
1,156✔
52
            partitioner: (document, number) => '',
1,156✔
53
            writeBufferSize: DEFAULT_WRITE_BUFFER_SIZE,
1,156✔
54
            maxWriteBufferDocuments: 0,
1,156✔
55
            syncOnFlush: false,
1,156✔
56
            dirtyReads: true,
1,156✔
57
            dataDirectory: '.'
1,156✔
58
        };
1,156✔
59
        config = Object.assign(defaults, config);
1,156✔
60
        config.indexOptions = Object.assign({ syncOnFlush: config.syncOnFlush }, config.indexOptions);
1,156✔
61
        ensureDirectory(config.dataDirectory);
1,156✔
62
        super(storageName, config);
1,156✔
63

1,156✔
64
        this.lockFile = path.resolve(this.dataDirectory, this.storageFile + '.lock');
1,156✔
65
        this._lockMode = config.lock;
1,156✔
66
        this.partitioner = config.partitioner;
1,156✔
67
    }
1,156✔
68

4✔
69
    /**
4✔
70
     * @inheritDoc
4✔
71
     * Acquires the write lock synchronously.
4✔
72
     * For LOCK_RECLAIM, removes any orphaned lock before trying to acquire our own; torn-write
4✔
73
     * repair runs after the primary index is open, before `'opened'` is emitted.
4✔
74
     *
4✔
75
     * @returns {boolean}
4✔
76
     * @throws {StorageLockedError} If this storage is locked by another process.
4✔
77
     */
4✔
78
    open(callback) {
4✔
79
        const needsRepair = this._lockMode === LOCK_RECLAIM && this.unlock();
1,112✔
80

1,112✔
81
        if (!this.lock()) {
1,112✔
82
            return true;
4✔
83
        }
4✔
84

1,104✔
85
        const onOpen = needsRepair
1,104✔
86
            ? () => { this.checkTornWrites(); callback?.(); }
1,112!
87
            : callback;
1,112✔
88
        return super.open(onOpen);
1,112✔
89
    }
1,112✔
90

4✔
91
    /**
4✔
92
     * Helper method to iterate over all writable secondary indexes.
4✔
93
     * Opens each index before calling the callback (passing the previous open status),
4✔
94
     * and closes it afterwards if it was not already open.
4✔
95
     *
4✔
96
     * @protected
4✔
97
     * @param {function(WritableIndex, string, boolean)} iterationHandler Called with (index, name, wasOpen).
4✔
98
     * @param {object} [matchDocument] If supplied, only indexes the document matches on will be iterated.
4✔
99
     */
4✔
100
    forEachWritableSecondaryIndex(iterationHandler, matchDocument) {
4✔
101
        this.forEachSecondaryIndex((index, name) => {
224✔
102
            /* istanbul ignore if */
84✔
103
            if (!(index instanceof WritableIndex)) return;
84!
104
            const wasOpen = index.isOpen();
84✔
105
            if (!wasOpen) index.open();
84✔
106
            iterationHandler(index, name, wasOpen);
84✔
107
            if (!wasOpen) index.close();
84✔
108
        }, matchDocument);
224✔
109
    }
224✔
110

4✔
111
    /**
4✔
112
     * Scan every partition's last document to detect torn writes inline.
4✔
113
     * A document is torn when its expected end exceeds the actual file size:
4✔
114
     *   position + documentWriteSize(dataSize) > partition.size
4✔
115
     *
4✔
116
     * @private
4✔
117
     * @returns {{ lastValidSequenceNumber: number, maxPartitionSequenceNumber: number }}
4✔
118
     */
4✔
119
    findTornWriteBoundary() {
4✔
120
        let lastValidSequenceNumber = Number.MAX_SAFE_INTEGER;
28✔
121
        let maxPartitionSequenceNumber = -1;
28✔
122
        this.forEachPartition(partition => {
28✔
123
            partition.open();
32✔
124
            const last = partition.readLast();
32✔
125
            /* istanbul ignore if */
32✔
126
            if (!last) return;
32!
127
            const { header: { sequenceNumber, dataSize }, position } = last;
32✔
128
            if (position + partition.documentWriteSize(dataSize) > partition.size) {
32✔
129
                // Torn write: the document extends beyond the end of the file.
16✔
130
                lastValidSequenceNumber = Math.min(lastValidSequenceNumber, sequenceNumber);
16✔
131
            } else {
16✔
132
                maxPartitionSequenceNumber = Math.max(maxPartitionSequenceNumber, sequenceNumber);
16✔
133
            }
16✔
134
        });
28✔
135
        return { lastValidSequenceNumber, maxPartitionSequenceNumber };
28✔
136
    }
28✔
137

4✔
138
    /**
4✔
139
     * Check all partitions for torn writes, physically repair each partition, truncate all indexes
4✔
140
     * to the torn-write boundary, and then reindex to rebuild any missing index entries.
4✔
141
     *
4✔
142
     * A document is torn when the partition file ends before the document's expected end position
4✔
143
     * (i.e. position + documentWriteSize(dataSize) > partition.size).  Detected inline in
4✔
144
     * findTornWriteBoundary(), without any checkTornWrite() call.
4✔
145
     *
4✔
146
     * Repair flow:
4✔
147
     * 1. findTornWriteBoundary() reads the last document of every partition and finds the global
4✔
148
     *    torn-write boundary (minimum torn sequence number across all partitions).
4✔
149
     * 2. If torn writes were found, truncateAfterSequence() removes all documents at or beyond
4✔
150
     *    the boundary from each partition.
4✔
151
     * 3. Truncate all indexes to the torn-write boundary, then reindex to fill any lagging entries.
4✔
152
     * 4. If no torn writes were found but the index is lagging, reindex directly.
4✔
153
     */
4✔
154
    checkTornWrites() {
4✔
155
        const { lastValidSequenceNumber, maxPartitionSequenceNumber } = this.findTornWriteBoundary();
28✔
156

28✔
157
        if (lastValidSequenceNumber < Number.MAX_SAFE_INTEGER) {
28✔
158
            // Phase 2: remove all documents at or beyond the torn-write boundary from each partition.
16✔
159
            this.forEachPartition(partition => {
16✔
160
                partition.open();
20✔
161
                partition.truncateAfterSequence(lastValidSequenceNumber - 1);
20✔
162
            });
16✔
163

16✔
164
            // Truncate all indexes to the torn-write boundary.
16✔
165
            this.index.open();
16✔
166
            this.index.truncate(lastValidSequenceNumber);
16✔
167
            /* istanbul ignore next */
16✔
168
            this.forEachWritableSecondaryIndex(index => {
16✔
169
                index.truncate(index.find(lastValidSequenceNumber));
12✔
170
            });
16✔
171

16✔
172
            // Reindex to fill in any missing complete-document entries.
16✔
173
            this.reindex(this.index.length);
16✔
174
        } else if (maxPartitionSequenceNumber >= 0 && maxPartitionSequenceNumber + 1 > this.index.length) {
28✔
175
            // No torn writes, but the index is lagging — repair it.
8✔
176
            this.reindex(this.index.length);
8✔
177
        }
8✔
178

28✔
179
        this.forEachPartition(partition => partition.close());
28✔
180
        // Partitions were closed directly (bypassing the pool), so reset the open-handle tracking.
28✔
181
        this.partitions.clearOpenHandles();
28✔
182
    }
28✔
183

4✔
184
    /**
4✔
185
     * Rebuild the primary index and all loaded secondary indexes starting from the given sequence
4✔
186
     * number by scanning the partition data directly.
4✔
187
     * This is the building block for both auto-repair (invoked automatically when the primary
4✔
188
     * index is found to be lagging in checkTornWrites()) and for user-driven re-indexing after
4✔
189
     * index corruption.
4✔
190
     *
4✔
191
     * @api
4✔
192
     * @param {number} [fromSequenceNumber=0] The number of primary index entries to keep intact.
4✔
193
     *   All index entries beyond this position will be removed and rebuilt from partition data.
4✔
194
     *   Defaults to 0, which rebuilds all indexes from scratch.
4✔
195
     */
4✔
196
    reindex(fromSequenceNumber = 0) {
4✔
197
        this.index.truncate(fromSequenceNumber);
48✔
198

48✔
199
        // Truncate all loaded secondary indexes to match the new primary length.
48✔
200
        this.forEachWritableSecondaryIndex(index => {
48✔
201
            // find(0) returns 0, so truncate(0) will remove all entries when fromSequenceNumber===0
20✔
202
            index.truncate(fromSequenceNumber === 0 ? 0 : index.find(fromSequenceNumber));
20✔
203
        });
48✔
204

48✔
205
        // Scan partitions in sequence-number order and rebuild index entries.
48✔
206
        // iterateDocumentsNoIndex opens any closed partitions automatically.
48✔
207
        for (const { document, partition, position, size } of this.iterateDocumentsNoIndex(fromSequenceNumber)) {
48✔
208
            const newEntry = new WritableIndexEntry(this.index.length + 1, position, size, partition);
112✔
209
            this.index.add(newEntry);
112✔
210

112✔
211
            this.forEachWritableSecondaryIndex((secIndex) => {
112✔
212
                secIndex.add(newEntry);
20✔
213
            }, document);
112✔
214
        }
112✔
215

48✔
216
        this.flush();
48✔
217
    }
48✔
218

4✔
219
    /**
4✔
220
     * Attempt to lock this storage by means of a lock directory.
4✔
221
     * @returns {boolean} True if the lock was created or false if the lock is already in place.
4✔
222
     * @throws {StorageLockedError} If this storage is already locked by another process.
4✔
223
     * @throws {Error} If the lock could not be created.
4✔
224
     */
4✔
225
    lock() {
4✔
226
        if (this.locked) {
1,112✔
227
            return false;
4✔
228
        }
4✔
229
        try {
1,108✔
230
            fs.mkdirSync(this.lockFile);
1,108✔
231
            this.locked = true;
1,108✔
232
        } catch (e) {
1,112✔
233
            /* istanbul ignore if */
4✔
234
            if (e.code !== 'EEXIST') {
4!
UNCOV
235
                throw new Error(`Error creating lock for storage ${this.storageFile}: ` + e.message);
×
236
            }
×
237
            throw new StorageLockedError(`Storage ${this.storageFile} is locked by another process`);
4✔
238
        }
4✔
239
        return true;
1,104✔
240
    }
1,112✔
241

4✔
242
    /**
4✔
243
     * Unlock this storage, no matter if it was previously locked by this writer.
4✔
244
     * Only use this if you are sure there is no other process still having a writer open.
4✔
245
     * Current implementation just deletes a lock file that is named like the storage.
4✔
246
     * @returns {boolean} True if an orphaned lock from another process was removed.
4✔
247
     */
4✔
248
    unlock() {
4✔
249
        const lockExists = fs.existsSync(this.lockFile);
1,108✔
250
        const orphaned = lockExists && !this.locked;
1,108✔
251
        if (lockExists) {
1,108✔
252
            fs.rmdirSync(this.lockFile);
1,100✔
253
        }
1,100✔
254
        this.locked = false;
1,108✔
255
        return orphaned;
1,108✔
256
    }
1,108✔
257

4✔
258
    /**
4✔
259
     * @inheritDoc
4✔
260
     * Unlocks the storage, then delegates to the parent close().
4✔
261
     */
4✔
262
    close() {
4✔
263
        if (this.locked) {
1,712✔
264
            this.unlock();
1,096✔
265
        }
1,096✔
266
        super.close();
1,712✔
267
    }
1,712✔
268

4✔
269
    /**
4✔
270
     * Add an index entry for the given document at the position and size.
4✔
271
     *
4✔
272
     * @private
4✔
273
     * @param {number} partitionId The partition where the document is stored.
4✔
274
     * @param {number} position The file offset where the document is stored.
4✔
275
     * @param {number} size The size of the stored document.
4✔
276
     * @param {object} document The document to add to the index.
4✔
277
     * @param {function} [callback] The callback to call when the index is written to disk.
4✔
278
     * @returns {EntryInterface} The index entry item.
4✔
279
     */
4✔
280
    addIndex(partitionId, position, size, document, callback) {
4✔
281
        if (!this.index.isOpen()) {
3,352✔
282
            this.index.open();
4✔
283
        }
4✔
284

3,352✔
285
        /*if (this.index.lastEntry.position + this.index.lastEntry.size !== position) {
3,352✔
286
         this.emit('index-corrupted');
3,352✔
287
         throw new Error('Corrupted index, needs to be rebuilt!');
3,352✔
288
         }*/
3,352✔
289

3,352✔
290
        const entry = new WritableIndexEntry(this.index.length + 1, position, size, partitionId);
3,352✔
291
        this.index.add(entry, (indexPosition) => {
3,352✔
292
            this.emit('wrote', document, entry, indexPosition);
3,352✔
293
            /* istanbul ignore if  */
3,352✔
294
            if (typeof callback === 'function') {
3,352!
UNCOV
295
                return callback(indexPosition);
×
296
            }
×
297
        });
3,352✔
298
        return entry;
3,352✔
299
    }
3,352✔
300

4✔
301
    /**
4✔
302
     * Register a handler that is called before a document is written to storage.
4✔
303
     * The handler receives the document and the partition metadata and may throw to abort the write.
4✔
304
     * Multiple handlers can be registered; all run on every write in registration order.
4✔
305
     * Equivalent to `storage.on('preCommit', hook)`.
4✔
306
     *
4✔
307
     * @api
4✔
308
     * @param {function(object, object): void} hook A function receiving (document, partitionMetadata).
4✔
309
     */
4✔
310
    preCommit(hook) {
4✔
311
        this.on('preCommit', hook);
16✔
312
    }
16✔
313

4✔
314
    /**
4✔
315
     * Get a partition either by name or by id.
4✔
316
     * If a partition with the given name does not exist, a new one will be created.
4✔
317
     * If a partition with the given id does not exist, an error is thrown.
4✔
318
     *
4✔
319
     * Partition opening and LRU tracking are delegated to `super.getPartition()`.
4✔
320
     *
4✔
321
     * @protected
4✔
322
     * @param {string|number} partitionIdentifier The partition name or the partition Id
4✔
323
     * @returns {ReadablePartition}
4✔
324
     * @throws {Error} If an id is given and no such partition exists.
4✔
325
     */
4✔
326
    getPartition(partitionIdentifier) {
4✔
327
        if (typeof partitionIdentifier === 'string') {
5,828✔
328
            const partitionShortName = partitionIdentifier;
3,376✔
329
            const partitionName = this.storageFile + (partitionIdentifier.length ? '.' + partitionIdentifier : '');
3,376✔
330
            partitionIdentifier = WritablePartition.idFor(partitionName);
3,376✔
331
            if (!this.partitions.has(partitionIdentifier)) {
3,376✔
332
                const partitionConfig = typeof this.partitionConfig.metadata === 'function'
1,224✔
333
                    ? { ...this.partitionConfig, metadata: this.partitionConfig.metadata(partitionShortName) }
1,224✔
334
                    : this.partitionConfig;
1,224✔
335
                if (partitionName.includes('/')) {
1,224✔
336
                    ensureDirectory(path.join(this.dataDirectory, path.dirname(partitionName)));
88✔
337
                }
88✔
338
                this.partitions.add(partitionIdentifier, this.createPartition(partitionName, partitionConfig));
1,224✔
339
                this.emit('partition-created', partitionIdentifier);
1,224✔
340
            }
1,224✔
341
        }
3,376✔
342
        return super.getPartition(partitionIdentifier);
5,828✔
343
    }
5,828✔
344

4✔
345
    /**
4✔
346
     * @api
4✔
347
     * @param {object} document The document to write to storage.
4✔
348
     * @param {function} [callback] A function that will be called when the document is written to disk.
4✔
349
     * @returns {number} The 1-based document sequence number in the storage.
4✔
350
     */
4✔
351
    write(document, callback) {
4✔
352
        const data = this.serializer.serialize(document).toString();
3,360✔
353
        const dataSize = Buffer.byteLength(data, 'utf8');
3,360✔
354

3,360✔
355
        const partitionName = this.partitioner(document, this.index.length + 1);
3,360✔
356
        const partition = this.getPartition(partitionName);
3,360✔
357
        if (this.listenerCount('preCommit') > 0) {
3,360✔
358
            this.emit('preCommit', document, partition.metadata);
56✔
359
        }
56✔
360
        const position = partition.write(data, this.length, callback);
3,352✔
361

3,352✔
362
        assert(position !== false, 'Error writing document.');
3,352✔
363

3,352✔
364
        const indexEntry = this.addIndex(partition.id, position, dataSize, document);
3,352✔
365
        this.forEachSecondaryIndex((index, name) => {
3,352✔
366
            if (!index.isOpen()) {
1,972✔
367
                index.open();
4✔
368
            }
4✔
369
            index.add(indexEntry);
1,972✔
370
            this.emit('index-add', name, index.length, document);
1,972✔
371
        }, document);
3,352✔
372

3,352✔
373
        return this.index.length;
3,352✔
374
    }
3,360✔
375

4✔
376
    /**
4✔
377
     * Ensure that an index with the given name and document matcher exists.
4✔
378
     * Will create the index if it doesn't exist, otherwise return the existing index.
4✔
379
     *
4✔
380
     * @api
4✔
381
     * @param {string} name The index name.
4✔
382
     * @param {Matcher} [matcher] An object that describes the document properties that need to match to add it this index or a function that receives a document and returns true if the document should be indexed.
4✔
383
     * @param {boolean} [reindex=true] Whether to scan existing documents and populate the new index. Set to false when it is known that no existing documents can match the matcher.
4✔
384
     * @returns {ReadableIndex} The index containing all documents that match the query.
4✔
385
     * @throws {Error} if the index doesn't exist yet and no matcher was specified.
4✔
386
     */
4✔
387
    ensureIndex(name, matcher, reindex = true) {
4✔
388
        if (name === '_all') {
1,272✔
389
            return this.index;
4✔
390
        }
4✔
391
        if (name in this.secondaryIndexes) {
1,272✔
392
            return this.secondaryIndexes[name].index;
4✔
393
        }
4✔
394

1,264✔
395
        const indexName = this.storageFile + '.' + name + '.index';
1,264✔
396
        if (fs.existsSync(path.join(this.indexDirectory, indexName))) {
1,272✔
397
            return this.openIndex(name, matcher);
16✔
398
        }
16✔
399

1,248✔
400
        assert((typeof matcher === 'object' || typeof matcher === 'function') && matcher !== null, 'Need to specify a matcher.');
1,272✔
401

1,272✔
402
        const metadata = buildMetadataForMatcher(matcher, this.hmac);
1,272✔
403
        const { index } = this.createIndex(indexName, Object.assign({}, this.indexOptions, { metadata }));
1,272✔
404
        if (reindex) {
1,272✔
405
            try {
448✔
406
                this.forEachDocument((document, indexEntry) => {
448✔
407
                    if (matches(document, matcher)) {
40✔
408
                        index.add(indexEntry);
28✔
409
                    }
28✔
410
                });
448✔
411
            } catch (e) {
448✔
412
                index.destroy();
4✔
413
                throw e;
4✔
414
            }
4✔
415
        }
448✔
416

1,240✔
417
        this.secondaryIndexes[name] = { index, matcher };
1,240✔
418
        this.indexMatcher.add(name, matcher);
1,240✔
419
        this.emit('index-created', name);
1,240✔
420
        return index;
1,240✔
421
    }
1,272✔
422

4✔
423
    /**
4✔
424
     * Flush all write buffers to disk.
4✔
425
     * This is a sync method and will invoke all previously registered flush callbacks.
4✔
426
     *
4✔
427
     * @api
4✔
428
     * @returns {boolean} Returns true if a flush on any partition or the main index was executed.
4✔
429
     */
4✔
430
    flush() {
4✔
431
        let result = this.index.flush();
156✔
432
        this.forEachPartition(partition => result = result | partition.flush());
156✔
433
        this.forEachSecondaryIndex(index => index.flush());
156✔
434
        return result;
156✔
435
    }
156✔
436

4✔
437
    /**
4✔
438
     * Iterate all distinct partitions in which the given iterable list of entries are stored.
4✔
439
     * @param {Iterable<Index.Entry>} entries
4✔
440
     * @param {function(Index.Entry)} iterationHandler
4✔
441
     */
4✔
442
    forEachDistinctPartitionOf(entries, iterationHandler) {
4✔
443
        const partitions = [];
36✔
444
        const numPartitions = this.partitions.count;
36✔
445
        for (let entry of entries) {
36✔
446
            if (partitions.indexOf(entry.partition) >= 0) {
56✔
447
                continue;
16✔
448
            }
16✔
449
            partitions.push(entry.partition);
40✔
450
            iterationHandler(entry);
40✔
451
            if (partitions.length === numPartitions) {
56✔
452
                break;
24✔
453
            }
24✔
454
        }
56✔
455
    }
36✔
456

4✔
457
    /**
4✔
458
     * Truncate all partitions after the given (global) sequence number.
4✔
459
     *
4✔
460
     * Assumes the primary index is fully consistent with the partition data. Looks up the first
4✔
461
     * index entry after `after` for each affected partition and truncates the partition file
4✔
462
     * at that entry's byte position.
4✔
463
     *
4✔
464
     * @private
4✔
465
     * @param {number} after The document sequence number to truncate after.
4✔
466
     */
4✔
467
    truncatePartitions(after) {
4✔
468
        if (after === 0) {
48✔
469
            this.forEachPartition(partition => partition.truncate(0));
8✔
470
            return;
8✔
471
        }
8✔
472

40✔
473
        const entries = this.index.range(after + 1);  // We need the first entry that is cut off
40✔
474
        if (entries === false || entries.length === 0) {
48✔
475
            return;
4✔
476
        }
4✔
477

36✔
478
        this.forEachDistinctPartitionOf(entries, entry => this.getPartition(entry.partition).truncate(entry.position));
36✔
479
    }
48✔
480

4✔
481
    /**
4✔
482
     * Truncate the storage after the given sequence number.
4✔
483
     *
4✔
484
     * @param {number} after The document sequence number to truncate after.
4✔
485
     */
4✔
486
    truncate(after) {
4✔
487
        /*
48✔
488
         To truncate the store following steps need to be done:
48✔
489

48✔
490
         1) find all partition positions after which their files should be truncated
48✔
491
         2) truncate all partitions accordingly
48✔
492
         3) truncate/rewrite all indexes
48✔
493
         */
48✔
494
        if (!this.index.isOpen()) {
48✔
495
            this.index.open();
24✔
496
        }
24✔
497
        if (after < 0) {
48✔
498
            after += this.index.length;
4✔
499
        }
4✔
500

48✔
501
        this.truncatePartitions(after);
48✔
502

48✔
503
        this.index.truncate(after);
48✔
504
        this.forEachWritableSecondaryIndex(index => {
48✔
505
            index.truncate(index.find(after));
32✔
506
        });
48✔
507
    }
48✔
508

4✔
509
    /**
4✔
510
     * @inheritDoc
4✔
511
     * Open an existing secondary index and repair any stale entries beyond the current primary
4✔
512
     * index length. Stale entries can be present when checkTornWrites() truncated the primary
4✔
513
     * index before this secondary index was loaded into memory.
4✔
514
     */
4✔
515
    openIndex(name, matcher) {
4✔
516
        const index = super.openIndex(name, matcher);
1,084✔
517
        const lastEntry = index.lastEntry;
1,084✔
518
        if (lastEntry !== false && lastEntry.number > this.index.length) {
1,084✔
519
            // Secondary index is ahead of primary: truncate stale entries.
4✔
520
            index.truncate(index.find(this.index.length));
4✔
521
        }
4✔
522
        return index;
1,068✔
523
    }
1,084✔
524

4✔
525
    /**
4✔
526
     * @protected
4✔
527
     * @param {string} name
4✔
528
     * @param {object} [options]
4✔
529
     * @returns {{ index: WritableIndex, matcher: Matcher }}
4✔
530
     */
4✔
531
    createIndex(name, options = {}) {
4✔
532
        const index = new WritableIndex(name, options);
2,516✔
533
        let matcher;
2,516✔
534

2,516✔
535
        // If the index contains a matcher (possibly a serialized function) we check HMAC
2,516✔
536
        // to prevent evaluating unknown code.
2,516✔
537
        if (index.metadata.matcher) {
2,516✔
538
            try {
1,352✔
539
                matcher = buildMatcherFromMetadata(index.metadata, this.hmac);
1,352✔
540
            } catch (e) {
1,352✔
541
                index.destroy();
4✔
542
                throw e;
4✔
543
            }
4✔
544
        }
1,352✔
545

2,504✔
546
        return { index, matcher };
2,504✔
547
    }
2,516✔
548

4✔
549
    /**
4✔
550
     * @protected
4✔
551
     * @param {string} name
4✔
552
     * @param {object} [config]
4✔
553
     * @returns {WritablePartition}
4✔
554
     */
4✔
555
    createPartition(name, config = {}) {
4✔
556
        return new WritablePartition(name, config);
1,474✔
557
    }
1,474✔
558

4✔
559
}
4✔
560

4✔
561
export default WritableStorage;
4✔
562
export { StorageLockedError, LOCK_THROW, LOCK_RECLAIM };
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc