• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 26133545023

20 May 2026 12:24AM UTC coverage: 97.956% (-0.2%) from 98.106%
26133545023

Pull #316

github

web-flow
Merge 515a2b27f into 03ae7bb7f
Pull Request #316: Add `event-storage-http` REST API layer with NDJSON reads, consumer endpoints, and raw buffer streaming

1078 of 1131 branches covered (95.31%)

Branch coverage included in aggregate %.

258 of 270 new or added lines in 10 files covered. (95.56%)

22 existing lines in 5 files now uncovered.

5536 of 5621 relevant lines covered (98.49%)

823.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.93
/src/Storage/ReadableStorage.js
1
import fs from 'fs';
4✔
2
import path from 'path';
4✔
3
import events from 'events';
4✔
4
import Partition, { ReadOnly as ReadOnlyPartition } from '../Partition.js';
4✔
5
import Index, { ReadOnly as ReadOnlyIndex } from '../Index.js';
4✔
6
import { assert, wrapAndCheck, iterate, kWayMerge } from '../util.js';
4✔
7
import { scanForFiles } from '../fsUtil.js';
4✔
8
import { createHmac, matches, buildMetadataForMatcher } from '../metadataUtil.js';
4✔
9
import IndexMatcher from '../IndexMatcher.js';
4✔
10
import PartitionPool from '../PartitionPool.js';
4✔
11

4✔
12
/** Partition read buffer size in bytes (4 KiB), used when `config.readBufferSize` is not supplied. */
const DEFAULT_READ_BUFFER_SIZE = 4096;

/**
 * A single line-feed byte as a Buffer.
 * NOTE(review): the name suggests NDJSON record separation; no use is visible in this file.
 */
const NDJSON_NEWLINE = Buffer.from('\n', 'utf8');

/**
 * Ordered document property paths that serve as discriminant keys when object
 * matchers are classified into the fast-lookup table. Paths may use dot-notation
 * to reach nested values (e.g. `'payload.type'`). For each matcher, the first
 * path resolving to a scalar value is used and the remaining paths are skipped.
 */
const DEFAULT_MATCHER_PROPERTIES = [
    'stream',
    'payload.type',
];

/**
 * Upper bound on simultaneously open partition file descriptors.
 * Once exceeded, partitions are evicted in LRU order. A value of 0 turns the limit off.
 */
const DEFAULT_MAX_OPEN_PARTITIONS = 1024;
4✔
29

4✔
30
/**
4✔
31
 * @typedef {object|function(object):boolean} Matcher
4✔
32
 */
4✔
33

4✔
34
/**
4✔
35
 * An append-only storage with highly performant positional range scans.
4✔
36
 * It's highly optimized for an event-store and hence does not support compaction or data-rewrite, nor any querying
4✔
37
 */
4✔
38
class ReadableStorage extends events.EventEmitter {

    /**
     * @param {string} [storageName] The name of the storage.
     * @param {object} [config] An object with storage parameters.
     * @param {object} [config.serializer] A serializer object with methods serialize(document) and deserialize(data).
     * @param {function(object): string} config.serializer.serialize Default is JSON.stringify.
     * @param {function(string): object} config.serializer.deserialize Default is JSON.parse.
     * @param {string} [config.dataDirectory] The path where the storage data should reside. Default '.'.
     * @param {string} [config.indexDirectory] The path where the indexes should be stored. Defaults to dataDirectory.
     * @param {string} [config.indexFile] The name of the primary index. Default '{storageName}.index'.
     * @param {number} [config.readBufferSize] Size of the read buffer in bytes. Default 4096.
     * @param {object} [config.indexOptions] An options object that should be passed to all indexes on construction.
     *   The object is copied; the caller's instance is never modified.
     * @param {string} [config.hmacSecret] A private key that is used to verify matchers retrieved from indexes.
     * @param {object} [config.metadata] A metadata object to be stored in all partitions belonging to this storage.
     * @param {string[]} [config.matcherProperties] Ordered list of document property paths (dot-notation) used as
     *   discriminant keys for the fast secondary-index lookup table. Only the first property that resolves to a scalar
     *   value inside a given object matcher is used; the rest are checked via the full `matches()` fallback.
     *   Default: `['stream', 'payload.type']`.
     * @param {number} [config.maxOpenPartitions] Maximum number of partition file descriptors kept open at one time.
     *   When the limit is reached the least-recently-used partition is closed to make room. 0 disables the limit.
     *   Default: 1024.
     */
    constructor(storageName = 'storage', config = {}) {
        super();
        // Allow `new ReadableStorage(config)`: a non-string first argument is taken as the config.
        if (typeof storageName !== 'string') {
            config = storageName;
            storageName = undefined;
        }

        this.storageFile = storageName || 'storage';
        const defaults = {
            serializer: { serialize: JSON.stringify, deserialize: JSON.parse },
            dataDirectory: '.',
            indexFile: this.storageFile + '.index',
            indexOptions: {},
            hmacSecret: '',
            metadata: {},
            matcherProperties: DEFAULT_MATCHER_PROPERTIES,
            maxOpenPartitions: DEFAULT_MAX_OPEN_PARTITIONS
        };
        config = Object.assign(defaults, config);
        this.serializer = config.serializer;

        this.hmac = createHmac(config.hmacSecret);

        this.dataDirectory = path.resolve(config.dataDirectory);

        // Every partition receives the full storage config plus a default read buffer size.
        const partitionDefaults = { readBufferSize: DEFAULT_READ_BUFFER_SIZE };
        this.partitionConfig = Object.assign(partitionDefaults, config);
        this.partitions = new PartitionPool(config.maxOpenPartitions);

        // initialized: null = not started (or scan cancelled), false = in progress, true = done
        this.initialized = null;

        this.initializeIndexes(config);
    }

    /**
     * Factory for index instances; this implementation always creates a read-only index.
     *
     * @protected
     * @param {string} name The index file name.
     * @param {object} [options] Options handed to the index constructor.
     * @returns {{ index: ReadableIndex, matcher?: Matcher }}
     */
    createIndex(name, options = {}) {
        /** @type ReadableIndex */
        const index = new ReadOnlyIndex(name, options);
        return { index };
    }

    /**
     * Factory for partition instances; this implementation always creates a read-only partition.
     *
     * @protected
     * @param {string} name The partition file name.
     * @param {object} [options] Options handed to the partition constructor.
     * @returns {ReadablePartition}
     */
    createPartition(name, options = {}) {
        return new ReadOnlyPartition(name, options);
    }

    /**
     * Create the primary index and build the base configuration for all secondary indexes.
     *
     * @private
     * @param {object} config The configuration object
     * @returns void
     */
    initializeIndexes(config) {
        this.indexDirectory = path.resolve(config.indexDirectory || this.dataDirectory);

        // Work on a shallow copy so the caller's `indexOptions` object is never modified
        // (it may be shared between several storages with different directories).
        this.indexOptions = { ...config.indexOptions, dataDirectory: this.indexDirectory };
        // Safety precaution to prevent accidentally restricting main index
        delete this.indexOptions.matcher;
        const { index } = this.createIndex(config.indexFile, this.indexOptions);
        this.index = index;
        this.secondaryIndexes = {};
        this.readonlyIndexes = {};

        /** Fast secondary-index lookup — classifies matchers for O(1) candidate resolution on write. */
        this.indexMatcher = new IndexMatcher(config.matcherProperties);
    }

    /**
     * The amount of documents in the storage, as reported by the primary index.
     * @returns {number}
     */
    get length() {
        return this.index.length;
    }

    /**
     * Scan partitions and secondary index files; emit 'index-created' for each found index.
     * `done` is not invoked when the scan was cancelled by `close()` (initialized reset to null).
     *
     * @param {function} done Called when both scans finish.
     */
    scanFiles(done) {
        // Escape regex metacharacters so the storage name is matched literally.
        const escaped = this.storageFile.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
        const partitionPattern = new RegExp(`^(${escaped}.*)$`);
        scanForFiles(this.dataDirectory, partitionPattern, (file) => {
            // Index, branch and lock files share the storage name prefix but are not partitions.
            if (file.endsWith('.index') || file.endsWith('.branch') || file.endsWith('.lock')) return;
            const partition = this.createPartition(file, this.partitionConfig);
            this.partitions.add(partition.id, partition);
        }, (partErr) => {
            /* istanbul ignore if */
            if (partErr) throw partErr;

            // Scan was cancelled by close() between the two scan phases.
            if (this.initialized === null) return;

            // No secondary indexes exist yet — nothing to scan.
            if (!fs.existsSync(this.indexDirectory)) {
                return done();
            }
            const indexPattern = new RegExp(`^${escaped}\\.(.+)\\.index$`);
            scanForFiles(this.indexDirectory, indexPattern, (name) => {
                this.emit('index-created', name);
            }, (indexErr) => {
                // The directory could disappear between existsSync and readdir (e.g. test cleanup).
                /* istanbul ignore if */
                if (indexErr && indexErr.code !== 'ENOENT') throw indexErr;
                done();
            });
        });
    }

    /**
     * Only the primary index is opened eagerly; secondary indexes open on demand.
     *
     * @protected
     */
    openIndexes() {
        this.index.open();
    }

    /**
     * Open the storage; scans existing partitions and indexes asynchronously on first open.
     * Re-opens after `close()` are synchronous.
     * Will emit an `'opened'` event when finished.
     *
     * Note: when a scan is already in progress, the call returns immediately and the given
     * callback is not invoked; listen for `'opened'` in that case.
     *
     * @api
     * @param {function(): void} [callback] Called after indexes open, before `'opened'` is emitted.
     *   Can be used as a synchronous alternative to listening to the `'opened'` event.
     * @returns {boolean} Always true.
     */
    open(callback) {
        if (this.initialized === true) {
            this.openIndexes();
            callback?.();
            this.emit('opened');
            return true;
        }
        if (this.initialized === false) {
            return true;
        }
        this.initialized = false;
        this.scanFiles(() => {
            // Guard: close() while scanning resets initialized to null.
            if (this.initialized === null) return;
            this.initialized = true;
            this.openIndexes();
            callback?.();
            this.emit('opened');
        });
        return true;
    }

    /**
     * Close the storage and free up all resources.
     * Closes the primary index, all secondary and readonly indexes and all partitions.
     * Will emit a 'closed' event when finished.
     *
     * @api
     */
    close() {
        // Cancel in-progress scan so the callback does not re-open after an explicit close.
        if (this.initialized === false) {
            this.initialized = null;
        }
        this.index.close();
        this.forEachSecondaryIndex(index => index.close());
        for (const index of Object.values(this.readonlyIndexes)) {
            index.close();
        }
        this.forEachPartition(partition => partition.close());
        this.emit('closed');
    }

    /**
     * Get a partition by its id.
     * If a partition with the given id does not exist, an error is thrown.
     *
     * @protected
     * @param {number|string} partitionIdentifier The partition Id
     * @returns {ReadablePartition}
     * @throws {Error} If no such partition exists.
     */
    getPartition(partitionIdentifier) {
        assert(this.partitions.has(partitionIdentifier), `Partition #${partitionIdentifier} does not exist.`);
        return this.partitions.open(partitionIdentifier);
    }

    /**
     * Register a handler that is called before a document is read from a partition.
     * The handler receives the position and the partition metadata and may throw to abort the read.
     * Multiple handlers can be registered; all run on every read in registration order.
     * Equivalent to `storage.on('preRead', hook)`.
     *
     * @api
     * @param {function(number, object): void} hook A function receiving (position, partitionMetadata).
     */
    preRead(hook) {
        this.on('preRead', hook);
    }

    /**
     * Read one document from a partition, running any registered 'preRead' handlers first.
     *
     * @protected
     * @param {number} partitionId The partition to read from.
     * @param {number} position The file position to read from.
     * @param {number} [size] The expected byte size of the document at the given position.
     * @param {boolean} [raw] Whether to return raw buffers instead of deserialized objects. Default false.
     * @param {boolean} [backwardsHint] If set to true, will optimize buffering for backwards reading.
     * @returns {object} The deserialized document, or `{ buffer, time64, sequenceNumber }` when `raw` is true.
     * @throws {Error} if the document at the given position can not be deserialized.
     */
    readFrom(partitionId, position, size, raw = false, backwardsHint = false) {
        const partition = this.getPartition(partitionId);
        // Skip the emit entirely when nobody listens.
        if (this.listenerCount('preRead') > 0) {
            this.emit('preRead', position, partition.metadata);
        }
        // The partition fills this object with header fields of the document it read.
        const headerOut = {};
        const buffer = partition.readFrom(position, size, headerOut, backwardsHint);
        return raw ? { buffer, time64: headerOut.time64, sequenceNumber: headerOut.sequenceNumber } : this.serializer.deserialize(buffer.toString('utf8'));
    }

    /**
     * Read a single document from the given position, in the full index or in the provided index.
     *
     * @api
     * @param {number} number The 1-based document number (inside the given index) to read.
     * @param {ReadableIndex} [index] The index to use for finding the document position.
     * @returns {object|false} The document at the given position inside the index,
     *   or false when the index has no entry for that number.
     */
    read(number, index) {
        index = index || this.index;
        index.open();

        const entry = index.get(number);
        if (entry === false) {
            return false;
        }

        return this.readFrom(entry.partition, entry.position, entry.size);
    }

    /**
     * Read a range of documents from the given position range, in the full index or in the provided index.
     * Returns a generator in order to reduce memory usage and be able to read lots of documents with little latency.
     *
     * @api
     * @param {number} from The 1-based document number (inclusive) to start reading from.
     * @param {number} [until] The 1-based document number (inclusive) to read until. Defaults to index.length.
     * @param {ReadableIndex|false} [index] The index to use for finding the documents in the range.
     *   Pass `false` to skip the global index and iterate all partitions directly in sequenceNumber order
     *   (useful when the global index is unavailable or corrupted).
     * @param {boolean} [raw] Whether to return raw buffers instead of deserialized objects. Default false.
     * @returns {Generator<object>} A generator that will read each document in the range one by one.
     * @throws {Error} If the resolved range bounds are not both positive.
     */
    *readRange(from, until = -1, index = null, raw = false) {
        // Without an index there is no known length to resolve the bounds against.
        let length = Number.MAX_SAFE_INTEGER;
        if (index !== false) {
            index = index || this.index;
            index.open();
            length = index.length;
        }

        const readFrom = wrapAndCheck(from, length);
        const readUntil = wrapAndCheck(until, length);
        assert(readFrom > 0 && readUntil > 0, `Range scan error for range ${from} - ${until}.`);

        yield* this.iterateRange(readFrom, readUntil, index, raw);
    }

    /**
     * Iterate all documents in this storage in range from to until inside the index.
     * If index is false, iterates all partitions directly in sequenceNumber order; in that
     * mode `raw` has no effect and deserialized documents are yielded.
     *
     * @private
     * @param {number} from
     * @param {number} until
     * @param {ReadableIndex|false|null} index
     * @param {boolean} [raw] Whether to return raw buffers instead of deserialized objects. Default false.
     * @returns {Generator<object>}
     */
    *iterateRange(from, until, index, raw = false) {
        if (index === false) {
            // Document numbers are 1-based, sequence numbers 0-based.
            for (const entry of this.iterateDocumentsNoIndex(from - 1, until - 1)) {
                yield entry.document;
            }
            return;
        }

        const idx = index || this.index;
        // from > until requests a backwards scan over the same inclusive range.
        const forwards = from <= until;
        const lo = Math.min(from, until);
        const hi = Math.max(from, until);
        const entries = idx.range(lo, hi);
        if (!entries) return;
        for (const entry of iterate(entries, forwards)) {
            yield this.readFrom(entry.partition, entry.position, entry.size, raw, !forwards);
        }
    }

    /**
     * Open an existing readonly index for reading, without registering it in the secondary indexes write path.
     * Use this for indexes whose files carry a status marker (e.g. `stream-foo.closed.index`).
     * Instances are cached per name; a repeated call returns the cached instance.
     *
     * @api
     * @param {string} name The readonly index name (e.g. 'stream-foo.closed').
     * @returns {ReadableIndex}
     * @throws {Error} if the readonly index does not exist.
     */
    openReadonlyIndex(name) {
        // Own-property check: `in` would also match inherited names such as 'constructor'.
        if (Object.prototype.hasOwnProperty.call(this.readonlyIndexes, name)) {
            return this.readonlyIndexes[name];
        }
        const indexName = this.storageFile + '.' + name + '.index';
        assert(fs.existsSync(path.join(this.indexDirectory, indexName)), `Index "${name}" does not exist.`);
        const { index } = this.createIndex(indexName, Object.assign({}, this.indexOptions));
        index.open();
        this.readonlyIndexes[name] = index;
        return index;
    }

    /**
     * Open an existing index.
     * The name '_all' returns the primary index. Already registered secondary indexes are
     * returned from the cache. If opening a new index fails, its registration is rolled back
     * so that a later call does not return a never-opened index.
     *
     * @api
     * @param {string} name The index name.
     * @param {Matcher} [matcher] The matcher object or function that the index needs to have been defined with. If not given it will not be validated.
     * @returns {ReadableIndex}
     * @throws {Error} if the index with that name does not exist.
     * @throws {Error} if the HMAC for the matcher does not match.
     */
    openIndex(name, matcher) {
        if (name === '_all') {
            return this.index;
        }
        // Own-property check: `in` would also match inherited names such as 'toString'.
        if (Object.prototype.hasOwnProperty.call(this.secondaryIndexes, name)) {
            return this.secondaryIndexes[name].index;
        }

        const indexName = this.storageFile + '.' + name + '.index';
        assert(fs.existsSync(path.join(this.indexDirectory, indexName)), `Index "${name}" does not exist.`);

        const metadata = buildMetadataForMatcher(matcher, this.hmac);
        const entry = this.createIndex(indexName, Object.assign({}, this.indexOptions, { metadata }));
        this.secondaryIndexes[name] = entry;

        // Register the actual stored matcher (may have been reconstructed from metadata by WritableStorage.createIndex).
        this.indexMatcher.add(name, entry.matcher);

        try {
            entry.index.open();
        } catch (error) {
            // Do not leave a half-registered index behind.
            this.removeSecondaryIndex(name);
            throw error;
        }
        return entry.index;
    }

    /**
     * Remove a secondary index from the write path and the matcher lookup table.
     * Does nothing if no secondary index is registered under the given name.
     *
     * @api
     * @param {string} name The secondary index name to remove.
     */
    removeSecondaryIndex(name) {
        if (!Object.prototype.hasOwnProperty.call(this.secondaryIndexes, name)) {
            return;
        }
        if (this.secondaryIndexes[name]) {
            this.indexMatcher.remove(name);
            delete this.secondaryIndexes[name];
        }
    }

    /**
     * Build the standard document result entry from a merge stream state.
     *
     * @private
     * @param {object} stream A stream state as built by `iterateDocumentsNoIndex`.
     * @returns {{document: object, sequenceNumber: number, partitionName: string, position: number, size: number, partition: number}}
     */
    buildDocumentEntry(stream) {
        return {
            document: this.serializer.deserialize(stream.data.toString('utf8')),
            sequenceNumber: stream.header.sequenceNumber,
            partitionName: stream.partitionName,
            // Prefer the position recorded on the header; fall back to the stream's start position.
            position: stream.header.position ?? stream.position,
            size: stream.header.dataSize,
            partition: stream.partition,
        };
    }

    /**
     * Iterate documents across all partitions in sequenceNumber order using a k-way merge.
     * Opens any closed partition automatically.
     * If `from` is greater than `until`, the same inclusive range is iterated backwards.
     *
     * @protected
     * @param {number} [from=0] The 0-based sequenceNumber to start from (inclusive).
     * @param {number} [until=Number.MAX_SAFE_INTEGER] The 0-based sequenceNumber to read until (inclusive).
     * @returns {Generator<{document: object, sequenceNumber: number, partitionName: string, position: number, size: number, partition: number}>}
     */
    *iterateDocumentsNoIndex(from = 0, until = Number.MAX_SAFE_INTEGER) {
        const streams = [];

        const forwards = from <= until;
        const lo = Math.min(from, until);
        const hi = Math.max(from, until);
        this.forEachPartition(partition => {
            partition.open();
            const found = partition.findDocument(forwards ? lo : hi, forwards);
            // Partitions without any document inside the range do not take part in the merge.
            if (!found || found.header.sequenceNumber < lo  || found.header.sequenceNumber > hi) {
                return;
            }

            // Keep the located upper-bound document as current stream item and continue before it.
            found.header.position = found.position;
            const reader = forwards ?
                partition.readAll(found.position + partition.documentWriteSize(found.header.dataSize), found.header)
                : partition.readAllBackwards(found.position, found.header);
            streams.push({ ...found, reader, partition: partition.id, partitionName: partition.name });
        });

        yield* kWayMerge(
            streams,
            stream => stream.header.sequenceNumber,
            stream => {
                const next = stream.reader.next();
                // The header is checked after advancing the reader, so a stream ends once it leaves the range.
                if (!next.done && stream.header.sequenceNumber >= lo && stream.header.sequenceNumber <= hi) {
                    stream.data = next.value;
                    return true;
                }
                return false;
            },
            stream => this.buildDocumentEntry(stream),
            forwards
        );
    }

    /**
     * Helper method to iterate over all documents, invoking a callback for each one.
     * Pass `noIndex = true` to iterate all partitions directly in sequenceNumber order
     * (useful when the global index is unavailable or corrupted).
     * When `noIndex` is false the second callback argument is the raw index `EntryInterface`.
     * When `noIndex` is true the second callback argument has `{ partition, position, size, sequenceNumber, partitionName }`.
     *
     * @protected
     * @param {function(object, object): void} iterationHandler
     * @param {boolean} [noIndex=false] When true, bypasses the index and iterates partitions directly.
     */
    forEachDocument(iterationHandler, noIndex = false) {
        /* istanbul ignore if  */
        if (typeof iterationHandler !== 'function') {
            return;
        }

        if (noIndex) {
            for (const { document, ...entryInfo } of this.iterateDocumentsNoIndex()) {
                iterationHandler(document, entryInfo);
            }
            return;
        }

        const entries = this.index.all();

        for (const entry of entries) {
            const document = this.readFrom(entry.partition, entry.position, entry.size);
            iterationHandler(document, entry);
        }
    }

    /**
     * Helper method to iterate over all secondary indexes.
     *
     * When `matchDocument` is provided, `this.indexMatcher.forEachMatch()` is used to
     * efficiently find only the matching indexes via the discriminant lookup table.
     *
     * @protected
     * @param {function(ReadableIndex, string)} iterationHandler
     * @param {object} [matchDocument] If supplied, only indexes the document matches on will be iterated.
     */
    forEachSecondaryIndex(iterationHandler, matchDocument) {
        /* istanbul ignore if  */
        if (typeof iterationHandler !== 'function') {
            return;
        }

        if (!matchDocument) {
            // No document filter: iterate all secondary indexes unconditionally.
            for (const indexName of Object.keys(this.secondaryIndexes)) {
                iterationHandler(this.secondaryIndexes[indexName].index, indexName);
            }
            return;
        }

        this.indexMatcher.forEachMatch(matchDocument, indexName => {
            iterationHandler(this.secondaryIndexes[indexName].index, indexName);
        });
    }

    /**
     * Helper method to iterate over all partitions.
     *
     * @protected
     * @param {function(ReadablePartition)} iterationHandler
     */
    forEachPartition(iterationHandler) {
        /* istanbul ignore if  */
        if (typeof iterationHandler !== 'function') {
            return;
        }

        this.partitions.forEach(iterationHandler);
    }

}
4✔
572

4✔
573
export default ReadableStorage;
4✔
574
export { matches };
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc