• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 27492865541

14 Jun 2026 08:10AM UTC coverage: 98.686% (-0.1%) from 98.787%
27492865541

Pull #322

github

web-flow
Merge 128175f25 into 388cb5d1f
Pull Request #322: fix: resolve race condition crashing read-only live-watch reader

1355 of 1394 branches covered (97.2%)

Branch coverage included in aggregate %.

132 of 139 new or added lines in 2 files covered. (94.96%)

29 existing lines in 5 files now uncovered.

6684 of 6752 relevant lines covered (98.99%)

799.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.9
/src/Storage/ReadOnlyStorage.js
1
import ReadableStorage from './ReadableStorage.js';
4✔
2
import ReadablePartition from '../Partition/ReadablePartition.js';
4✔
3
import Watcher from '../Watcher.js';
4✔
4
import { scanForFilesSync } from '../utils/fsUtil.js';
4✔
5

4✔
6
/**
4✔
7
 * An append-only storage with highly performant positional range scans.
4✔
8
 * It's highly optimized for an event-store and hence does not support compaction or data-rewrite, nor any querying
4✔
9
 */
4✔
10
class ReadOnlyStorage extends ReadableStorage {
4✔
11

4✔
12
    /**
4✔
13
     * @inheritdoc
4✔
14
     */
4✔
15
    constructor(storageName = 'storage', config = {}) {
4✔
16
        super(storageName, config);
124✔
17
        this.storageFilesFilter = this.storageFilesFilter.bind(this);
124✔
18
        this.onStorageFileChanged = this.onStorageFileChanged.bind(this);
124✔
19
    }
124✔
20

4✔
21
    /**
4✔
22
     * Returns true if the given filename belongs to this storage.
4✔
23
     * @param {string} filename
4✔
24
     * @returns {boolean}
4✔
25
     */
4✔
26
    storageFilesFilter(filename) {
4✔
27
        return !filename.endsWith('.branch') && filename.substring(0, this.storageFile.length) === this.storageFile;
56✔
28
    }
56✔
29

4✔
30
    /**
4✔
31
     * Open the storage and indexes and create read and write buffers eagerly.
4✔
32
     * Will emit an 'opened' event if finished.
4✔
33
     *
4✔
34
     * @api
4✔
35
     * @param {function(): void} [callback] Called after indexes open, before `'opened'` is emitted.
4✔
36
     *   Can be used as a synchronous alternative to listening to the `'opened'` event.
4✔
37
     * @returns {boolean}
4✔
38
     */
4✔
39
    open(callback) {
4✔
40
        if (!this.watcher) {
124✔
41
            this.watcher = new Watcher([this.dataDirectory, this.indexDirectory], this.storageFilesFilter);
120✔
42
            this.watcher.on('rename', this.onStorageFileChanged);
120✔
43
        }
120✔
44
        return super.open(callback);
124✔
45
    }
124✔
46

4✔
47
    /**
4✔
48
     * @private
4✔
49
     * @param {string} filename
4✔
50
     */
4✔
51
    onStorageFileChanged(filename) {
4✔
52
        if (filename.endsWith('.index')) {
32✔
53
            const indexName = filename.substring(this.storageFile.length + 1, filename.length - 6);
20✔
54
            // New indexes are not automatically opened in the reader
20✔
55
            this.emit('index-created', indexName);
20✔
56
            return;
20✔
57
        }
20✔
58

12✔
59
        this.registerPartitionFile(filename);
12✔
60
    }
32✔
61

4✔
62
    /**
4✔
63
     * Register a partition by its relative file name if it is not already known.
4✔
64
     * Shared by the file-watch path and the index-append path so both stay consistent.
4✔
65
     *
4✔
66
     * @private
4✔
67
     * @param {string} filename
4✔
68
     * @returns {number} The id of the (now registered) partition.
4✔
69
     */
4✔
70
    registerPartitionFile(filename) {
4✔
71
        const partitionId = ReadablePartition.idFor(filename);
16✔
72
        if (!this.partitions.has(partitionId)) {
16✔
73
            const partition = this.createPartition(filename, this.partitionConfig);
16✔
74
            this.partitions.add(partition.id, partition);
16✔
75
            this.emit('partition-created', partition.id);
16✔
76
        }
16✔
77
        return partitionId;
16✔
78
    }
16✔
79

4✔
80
    /**
4✔
81
     * Ensure the partition referenced by an appended index entry is registered.
4✔
82
     *
4✔
83
     * The index file and the partition file are watched independently, so an `'append'` can be
4✔
84
     * observed before the corresponding partition-creation event has been dispatched. The writer
4✔
85
     * always flushes the partition before appending to the index, so the file already exists on
4✔
86
     * disk; a one-off synchronous scan registers it on demand and closes the race.
4✔
87
     *
4✔
88
     * @private
4✔
89
     * @param {number} partitionId
4✔
90
     * @returns {boolean} True if the partition is registered after this call.
4✔
91
     */
4✔
92
    ensurePartitionRegistered(partitionId) {
4✔
93
        if (this.partitions.has(partitionId)) {
20✔
94
            return true;
16✔
95
        }
16✔
96
        const escaped = this.storageFile.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
4✔
97
        const partitionPattern = new RegExp(`^(${escaped}.*)$`);
4✔
98
        scanForFilesSync(this.dataDirectory, partitionPattern, (file) => {
4✔
99
            if (file.endsWith('.index') || file.endsWith('.branch') || file.endsWith('.lock')) return;
8✔
100
            this.registerPartitionFile(file);
4✔
101
        });
4✔
102
        return this.partitions.has(partitionId);
4✔
103
    }
20✔
104

4✔
105
    /**
4✔
106
     * Close the storage and frees up all resources.
4✔
107
     * Will emit a 'closed' event when finished.
4✔
108
     *
4✔
109
     * @api
4✔
110
     * @returns void
4✔
111
     */
4✔
112
    close() {
4✔
113
        if (this.watcher) {
184✔
114
            this.watcher.close();
120✔
115
            this.watcher = null;
120✔
116
        }
120✔
117
        super.close();
184✔
118
    }
184✔
119

4✔
120
    /**
4✔
121
     * @protected
4✔
122
     * @param {string} name
4✔
123
     * @param {object} [options]
4✔
124
     * @returns {{ index: ReadableIndex, matcher?: object|function }}
4✔
125
     */
4✔
126
    createIndex(name, options = {}) {
4✔
127
        const { index } = super.createIndex(name, options);
176✔
128
        const indexShortName = name.replace(this.storageFile + '.', '').replace('.index', '');
176✔
129
        index.on('append', (prevLength, newLength) => {
176✔
130
            if (!this.watcher) {
20!
UNCOV
131
                // If the watcher has been removed, this means this storage was closed and we don't want to handle events any more
×
132
                return;
×
UNCOV
133
            }
×
134
            const entries = index.range(prevLength + 1, newLength);
20✔
135
            /* c8 ignore next 3 */
4✔
136
            if (entries === false) {
4✔
137
                return;
4✔
138
            }
4✔
139
            this.processAppendedEntries(index, indexShortName, entries);
20✔
140
        });
176✔
141
        index.on('truncate', (prevLength, newLength) => {
176✔
142
            if (index === this.index) {
8✔
143
                this.emit('truncate', prevLength, newLength);
8✔
144
            }
8✔
145
        });
176✔
146
        return { index };
176✔
147
    }
176✔
148

4✔
149
    /**
4✔
150
     * Emit `'wrote'` / `'index-add'` for each appended index entry.
4✔
151
     *
4✔
152
     * If an entry references a partition that has not been registered yet (the partition-creation
4✔
153
     * watch event is still pending), the remaining entries are re-processed on the next tick instead
4✔
154
     * of throwing a hard `Partition #… does not exist.` error out of the synchronous emit path.
4✔
155
     *
4✔
156
     * @private
4✔
157
     * @param {ReadableIndex} index
4✔
158
     * @param {string} indexShortName
4✔
159
     * @param {IndexEntry[]} entries
4✔
160
     * @param {number} [startIndex] The entry offset to resume from on a retry.
4✔
161
     */
4✔
162
    processAppendedEntries(index, indexShortName, entries, startIndex = 0) {
4✔
163
        for (let i = startIndex; i < entries.length; i++) {
24✔
164
            const entry = entries[i];
24✔
165
            if (!this.ensurePartitionRegistered(entry.partition)) {
24✔
166
                this.scheduleAppendRetry(index, indexShortName, entries, i);
4✔
167
                return;
4✔
168
            }
4✔
169
            this.emitAppendedEntry(index, indexShortName, entry);
20✔
170
        }
20✔
171
    }
24✔
172

4✔
173
    /**
4✔
174
     * Re-process appended entries from the given offset on the next tick.
4✔
175
     * @private
4✔
176
     */
4✔
177
    scheduleAppendRetry(index, indexShortName, entries, startIndex) {
4✔
178
        setTimeout(() => {
4✔
179
            if (!this.watcher) {
4!
NEW
180
                return;
×
NEW
181
            }
×
182
            this.processAppendedEntries(index, indexShortName, entries, startIndex);
4✔
183
        }, 1);
4✔
184
    }
4✔
185

4✔
186
    /**
4✔
187
     * Read a single appended entry and emit the corresponding event.
4✔
188
     * A read failure is surfaced as an `'error'` event (when observed) rather than crashing the reader.
4✔
189
     * @private
4✔
190
     */
4✔
191
    emitAppendedEntry(index, indexShortName, entry) {
4✔
192
        let document;
20✔
193
        try {
20✔
194
            document = this.readFrom(entry.partition, entry.position, entry.size);
20✔
195
        } catch (error) {
20!
196
            /* c8 ignore next 3 */
4✔
197
            if (this.listenerCount('error') > 0) {
4✔
198
                this.emit('error', error);
4✔
199
            }
4✔
NEW
UNCOV
200
            return;
×
NEW
UNCOV
201
        }
×
202
        if (index === this.index) {
20✔
203
            this.emit('wrote', document, entry, entry.position);
16✔
204
        } else {
20✔
205
            this.emit('index-add', indexShortName, entry.number, document);
4✔
206
        }
4✔
207
    }
20✔
208
}
4✔
209

4✔
210
export default ReadOnlyStorage;
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc