• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 23773194165

30 Mar 2026 11:43PM UTC coverage: 97.958% (-0.2%) from 98.121%
23773194165

Pull #287

github

web-flow
Merge ecbe4dd31 into 2da6b5220
Pull Request #287: Fix readAllBackwards ignoring write buffer when dirtyReads is disabled

715 of 751 branches covered (95.21%)

Branch coverage included in aggregate %.

11 of 12 new or added lines in 1 file covered. (91.67%)

5 existing lines in 1 file now uncovered.

1588 of 1600 relevant lines covered (99.25%)

1286.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.12
/src/Partition/ReadablePartition.js
1
const fs = require('fs');
4✔
2
const path = require('path');
4✔
3
const events = require('events');
4✔
4
const { assert, alignTo, hash } = require('../util');
4✔
5

6
const DEFAULT_READ_BUFFER_SIZE = 64 * 1024;
4✔
7
const DOCUMENT_HEADER_SIZE = 16;
4✔
8
const DOCUMENT_ALIGNMENT = 4;
4✔
9
const DOCUMENT_SEPARATOR = "\x00\x00\x1E\n";
4✔
10
const DOCUMENT_FOOTER_SIZE = 4 /* additional data size footer */ + DOCUMENT_SEPARATOR.length;
4✔
11

12
// node-event-store partition V03
13
const HEADER_MAGIC = "nesprt03";
4✔
14

15
const NES_EPOCH = new Date('2020-01-01T00:00:00');
4✔
16

17
class CorruptFileError extends Error {}
18
class InvalidDataSizeError extends Error {}
19

20
/**
21
 * A partition is a single file where the storage will write documents to depending on some partitioning rules.
22
 * In the case of an event store, this is most likely the (write) streams.
23
 */
24
class ReadablePartition extends events.EventEmitter {
25

26
    /**
27
     * Get the id for a specific partition name.
28
     *
29
     * @param {string} name
30
     * @returns {number}
31
     */
32
    static idFor(name) {
33
        return hash(name);
4,368✔
34
    }
35

36
    /**
37
     * @param {string} name The name of the partition.
38
     * @param {object} [config] An object with storage parameters.
39
     * @param {string} [config.dataDirectory] The path where the storage data should reside. Default '.'.
40
     * @param {number} [config.readBufferSize] Size of the read buffer in bytes. Default 4096.
41
     */
42
    constructor(name, config = {}) {
×
43
        super();
1,404✔
44
        assert(typeof name === 'string' && name !== '', 'Must specify a partition name.');
1,404✔
45

46
        let defaults = {
1,396✔
47
            dataDirectory: '.',
48
            readBufferSize: DEFAULT_READ_BUFFER_SIZE
49
        };
50
        config = Object.assign(defaults, config);
1,396✔
51
        this.dataDirectory = path.resolve(config.dataDirectory);
1,396✔
52

53
        this.name = name;
1,396✔
54
        this.id = ReadablePartition.idFor(name);
1,396✔
55
        this.fileName = path.resolve(this.dataDirectory, this.name);
1,396✔
56
        this.fileMode = 'r';
1,396✔
57
        this.headerSize = 0;
1,396✔
58

59
        this.readBufferSize = config.readBufferSize >>> 0;  // jshint ignore:line
1,396✔
60
    }
61

62
    /**
63
     * Check if the partition file is opened.
64
     *
65
     * @returns {boolean}
66
     */
67
    isOpen() {
68
        return !!this.fd;
120✔
69
    }
70

71
    /**
72
     * Open the partition storage and create read buffers.
73
     *
74
     * @api
75
     * @returns {boolean} Returns false if the file is not a valid partition.
76
     */
77
    open() {
78
        if (this.fd) {
2,732✔
79
            return true;
16✔
80
        }
81

82
        this.fd = fs.openSync(this.fileName, this.fileMode);
2,716✔
83

84
        // allocUnsafeSlow because we don't need buffer pooling for these relatively long-lived buffers
85
        this.readBuffer = Buffer.allocUnsafeSlow(this.readBufferSize);
2,716✔
86
        // Where inside the file the read buffer starts
87
        this.readBufferPos = -1;
2,716✔
88
        this.readBufferLength = 0;
2,716✔
89

90
        this.headerSize = 0;
2,716✔
91
        this.size = this.readFileSize();
2,716✔
92
        if (this.size <= 0) {
2,716✔
93
            this.close();
1,164✔
94
            return false;
1,164✔
95
        }
96

97
        this.size -= this.readMetadata();
1,552✔
98

99
        return true;
1,544✔
100
    }
101

102
    /**
103
     * Read the partition metadata from the file.
104
     *
105
     * @private
106
     * @returns {number} The size of the metadata header.
107
     * @throws {Error} if the file header magic value is invalid.
108
     * @throws {Error} if the metadata size in the header is invalid.
109
     */
110
    readMetadata() {
111
        assert(this.size >= 16, `Invalid file.`);
1,552✔
112

113
        const headerBuffer = Buffer.allocUnsafe(8 + 4);
1,544✔
114
        fs.readSync(this.fd, headerBuffer, 0, 8 + 4, 0);
1,544✔
115
        const headerMagic = headerBuffer.toString('utf8', 0, 8);
1,544✔
116

117
        assert(headerMagic.substr(0, 6) === HEADER_MAGIC.substr(0, 6), `Invalid file header in partition ${this.name}.`);
1,544✔
118

119
        this.header = headerMagic;
1,544✔
120
        assert(headerMagic === HEADER_MAGIC, `Invalid file version. The partition ${this.name} was created with a different library version (${headerMagic.substr(6)}).`);
1,544✔
121

122
        const metadataSize = headerBuffer.readUInt32BE(8);
1,544✔
123
        assert(metadataSize > 2 && metadataSize <= 4096, 'Invalid metadata size.');
1,544✔
124

125
        const metadataBuffer = Buffer.allocUnsafe(metadataSize - 1);
1,544✔
126
        metadataBuffer.fill(" ");
1,544✔
127
        fs.readSync(this.fd, metadataBuffer, 0, metadataSize - 1, 8 + 4);
1,544✔
128
        const metadata = metadataBuffer.toString('utf8').trim();
1,544✔
129
        try {
1,544✔
130
            this.metadata = JSON.parse(metadata);
1,544✔
131
            this.metadata.epoch = this.metadata.epoch /* istanbul ignore next */|| NES_EPOCH.getTime();
1,544!
132
        } catch (e) {
UNCOV
133
            throw new Error('Invalid metadata.');
×
134
        }
135
        this.headerSize = 8 + 4 + metadataSize;
1,544✔
136
        return this.headerSize;
1,544✔
137
    }
138

139
    /**
140
     * Get the storage size for a document of a given size.
141
     *
142
     * @param {number} dataSize The actual data size of the document.
143
     * @returns {number} The size of the data including header, padded to 16 bytes alignment and ended with a line break.
144
     */
145
    documentWriteSize(dataSize) {
146
        const padSize = alignTo(dataSize + DOCUMENT_FOOTER_SIZE, DOCUMENT_ALIGNMENT);
24,424✔
147
        return DOCUMENT_HEADER_SIZE + dataSize + padSize + DOCUMENT_FOOTER_SIZE;
24,424✔
148
    }
149

150
    /**
151
     * @protected
152
     * @returns {number} The file size not including the file header.
153
     */
154
    readFileSize() {
155
        const stat = fs.statSync(this.fileName);
2,728✔
156
        return stat.size - this.headerSize;
2,728✔
157
    }
158

159
    /**
160
     * Close the partition and frees up all resources.
161
     *
162
     * @api
163
     * @returns void
164
     */
165
    close() {
166
        if (this.fd) {
3,268✔
167
            fs.closeSync(this.fd);
2,716✔
168
            this.fd = null;
2,716✔
169
        }
170
        if (this.readBuffer) {
3,268✔
171
            this.readBuffer = null;
2,716✔
172
            this.readBufferPos = -1;
2,716✔
173
            this.readBufferLength = 0;
2,716✔
174
        }
175
    }
176

177
    /**
178
     * Fill the internal read buffer starting from the given position.
179
     *
180
     * @private
181
     * @param {number} [from] The file position to start filling the read buffer from. Default 0.
182
     */
183
    fillBuffer(from = 0) {
×
184
        this.readBufferLength = fs.readSync(this.fd, this.readBuffer, 0, this.readBuffer.byteLength, this.headerSize + from);
824✔
185
        this.readBufferPos = from;
824✔
186
    }
187

188
    /**
189
     * @private
190
     * @param {Buffer} buffer The buffer to read the data length from.
191
     * @param {number} offset The position inside the buffer to start reading from.
192
     * @param {number} position The file position to start reading from.
193
     * @param {number} [size] The expected byte size of the document at the given position.
194
     * @returns {{ dataSize: number, sequenceNumber: number, time64: number }} The metadata fields of the document
195
     * @throws {Error} if the storage entry at the given position is corrupted.
196
     * @throws {InvalidDataSizeError} if the document size at the given position does not match the provided size.
197
     * @throws {CorruptFileError} if the document at the given position can not be read completely.
198
     */
199
    readDocumentHeader(buffer, offset, position, size) {
200
        const dataSize = buffer.readUInt32BE(offset + 0);
10,016✔
201
        assert(dataSize > 0 && dataSize <= 64 * 1024 * 1024, `Error reading document size from ${position}, got ${dataSize}.`);
10,016✔
202

203
        if (size && dataSize !== size) {
10,016✔
204
            throw new InvalidDataSizeError(`Invalid document size ${dataSize} at position ${position}, expected ${size}.`);
8✔
205
        }
206

207
        const sequenceNumber = buffer.readUInt32BE(offset + 4);
10,008✔
208
        const time64 = buffer.readDoubleBE(offset + 8);
10,008✔
209
        return ({ dataSize, sequenceNumber, time64 });
10,008✔
210
    }
211

212
    /**
213
     * Prepare the read buffer for reading from the specified position.
214
     *
215
     * @protected
216
     * @param {number} position The position in the file to prepare the read buffer for reading from.
217
     * @returns {{ buffer: Buffer|null, cursor: number, length: number }} A reader object with properties `buffer`, `cursor` and `length`.
218
     */
219
    prepareReadBuffer(position) {
220
        if (position + DOCUMENT_HEADER_SIZE >= this.size) {
7,204!
UNCOV
221
            return ({ buffer: null, cursor: 0, length: 0 });
×
222
        }
223
        let bufferCursor = position - this.readBufferPos;
7,204✔
224
        if (this.readBufferPos < 0 || bufferCursor < 0 || bufferCursor + DOCUMENT_HEADER_SIZE + DOCUMENT_ALIGNMENT > this.readBufferLength) {
7,204✔
225
            this.fillBuffer(position);
668✔
226
            bufferCursor = 0;
668✔
227
        }
228
        return ({ buffer: this.readBuffer, cursor: bufferCursor, length: this.readBufferLength });
7,204✔
229
    }
230

231
    /**
232
     * Prepare the read buffer for reading *before* the specified position. Don't try to reader *after* the returned cursor.
233
     *
234
     * @protected
235
     * @param {number} position The position in the file to prepare the read buffer for reading before.
236
     * @returns {{ buffer: Buffer|null, cursor: number, length: number }} A reader object with properties `buffer`, `cursor` and `length`.
237
     */
238
    prepareReadBufferBackwards(position) {
239
        if (position < 0) {
1,016!
UNCOV
240
            return ({ buffer: null, cursor: 0, length: 0 });
×
241
        }
242
        let bufferCursor = position - this.readBufferPos;
1,016✔
243
        if (this.readBufferPos < 0 || (this.readBufferPos > 0 && bufferCursor < DOCUMENT_FOOTER_SIZE)) {
1,016✔
244
            this.fillBuffer(Math.max(position - this.readBuffer.byteLength, 0));
156✔
245
            bufferCursor = position - this.readBufferPos;
156✔
246
        }
247
        return ({ buffer: this.readBuffer, cursor: bufferCursor, length: this.readBufferLength });
1,016✔
248
    }
249

250
    /**
251
     * Read the data from the given position.
252
     *
253
     * @api
254
     * @param {number} position The file position to read from.
255
     * @param {number} [size] The expected byte size of the document at the given position.
256
     * @param {object|null} [headerOut] Optional object to populate with the document header fields
257
     *   (`dataSize`, `sequenceNumber`, `time64`). Pass an existing object to avoid extra allocation.
258
     * @returns {string|boolean} The data stored at the given position or false if no data could be read.
259
     * @throws {Error} if the storage entry at the given position is corrupted.
260
     * @throws {InvalidDataSizeError} if the document size at the given position does not match the provided size.
261
     * @throws {CorruptFileError} if the document at the given position can not be read completely.
262
     */
263
    readFrom(position, size = 0, headerOut = null) {
13,760✔
264
        assert(this.fd, 'Partition is not opened.');
9,992✔
265
        assert((position % DOCUMENT_ALIGNMENT) === 0, `Invalid read position ${position}. Needs to be a multiple of ${DOCUMENT_ALIGNMENT}.`);
9,988✔
266

267
        const reader = this.prepareReadBuffer(position);
9,980✔
268
        if (reader.length < size + DOCUMENT_HEADER_SIZE) {
9,980✔
269
            return false;
132✔
270
        }
271

272
        let dataPosition = reader.cursor + DOCUMENT_HEADER_SIZE;
9,848✔
273
        const { dataSize, sequenceNumber, time64 } = this.readDocumentHeader(reader.buffer, reader.cursor, position, size);
9,848✔
274
        if (headerOut !== null) {
9,840✔
275
            headerOut.dataSize = dataSize;
944✔
276
            headerOut.sequenceNumber = sequenceNumber;
944✔
277
            headerOut.time64 = time64;
944✔
278
        }
279

280
        // TODO: This should only be checked on opening
281
        const writeSize = this.documentWriteSize(dataSize);
9,840✔
282
        if (position + writeSize > this.size) {
9,840✔
283
            throw new CorruptFileError(`Invalid document at position ${position}. This may be caused by an unfinished write.`);
24✔
284
        }
285

286
        if (dataSize + DOCUMENT_HEADER_SIZE > reader.buffer.byteLength) {
9,816✔
287
            //console.log('sync read for large document size', dataLength, 'at position', position);
288
            const tempReadBuffer = Buffer.allocUnsafe(dataSize);
4✔
289
            fs.readSync(this.fd, tempReadBuffer, 0, dataSize, this.headerSize + position + DOCUMENT_HEADER_SIZE);
4✔
290
            return tempReadBuffer.toString('utf8');
4✔
291
        }
292

293
        if (reader.cursor > 0 && dataPosition + dataSize > reader.length) {
9,812!
UNCOV
294
            this.fillBuffer(position);
×
UNCOV
295
            dataPosition = DOCUMENT_HEADER_SIZE;
×
296
        }
297

298
        return reader.buffer.toString('utf8', dataPosition, dataPosition + dataSize);
9,812✔
299
    }
300

301
    /**
302
     * Find the start position of the document that precedes the given position.
303
     *
304
     * @protected
305
     * @param {number} position The file position to read backwards from.
306
     * @returns {number|boolean} The start position of the first document before the given position or false if no header could be found.
307
     */
308
    findDocumentPositionBefore(position) {
309
        assert(this.fd, 'Partition is not opened.');
840✔
310
        position -= (position % DOCUMENT_ALIGNMENT);
840✔
311
        if (position <= 0) {
840✔
312
            return false;
32✔
313
        }
314

315
        const separatorSize = DOCUMENT_SEPARATOR.length;
808✔
316
        // Optimization if we are at an exact document boundary, where we can just read the document size
317
        let reader = this.prepareReadBufferBackwards(position);
808✔
318
        const block = reader.buffer.toString('ascii', reader.cursor - separatorSize, reader.cursor);
808✔
319
        if (block === DOCUMENT_SEPARATOR) {
808✔
320
            const dataSize = reader.buffer.readUInt32BE(reader.cursor - separatorSize - 4);
764✔
321
            return position - this.documentWriteSize(dataSize);
764✔
322
        }
323

324
        do {
44✔
325
            reader = this.prepareReadBufferBackwards(position - separatorSize);
60✔
326

327
            const bufferSeparatorPosition = reader.buffer.lastIndexOf(DOCUMENT_SEPARATOR, reader.cursor - separatorSize, 'ascii');
60✔
328
            if (bufferSeparatorPosition >= 0) {
60✔
329
                position = this.readBufferPos + bufferSeparatorPosition + separatorSize;
28✔
330
                break;
28✔
331
            }
332
            position -= this.readBufferLength;
32✔
333
        } while (position > 0);
334
        return Math.max(0, position);
44✔
335
    }
336

337
    /**
338
     * Read the header and file position of the last document in this partition.
339
     *
340
     * @api
341
     * @returns {{ header: {dataSize: number, sequenceNumber: number, time64: number}, position: number } | null}
342
     *   The last document's header and its file position, or null if the partition is empty or unreadable.
343
     */
344
    readLast() {
345
        if (this.size === 0) return null;
152✔
346
        const position = this.findDocumentPositionBefore(this.size);
140✔
347
        /* istanbul ignore if */
348
        if (position === false || position < 0) return null;
140✔
349
        const reader = this.prepareReadBufferBackwards(position);
140✔
350
        /* istanbul ignore if */
351
        if (!reader.buffer) return null;
140✔
352
        const header = this.readDocumentHeader(reader.buffer, reader.cursor, position);
140✔
353
        return { header, position };
140✔
354
    }
355

356
    /**
357
     * Find the first document whose sequenceNumber is >= the given value.
358
     * Uses readLast() to short-circuit when the partition contains no such document.
359
     *
360
     * @api
361
     * @param {number} sequenceNumber The 0-based sequence number to search for.
362
     * @returns {{ reader: Generator<string>, headerOut: object, data: string }|null}
363
     *   The matched document with its reader and shared headerOut, or null if no such document exists.
364
     */
365
    findDocument(sequenceNumber) {
366
        const last = this.readLast();
108✔
367
        if (!last || last.header.sequenceNumber < sequenceNumber) {
108✔
368
            return null;
16✔
369
        }
370
        const headerOut = {};
92✔
371
        const reader = this.readAll(0, headerOut);
92✔
372
        let result = reader.next();
92✔
373
        while (!result.done && headerOut.sequenceNumber < sequenceNumber) {
92✔
374
            result = reader.next();
96✔
375
        }
376
        /* istanbul ignore if */
377
        if (result.done) {
92✔
378
            return null;
379
        }
380
        return { reader, headerOut, data: result.value };
92✔
381
    }
382

383
    /**
384
     * @api
385
     * @param {number} [after] The document position to start reading from.
386
     * @param {object|null} [headerOut] Optional object to populate with document header fields
387
     *   (`dataSize`, `sequenceNumber`, `time64`, `position`) on each yield. Pass an existing object
388
     *   to avoid extra allocation. The object is mutated in place before each yield.
389
     * @returns {Generator<string>} A generator that returns all documents in this partition.
390
     */
391
    *readAll(after = 0, headerOut = null) {
36✔
392
        let position = after < 0 ? this.size + after + 1 : after;
112✔
393
        const internalHeader = headerOut !== null ? headerOut : {};
112✔
394
        let data;
395
        while ((data = this.readFrom(position, 0, internalHeader)) !== false) {
112✔
396
            if (headerOut !== null) {
944✔
397
                headerOut.position = position;
332✔
398
            }
399
            yield data;
944✔
400
            position += this.documentWriteSize(internalHeader.dataSize);
936✔
401
        }
402
    }
403

404
    /**
405
     * @api
406
     * @param {number} [before] The document position to start reading backward from.
407
     * @returns {Generator<string>} A generator that returns all documents in this partition in reverse order.
408
     */
409
    *readAllBackwards(before = -1) {
×
410
        let position = before < 0 ? this.size + before + 1 : before;
24✔
411
        while ((position = this.findDocumentPositionBefore(position)) !== false) {
24✔
412
            const data = this.readFrom(position);
636✔
413
            yield data;
636✔
414
        }
415
    }
416
}
417

418
module.exports = ReadablePartition;
4✔
419
module.exports.CorruptFileError = CorruptFileError;
4✔
420
module.exports.InvalidDataSizeError = InvalidDataSizeError;
4✔
421
module.exports.HEADER_MAGIC = HEADER_MAGIC;
4✔
422
module.exports.DOCUMENT_SEPARATOR = DOCUMENT_SEPARATOR;
4✔
423
module.exports.DOCUMENT_ALIGNMENT = DOCUMENT_ALIGNMENT;
4✔
424
module.exports.DOCUMENT_HEADER_SIZE = DOCUMENT_HEADER_SIZE;
4✔
425
module.exports.DOCUMENT_FOOTER_SIZE = DOCUMENT_FOOTER_SIZE;
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc