• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 25625880534

10 May 2026 10:02AM UTC coverage: 97.684% (-0.3%) from 98.015%
25625880534

push

github

web-flow
Merge pull request #303 from albe/copilot/sub-pr-301

feat(Storage): async partition + index scan in open() with 'opened' event and callback hook

1036 of 1086 branches covered (95.4%)

Branch coverage included in aggregate %.

100 of 102 new or added lines in 4 files covered. (98.04%)

21 existing lines in 2 files now uncovered.

5416 of 5519 relevant lines covered (98.13%)

808.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.51
/src/Storage/ReadableStorage.js
1
import fs from 'fs';
4✔
2
import path from 'path';
4✔
3
import events from 'events';
4✔
4
import Partition, { ReadOnly as ReadOnlyPartition } from '../Partition.js';
4✔
5
import Index, { ReadOnly as ReadOnlyIndex } from '../Index.js';
4✔
6
import { assert, wrapAndCheck, kWayMerge, scanForFiles } from '../util.js';
4✔
7
import { createHmac, matches, buildMetadataForMatcher } from '../metadataUtil.js';
4✔
8
import IndexMatcher from '../IndexMatcher.js';
4✔
9
import PartitionPool from '../PartitionPool.js';
4✔
10

4✔
11
const DEFAULT_READ_BUFFER_SIZE = 4 * 1024;
4✔
12

4✔
13
/**
4✔
14
 * Default ordered list of document property paths used as discriminant keys when
4✔
15
 * classifying object matchers into the fast-lookup table.  Each path may use
4✔
16
 * dot-notation for nested access (e.g. `'payload.type'`).  The first path that
4✔
17
 * resolves to a scalar value in a given matcher wins; remaining paths are not
4✔
18
 * examined for that matcher.
4✔
19
 */
4✔
20
const DEFAULT_MATCHER_PROPERTIES = ['stream', 'payload.type'];
4✔
21

4✔
22
/**
4✔
23
 * Default maximum number of partition file descriptors kept open simultaneously.
4✔
24
 * Partitions beyond this limit are evicted using LRU order. 0 disables the limit.
4✔
25
 */
4✔
26
const DEFAULT_MAX_OPEN_PARTITIONS = 1024;
4✔
27

4✔
28
/**
4✔
29
 * Reverses the items of an iterable
4✔
30
 * @param {Generator|Iterable} iterator
4✔
31
 * @returns {Generator<*>}
4✔
32
 */
4✔
33
function *reverse(iterator) {
68✔
34
    const items = Array.from(iterator);
68✔
35
    for (let i = items.length - 1; i >= 0; i--) {
68✔
36
        yield items[i];
580✔
37
    }
580✔
38
}
68✔
39

4✔
40
/**
4✔
41
 * @typedef {object|function(object):boolean} Matcher
4✔
42
 */
4✔
43

4✔
44
/**
4✔
45
 * An append-only storage with highly performant positional range scans.
4✔
46
 * It's highly optimized for an event-store and hence does not support compaction or data-rewrite, nor any querying
4✔
47
 */
4✔
48
class ReadableStorage extends events.EventEmitter {
4✔
49

4✔
50
    /**
4✔
51
     * @param {string} [storageName] The name of the storage.
4✔
52
     * @param {object} [config] An object with storage parameters.
4✔
53
     * @param {object} [config.serializer] A serializer object with methods serialize(document) and deserialize(data).
4✔
54
     * @param {function(object): string} config.serializer.serialize Default is JSON.stringify.
4✔
55
     * @param {function(string): object} config.serializer.deserialize Default is JSON.parse.
4✔
56
     * @param {string} [config.dataDirectory] The path where the storage data should reside. Default '.'.
4✔
57
     * @param {string} [config.indexDirectory] The path where the indexes should be stored. Defaults to dataDirectory.
4✔
58
     * @param {string} [config.indexFile] The name of the primary index. Default '{storageName}.index'.
4✔
59
     * @param {number} [config.readBufferSize] Size of the read buffer in bytes. Default 4096.
4✔
60
     * @param {object} [config.indexOptions] An options object that should be passed to all indexes on construction.
4✔
61
     * @param {string} [config.hmacSecret] A private key that is used to verify matchers retrieved from indexes.
4✔
62
     * @param {object} [config.metadata] A metadata object to be stored in all partitions belonging to this storage.
4✔
63
     * @param {string[]} [config.matcherProperties] Ordered list of document property paths (dot-notation) used as
4✔
64
     *   discriminant keys for the fast secondary-index lookup table. Only the first property that resolves to a scalar
4✔
65
     *   value inside a given object matcher is used; the rest are checked via the full `matches()` fallback.
4✔
66
     *   Default: `['stream', 'payload.type']`.
4✔
67
     * @param {number} [config.maxOpenPartitions] Maximum number of partition file descriptors kept open at one time.
4✔
68
     *   When the limit is reached the least-recently-used partition is closed to make room. 0 disables the limit.
4✔
69
     *   Default: 1024.
4✔
70
     */
4✔
71
    constructor(storageName = 'storage', config = {}) {
4✔
72
        super();
1,236✔
73
        if (typeof storageName !== 'string') {
1,236✔
74
            config = storageName;
52✔
75
            storageName = undefined;
52✔
76
        }
52✔
77

1,236✔
78
        this.storageFile = storageName || 'storage';
1,236✔
79
        const defaults = {
1,236✔
80
            serializer: { serialize: JSON.stringify, deserialize: JSON.parse },
1,236✔
81
            dataDirectory: '.',
1,236✔
82
            indexFile: this.storageFile + '.index',
1,236✔
83
            indexOptions: {},
1,236✔
84
            hmacSecret: '',
1,236✔
85
            metadata: {},
1,236✔
86
            matcherProperties: DEFAULT_MATCHER_PROPERTIES,
1,236✔
87
            maxOpenPartitions: DEFAULT_MAX_OPEN_PARTITIONS
1,236✔
88
        };
1,236✔
89
        config = Object.assign(defaults, config);
1,236✔
90
        this.serializer = config.serializer;
1,236✔
91

1,236✔
92
        this.hmac = createHmac(config.hmacSecret);
1,236✔
93

1,236✔
94
        this.dataDirectory = path.resolve(config.dataDirectory);
1,236✔
95

1,236✔
96
        const partitionDefaults = { readBufferSize: DEFAULT_READ_BUFFER_SIZE };
1,236✔
97
        this.partitionConfig = Object.assign(partitionDefaults, config);
1,236✔
98
        this.partitions = new PartitionPool(config.maxOpenPartitions);
1,236✔
99

1,236✔
100
        // initialized: null = not started (or scan cancelled), false = in progress, true = done
1,236✔
101
        this.initialized = null;
1,236✔
102

1,236✔
103
        this.initializeIndexes(config);
1,236✔
104
    }
1,236✔
105

4✔
106
    /**
4✔
107
     * @protected
4✔
108
     * @param {string} name
4✔
109
     * @param {object} [options]
4✔
110
     * @returns {{ index: ReadableIndex, matcher?: Matcher }}
4✔
111
     */
4✔
112
    createIndex(name, options = {}) {
4✔
113
        /** @type ReadableIndex */
136✔
114
        const index = new ReadOnlyIndex(name, options);
136✔
115
        return { index };
136✔
116
    }
136✔
117

4✔
118
    /**
4✔
119
     * @protected
4✔
120
     * @param {string} name
4✔
121
     * @param {object} [options]
4✔
122
     * @returns {ReadablePartition}
4✔
123
     */
4✔
124
    createPartition(name, options = {}) {
4✔
125
        return new ReadOnlyPartition(name, options);
52✔
126
    }
52✔
127

4✔
128
    /**
4✔
129
     * Create/open the primary index and build the base configuration for all secondary indexes.
4✔
130
     *
4✔
131
     * @private
4✔
132
     * @param {object} config The configuration object
4✔
133
     * @returns void
4✔
134
     */
4✔
135
    initializeIndexes(config) {
4✔
136
        this.indexDirectory = path.resolve(config.indexDirectory || this.dataDirectory);
1,236✔
137

1,236✔
138
        this.indexOptions = config.indexOptions;
1,236✔
139
        this.indexOptions.dataDirectory = this.indexDirectory;
1,236✔
140
        // Safety precaution to prevent accidentally restricting main index
1,236✔
141
        delete this.indexOptions.matcher;
1,236✔
142
        const { index } = this.createIndex(config.indexFile, this.indexOptions);
1,236✔
143
        this.index = index;
1,236✔
144
        this.secondaryIndexes = {};
1,236✔
145
        this.readonlyIndexes = {};
1,236✔
146

1,236✔
147
        /** Fast secondary-index lookup — classifies matchers for O(1) candidate resolution on write. */
1,236✔
148
        this.indexMatcher = new IndexMatcher(config.matcherProperties);
1,236✔
149
    }
1,236✔
150

4✔
151
    /**
4✔
152
     * The amount of documents in the storage.
4✔
153
     * @returns {number}
4✔
154
     */
4✔
155
    get length() {
4✔
156
        return this.index.length;
5,020✔
157
    }
5,020✔
158

4✔
159
    /**
4✔
160
     * Scan partitions and secondary index files; emit 'index-created' for each found index.
4✔
161
     * @param {function} done Called when both scans finish.
4✔
162
     */
4✔
163
    scanFiles(done) {
4✔
164
        const escaped = this.storageFile.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
1,172✔
165
        const partitionPattern = new RegExp(`^(${escaped}.*)$`);
1,172✔
166
        scanForFiles(this.dataDirectory, partitionPattern, (file) => {
1,172✔
167
            if (file.endsWith('.index') || file.endsWith('.branch') || file.endsWith('.lock')) return;
934✔
168
            const partition = this.createPartition(file, this.partitionConfig);
298✔
169
            this.partitions.add(partition.id, partition);
298✔
170
        }, (partErr) => {
1,172✔
171
            /* istanbul ignore if */
1,172✔
172
            if (partErr) throw partErr;
1,172✔
173

1,168✔
174
            // Scan was cancelled by close() between the two scan phases.
1,168✔
175
            if (this.initialized === null) return;
1,172✔
176

335✔
177
            // No secondary indexes exist yet — nothing to scan.
335✔
178
            if (!fs.existsSync(this.indexDirectory)) {
1,172!
NEW
179
                return done();
×
NEW
180
            }
×
181
            const indexPattern = new RegExp(`^${escaped}\\.(.+)\\.index$`);
335✔
182
            scanForFiles(this.indexDirectory, indexPattern, (name) => {
335✔
183
                this.emit('index-created', name);
147✔
184
            }, (indexErr) => {
335✔
185
                // The directory could disappear between existsSync and readdir (e.g. test cleanup).
335✔
186
                /* istanbul ignore if */
335✔
187
                if (indexErr && indexErr.code !== 'ENOENT') throw indexErr;
335!
188
                done();
335✔
189
            });
335✔
190
        });
1,172✔
191
    }
1,172✔
192

4✔
193
    /**
4✔
194
     * Only the primary index is opened eagerly; secondary indexes open on demand.
4✔
195
     *
4✔
196
     * @protected
4✔
197
     */
4✔
198
    openIndexes() {
4✔
199
        this.index.open();
204✔
200
    }
204✔
201

4✔
202
    /**
4✔
203
     * Open the storage; scans existing partitions and indexes asynchronously on first open.
4✔
204
     * Re-opens after `close()` are synchronous.
4✔
205
     * Will emit an `'opened'` event when finished.
4✔
206
     *
4✔
207
     * @api
4✔
208
     * @param {function(): void} [callback] Called after indexes open, before `'opened'` is emitted.
4✔
209
     *   Can be used as a synchronous alternative to listening to the `'opened'` event.
4✔
210
     * @returns {boolean}
4✔
211
     */
4✔
212
    open(callback) {
4✔
213
        if (this.initialized === true) {
1,184✔
214
            this.openIndexes();
8✔
215
            callback?.();
8✔
216
            this.emit('opened');
8✔
217
            return true;
8✔
218
        }
8✔
219
        if (this.initialized === false) {
1,184✔
220
            return true;
4✔
221
        }
4✔
222
        this.initialized = false;
1,172✔
223
        this.scanFiles(() => {
1,172✔
224
            // Guard: close() while scanning resets initialized to null.
335✔
225
            if (this.initialized === null) return;
335✔
226
            this.initialized = true;
196✔
227
            this.openIndexes();
196✔
228
            callback?.();
335✔
229
            this.emit('opened');
335✔
230
        });
1,172✔
231
        return true;
1,172✔
232
    }
1,184✔
233

4✔
234
    /**
4✔
235
     * Close the storage and free up all resources.
4✔
236
     * Will emit a 'closed' event when finished.
4✔
237
     *
4✔
238
     * @api
4✔
239
     */
4✔
240
    close() {
4✔
241
        // Cancel in-progress scan so the callback does not re-open after an explicit close.
1,844✔
242
        if (this.initialized === false) {
1,844✔
243
            this.initialized = null;
972✔
244
        }
972✔
245
        this.index.close();
1,844✔
246
        this.forEachSecondaryIndex(index => index.close());
1,844✔
247
        for (let index of Object.values(this.readonlyIndexes)) {
1,844✔
248
            index.close();
44✔
249
        }
44✔
250
        this.forEachPartition(partition => partition.close());
1,844✔
251
        this.emit('closed');
1,844✔
252
    }
1,844✔
253

4✔
254
    /**
4✔
255
     * Get a partition by its id.
4✔
256
     * If a partition with the given id does not exist, an error is thrown.
4✔
257
     *
4✔
258
     * @protected
4✔
259
     * @param {number|string} partitionIdentifier The partition Id
4✔
260
     * @returns {ReadablePartition}
4✔
261
     * @throws {Error} If no such partition exists.
4✔
262
     */
4✔
263
    getPartition(partitionIdentifier) {
4✔
264
        assert(this.partitions.has(partitionIdentifier), `Partition #${partitionIdentifier} does not exist.`);
5,860✔
265
        return this.partitions.open(partitionIdentifier);
5,860✔
266
    }
5,860✔
267

4✔
268
    /**
4✔
269
     * Register a handler that is called before a document is read from a partition.
4✔
270
     * The handler receives the position and the partition metadata and may throw to abort the read.
4✔
271
     * Multiple handlers can be registered; all run on every read in registration order.
4✔
272
     * Equivalent to `storage.on('preRead', hook)`.
4✔
273
     *
4✔
274
     * @api
4✔
275
     * @param {function(number, object): void} hook A function receiving (position, partitionMetadata).
4✔
276
     */
4✔
277
    preRead(hook) {
4✔
278
        this.on('preRead', hook);
12✔
279
    }
12✔
280

4✔
281
    /**
4✔
282
     * @protected
4✔
283
     * @param {number} partitionId The partition to read from.
4✔
284
     * @param {number} position The file position to read from.
4✔
285
     * @param {number} [size] The expected byte size of the document at the given position.
4✔
286
     * @returns {object} The document stored at the given position.
4✔
287
     * @throws {Error} if the document at the given position can not be deserialized.
4✔
288
     */
4✔
289
    readFrom(partitionId, position, size) {
4✔
290
        const partition = this.getPartition(partitionId);
2,432✔
291
        if (this.listenerCount('preRead') > 0) {
2,432✔
292
            this.emit('preRead', position, partition.metadata);
64✔
293
        }
64✔
294
        const data = partition.readFrom(position, size);
2,424✔
295
        return this.serializer.deserialize(data);
2,424✔
296
    }
2,432✔
297

4✔
298
    /**
4✔
299
     * Read a single document from the given position, in the full index or in the provided index.
4✔
300
     *
4✔
301
     * @api
4✔
302
     * @param {number} number The 1-based document number (inside the given index) to read.
4✔
303
     * @param {ReadableIndex} [index] The index to use for finding the document position.
4✔
304
     * @returns {object} The document at the given position inside the index.
4✔
305
     */
4✔
306
    read(number, index) {
4✔
307
        index = index || this.index;
408✔
308

408✔
309
        if (!index.isOpen()) {
408✔
310
            index.open();
20✔
311
        }
20✔
312

408✔
313
        const entry = index.get(number);
408✔
314
        if (entry === false) {
408✔
315
            return false;
4✔
316
        }
4✔
317

404✔
318
        return this.readFrom(entry.partition, entry.position, entry.size);
404✔
319
    }
408✔
320

4✔
321
    /**
4✔
322
     * Read a range of documents from the given position range, in the full index or in the provided index.
4✔
323
     * Returns a generator in order to reduce memory usage and be able to read lots of documents with little latency.
4✔
324
     *
4✔
325
     * @api
4✔
326
     * @param {number} from The 1-based document number (inclusive) to start reading from.
4✔
327
     * @param {number} [until] The 1-based document number (inclusive) to read until. Defaults to index.length.
4✔
328
     * @param {ReadableIndex|false} [index] The index to use for finding the documents in the range.
4✔
329
     *   Pass `false` to skip the global index and iterate all partitions directly in sequenceNumber order
4✔
330
     *   (useful when the global index is unavailable or corrupted).
4✔
331
     * @returns {Generator<object>} A generator that will read each document in the range one by one.
4✔
332
     */
4✔
333
    *readRange(from, until = -1, index = null) {
4✔
334
        const lengthSource = index || this.index;
712✔
335
        if (!lengthSource.isOpen()) {
712✔
336
            lengthSource.open();
48✔
337
        }
48✔
338

712✔
339
        const readFrom = wrapAndCheck(from, lengthSource.length);
712✔
340
        const readUntil = wrapAndCheck(until, lengthSource.length);
712✔
341
        assert(readFrom > 0 && readUntil > 0, `Range scan error for range ${from} - ${until}.`);
712✔
342

712✔
343
        if (readFrom > readUntil) {
712✔
344
            const batchSize = 10;
48✔
345
            let batchUntil = readFrom;
48✔
346
            while (batchUntil >= readUntil) {
48✔
347
                const batchFrom = Math.max(readUntil, batchUntil - batchSize);
68✔
348
                yield* reverse(this.iterateRange(batchFrom, batchUntil, index));
68✔
349
                batchUntil = batchFrom - 1;
68✔
350
            }
68✔
351
            return undefined;
48✔
352
        }
48✔
353

648✔
354
        yield* this.iterateRange(readFrom, readUntil, index);
648✔
355
    }
712✔
356

4✔
357
    /**
4✔
358
     * Iterate all documents in this storage in range from to until inside the index.
4✔
359
     * If index is false, iterates all partitions directly in sequenceNumber order.
4✔
360
     * @private
4✔
361
     * @param {number} from
4✔
362
     * @param {number} until
4✔
363
     * @param {ReadableIndex|false|null} index
4✔
364
     * @returns {Generator<object>}
4✔
365
     */
4✔
366
    *iterateRange(from, until, index) {
4✔
367
        if (index === false) {
716✔
368
            // Explicitly disabled index: iterate all partitions and merge by sequenceNumber.
12✔
369
            // Document header sequenceNumber is 0-based; from/until are 1-based index positions.
12✔
370
            for (const entry of this.iterateDocumentsNoIndex(from - 1, until - 1)) {
12✔
371
                yield entry.document;
92✔
372
            }
92✔
373
            return;
12✔
374
        }
12✔
375

704✔
376
        const idx = index || this.index;
716✔
377
        const entries = idx.range(from, until);
716✔
378
        for (let entry of entries) {
716✔
379
            const document = this.readFrom(entry.partition, entry.position, entry.size);
1,980✔
380
            yield document;
1,980✔
381
        }
1,948✔
382
    }
716✔
383

4✔
384
    /**
4✔
385
     * Open an existing readonly index for reading, without registering it in the secondary indexes write path.
4✔
386
     * Use this for indexes whose files carry a status marker (e.g. `stream-foo.closed.index`).
4✔
387
     *
4✔
388
     * @api
4✔
389
     * @param {string} name The readonly index name (e.g. 'stream-foo.closed').
4✔
390
     * @returns {ReadableIndex}
4✔
391
     * @throws {Error} if the readonly index does not exist.
4✔
392
     */
4✔
393
    openReadonlyIndex(name) {
4✔
394
        if (name in this.readonlyIndexes) {
44!
395
            return this.readonlyIndexes[name];
×
396
        }
×
397
        const indexName = this.storageFile + '.' + name + '.index';
44✔
398
        assert(fs.existsSync(path.join(this.indexDirectory, indexName)), `Index "${name}" does not exist.`);
44✔
399
        const { index } = this.createIndex(indexName, Object.assign({}, this.indexOptions));
44✔
400
        index.open();
44✔
401
        this.readonlyIndexes[name] = index;
44✔
402
        return index;
44✔
403
    }
44✔
404

4✔
405
    /**
4✔
406
     * Open an existing index.
4✔
407
     *
4✔
408
     * @api
4✔
409
     * @param {string} name The index name.
4✔
410
     * @param {Matcher} [matcher] The matcher object or function that the index needs to have been defined with. If not given it will not be validated.
4✔
411
     * @returns {ReadableIndex}
4✔
412
     * @throws {Error} if the index with that name does not exist.
4✔
413
     * @throws {Error} if the HMAC for the matcher does not match.
4✔
414
     */
4✔
415
    openIndex(name, matcher) {
4✔
416
        if (name === '_all') {
1,104✔
417
            return this.index;
8✔
418
        }
8✔
419
        if (name in this.secondaryIndexes) {
1,104✔
420
            return this.secondaryIndexes[name].index;
980✔
421
        }
980✔
422

116✔
423
        const indexName = this.storageFile + '.' + name + '.index';
116✔
424
        assert(fs.existsSync(path.join(this.indexDirectory, indexName)), `Index "${name}" does not exist.`);
116✔
425

116✔
426
        const metadata = buildMetadataForMatcher(matcher, this.hmac);
116✔
427
        let { index } = this.secondaryIndexes[name] = this.createIndex(indexName, Object.assign({}, this.indexOptions, { metadata }));
116✔
428

116✔
429
        // Register the actual stored matcher (may have been reconstructed from metadata by WritableStorage.createIndex).
116✔
430
        this.indexMatcher.add(name, this.secondaryIndexes[name].matcher);
116✔
431

116✔
432
        index.open();
116✔
433
        return index;
116✔
434
    }
1,104✔
435

4✔
436
    /**
4✔
437
     * Remove a secondary index from the write path and the matcher lookup table.
4✔
438
     *
4✔
439
     * @api
4✔
440
     * @param {string} name The secondary index name to remove.
4✔
441
     */
4✔
442
    removeSecondaryIndex(name) {
4✔
443
        const entry = this.secondaryIndexes[name];
44✔
444
        if (entry) {
44✔
445
            this.indexMatcher.remove(name);
40✔
446
            delete this.secondaryIndexes[name];
40✔
447
        }
40✔
448
    }
44✔
449

4✔
450
    /**
4✔
451
     * Iterate documents across all partitions in sequenceNumber order using a k-way merge.
4✔
452
     * Opens any closed partition automatically.
4✔
453
     *
4✔
454
     * @protected
4✔
455
     * @param {number} [from=0] The 0-based sequenceNumber to start from (inclusive).
4✔
456
     * @param {number} [until=Number.MAX_SAFE_INTEGER] The 0-based sequenceNumber to read until (inclusive).
4✔
457
     * @returns {Generator<{document: object, sequenceNumber: number, partitionName: string, position: number, size: number, partition: number}>}
4✔
458
     */
4✔
459
    *iterateDocumentsNoIndex(from = 0, until = Number.MAX_SAFE_INTEGER) {
4✔
460
        const streams = [];
64✔
461

64✔
462
        this.forEachPartition(partition => {
64✔
463
            if (!partition.isOpen()) {
108✔
464
                partition.open();
52✔
465
            }
52✔
466

108✔
467
            const found = partition.findDocument(from);
108✔
468
            if (found && found.headerOut.sequenceNumber <= until) {
108✔
469
                const nextPosition = found.headerOut.position + partition.documentWriteSize(found.headerOut.dataSize);
92✔
470
                const reader = partition.readAll(nextPosition, found.headerOut);
92✔
471
                streams.push({ ...found, reader, partition: partition.id, partitionName: partition.name });
92✔
472
            }
92✔
473
        });
64✔
474

64✔
475
        const items = [];
64✔
476
        kWayMerge(
64✔
477
            streams,
64✔
478
            stream => stream.headerOut.sequenceNumber,
64✔
479
            stream => {
64✔
480
                const next = stream.reader.next();
228✔
481
                if (!next.done && stream.headerOut.sequenceNumber <= until) {
228✔
482
                    stream.data = next.value;
136✔
483
                    return true;
136✔
484
                }
136✔
485
                return false;
92✔
486
            },
64✔
487
            stream => items.push({
64✔
488
                document: this.serializer.deserialize(stream.data),
228✔
489
                sequenceNumber: stream.headerOut.sequenceNumber,
228✔
490
                partitionName: stream.partitionName,
228✔
491
                position: stream.headerOut.position,
228✔
492
                size: stream.headerOut.dataSize,
228✔
493
                partition: stream.partition,
228✔
494
            })
228✔
495
        );
64✔
496

64✔
497
        yield* items;
64✔
498
    }
64✔
499

4✔
500
    /**
4✔
501
     * Helper method to iterate over all documents, invoking a callback for each one.
4✔
502
     * Pass `noIndex = true` to iterate all partitions directly in sequenceNumber order
4✔
503
     * (useful when the global index is unavailable or corrupted).
4✔
504
     * When `noIndex` is false the second callback argument is the raw index `EntryInterface`.
4✔
505
     * When `noIndex` is true the second callback argument has `{ partition, position, size, sequenceNumber, partitionName }`.
4✔
506
     *
4✔
507
     * @protected
4✔
508
     * @param {function(object, object): void} iterationHandler
4✔
509
     * @param {boolean} [noIndex=false] When true, bypasses the index and iterates partitions directly.
4✔
510
     */
4✔
511
    forEachDocument(iterationHandler, noIndex = false) {
4✔
512
        /* istanbul ignore if  */
452✔
513
        if (typeof iterationHandler !== 'function') {
452!
514
            return;
×
515
        }
×
516

452✔
517
        if (noIndex) {
452✔
518
            for (const { document, ...entryInfo } of this.iterateDocumentsNoIndex()) {
4✔
519
                iterationHandler(document, entryInfo);
24✔
520
            }
24✔
521
            return;
4✔
522
        }
4✔
523

448✔
524
        const entries = this.index.all();
448✔
525

448✔
526
        for (let entry of entries) {
452✔
527
            const document = this.readFrom(entry.partition, entry.position, entry.size);
36✔
528
            iterationHandler(document, entry);
36✔
529
        }
36✔
530
    }
452✔
531

4✔
532
    /**
4✔
533
     * Helper method to iterate over all secondary indexes.
4✔
534
     *
4✔
535
     * When `matchDocument` is provided, `this.indexMatcher.forEachMatch()` is used to
4✔
536
     * efficiently find only the matching indexes via the discriminant lookup table.
4✔
537
     *
4✔
538
     * @protected
4✔
539
     * @param {function(ReadableIndex, string)} iterationHandler
4✔
540
     * @param {object} [matchDocument] If supplied, only indexes the document matches on will be iterated.
4✔
541
     */
4✔
542
    forEachSecondaryIndex(iterationHandler, matchDocument) {
4✔
543
        /* istanbul ignore if  */
5,564✔
544
        if (typeof iterationHandler !== 'function') {
5,564!
545
            return;
×
546
        }
×
547

5,564✔
548
        if (!matchDocument) {
5,564✔
549
            // No document filter: iterate all secondary indexes unconditionally.
2,112✔
550
            for (const indexName of Object.keys(this.secondaryIndexes)) {
2,112✔
551
                iterationHandler(this.secondaryIndexes[indexName].index, indexName);
1,596✔
552
            }
1,596✔
553
            return;
2,112✔
554
        }
2,112✔
555

3,452✔
556
        this.indexMatcher.forEachMatch(matchDocument, indexName => {
3,452✔
557
            iterationHandler(this.secondaryIndexes[indexName].index, indexName);
1,976✔
558
        });
3,452✔
559
    }
5,564✔
560

4✔
561
    /**
4✔
562
     * Helper method to iterate over all partitions.
4✔
563
     *
4✔
564
     * @protected
4✔
565
     * @param {function(ReadablePartition)} iterationHandler
4✔
566
     */
4✔
567
    forEachPartition(iterationHandler) {
4✔
568
        /* istanbul ignore if  */
2,148✔
569
        if (typeof iterationHandler !== 'function') {
2,148!
570
            return;
×
571
        }
×
572

2,148✔
573
        this.partitions.forEach(iterationHandler);
2,148✔
574
    }
2,148✔
575

4✔
576
}
4✔
577

4✔
578
export default ReadableStorage;
4✔
579
export { matches };
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc