• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 23896307326

02 Apr 2026 10:35AM UTC coverage: 98.151% (+0.1%) from 98.054%
23896307326

Pull #257

github

web-flow
Merge 1b4959036 into 50d3642f2
Pull Request #257: Move to ES Modules (ESM)

890 of 929 branches covered (95.8%)

Branch coverage included in aggregate %.

118 of 118 new or added lines in 21 files covered. (100.0%)

55 existing lines in 12 files now uncovered.

4684 of 4750 relevant lines covered (98.61%)

787.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.12
/src/Consumer.js
1
import stream from 'stream';
4✔
2
import fs from 'fs';
4✔
3
import path from 'path';
4✔
4
import { assert, ensureDirectory } from './util.js';
4✔
5
import Storage from './Storage/ReadableStorage.js';
4✔
6
const MAX_CATCHUP_BATCH = 10;
4✔
7

4✔
8
/**
4✔
9
 * Safely unlink a file and ignore if it doesn't exist.
4✔
10
 * @param {string} filename
4✔
11
 */
4✔
12
const safeUnlink = (filename) => {
4✔
13
    /* istanbul ignore next */
4✔
14
    try {
4✔
15
        fs.unlinkSync(filename);
4✔
16
    } catch (e) {
4!
UNCOV
17
        if (e.code !== "ENOENT") {
×
UNCOV
18
            throw e;
×
UNCOV
19
        }
×
UNCOV
20
    }
×
21
};
4✔
22

4✔
23
/**
4✔
24
 * Implements an event-driven durable Consumer that provides at-least-once delivery semantics or exactly-once processing semantics if only using setState().
4✔
25
 */
4✔
26
class Consumer extends stream.Readable {
4✔
27

4✔
28
    /**
4✔
29
     * @param {Storage} storage The storage to create the consumer for.
4✔
30
     * @param {string} indexName The name of the index to consume.
4✔
31
     * @param {string} identifier The unique name to identify this consumer.
4✔
32
     * @param {object} [initialState] The initial state of the consumer.
4✔
33
     * @param {number} [startFrom] The revision to start from within the index to consume.
4✔
34
     */
4✔
35
    constructor(storage, indexName, identifier, initialState = {}, startFrom = 0) {
4✔
36
        super({ objectMode: true });
156✔
37

156✔
38
        assert(storage instanceof Storage, 'Must provide a storage for the consumer.');
156✔
39
        assert(typeof indexName === 'string' && indexName !== '', 'Must specify an index name for the consumer.');
156✔
40
        assert(typeof identifier === 'string' && identifier !== '', 'Must specify an identifier name for the consumer.');
156✔
41

156✔
42
        this.initializeStorage(storage, indexName, identifier);
156✔
43
        this.restoreState(initialState, startFrom);
156✔
44
        this.handler = this.handleNewDocument.bind(this);
156✔
45
        this.on('error', () => (this.handleDocument = false));
156✔
46
    }
156✔
47

4✔
48
    /**
4✔
49
     * @private
4✔
50
     * @param {Storage} storage The storage to create the consumer for.
4✔
51
     * @param {string} indexName The name of the index to consume.
4✔
52
     * @param {string} identifier The unique name to identify this consumer.
4✔
53
     */
4✔
54
    initializeStorage(storage, indexName, identifier) {
4✔
55
        this.storage = storage;
144✔
56
        this.index = this.storage.openIndex(indexName);
144✔
57
        this.indexName = indexName;
144✔
58
        const consumerDirectory = path.join(this.storage.indexDirectory, 'consumers');
144✔
59
        this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' + identifier);
144✔
60
        if (ensureDirectory(consumerDirectory)) {
144✔
61
            this.cleanUpFailedWrites();
20✔
62
        }
20✔
63
    }
144✔
64

4✔
65
    /**
4✔
66
     * Iterate over all files in the directory of this consumer and unlink any file that starts with the filename followed by a dot.
4✔
67
     * @private
4✔
68
     */
4✔
69
    cleanUpFailedWrites() {
4✔
70
        const consumerNamePrefix = path.basename(this.fileName) + '.';
20✔
71
        const consumerDirectory = path.dirname(this.fileName);
20✔
72
        const files = fs.readdirSync(consumerDirectory);
20✔
73
        for (let file of files) {
20✔
74
            if (file.startsWith(consumerNamePrefix)) {
8✔
75
                safeUnlink(path.join(consumerDirectory, file));
4✔
76
            }
4✔
77
        }
8✔
78
    }
20✔
79

4✔
80
    /**
4✔
81
     * @private
4✔
82
     * @param {object} initialState The initial state if no persisted state exists.
4✔
83
     * @param {number} startFrom The revision to start from within the index to consume.
4✔
84
     */
4✔
85
    restoreState(initialState, startFrom) {
4✔
86
        /* istanbul ignore if */
144✔
87
        if (!this.fileName) {
144!
UNCOV
88
            return;
×
UNCOV
89
        }
×
90
        if (typeof initialState === 'number') {
144✔
91
            startFrom = initialState;
4✔
92
            initialState = {};
4✔
93
        }
4✔
94
        try {
144✔
95
            const consumerData = fs.readFileSync(this.fileName);
144✔
96
            this.position = consumerData.readInt32LE(0);
144✔
97
            this.state = JSON.parse(consumerData.toString('utf8', 4));
144✔
98
        } catch (e) {
144✔
99
            this.position = startFrom;
140✔
100
            this.state = initialState;
140✔
101
        }
140✔
102
        Object.freeze(this.state);
144✔
103

144✔
104
        this.persisting = null;
144✔
105
        this.consuming = false;
144✔
106
    }
144✔
107

4✔
108
    /**
4✔
109
     * Update the state of this consumer transactionally with the position.
4✔
110
     * May only be called from within the document handling callback.
4✔
111
     *
4✔
112
     * @param {object|function(object):object} newState
4✔
113
     * @param {boolean} [persist] Set to false if this state update should not be persisted yet
4✔
114
     * @api
4✔
115
     */
4✔
116
    setState(newState, persist = true) {
4✔
117
        assert(this.handleDocument, 'Called setState outside of document handler!');
84✔
118

84✔
119
        if (typeof newState === 'function') {
84✔
120
            newState = newState(this.state);
56✔
121
        }
56✔
122
        this.state = Object.freeze(newState);
80✔
123
        this.doPersist = persist;
80✔
124
    }
84✔
125

4✔
126
    /**
4✔
127
     * Handler method that is supposed to be triggered for each new document in the storage.
4✔
128
     *
4✔
129
     * @private
4✔
130
     * @param {string} name The name of the index the document was added for.
4✔
131
     * @param {number} position The 1-based position inside the index that the document was added to.
4✔
132
     * @param {object} document The document that was added.
4✔
133
     */
4✔
134
    handleNewDocument(name, position, document) {
4✔
135
        if (name !== this.indexName) {
64✔
136
            return;
8✔
137
        }
8✔
138

56✔
139
        /* istanbul ignore if */
56✔
140
        if (this.position !== position - 1) {
64!
UNCOV
141
            return;
×
UNCOV
142
        }
×
143

56✔
144
        this.handleDocument = true;
56✔
145
        this.once('data', () => (this.handleDocument = false));
56✔
146
        if (!this.push(document)) {
64✔
147
            this.stop();
4✔
148
        }
4✔
149
        this.position = position;
56✔
150
        if (this.doPersist) {
64✔
151
            this.persist();
12✔
152
        }
12✔
153
    }
64✔
154

4✔
155
    /**
4✔
156
     * Persist current state of this consumer.
4✔
157
     * This will write the current position and state to the consumer storage file.
4✔
158
     *
4✔
159
     * @private
4✔
160
     */
4✔
161
    persist() {
4✔
162
        if (this.persisting) {
140!
163
            return;
×
UNCOV
164
        }
×
165
        this.persisting = setImmediate(() => {
140✔
166
            const consumerState = JSON.stringify(this.state);
140✔
167
            const consumerData = Buffer.allocUnsafe(4 + consumerState.length);
140✔
168
            consumerData.writeInt32LE(this.position, 0);
140✔
169
            consumerData.write(consumerState, 4, consumerState.length, 'utf-8');
140✔
170
            const tmpFile = this.fileName + '.' + this.position;
140✔
171
            this.persisting = null;
140✔
172
            /* istanbul ignore if */
140✔
173
            if (fs.existsSync(tmpFile)) {
140!
UNCOV
174
                throw new Error(`Trying to update consumer ${this.name} concurrently. Keep each single consumer within a single process.`);
×
UNCOV
175
            }
×
176
            try {
140✔
177
                fs.writeFileSync(tmpFile, consumerData);
140✔
178
                // If the write fails (half-way), the consumer state file will not be corrupted
140✔
179
                fs.renameSync(tmpFile, this.fileName);
140✔
180
                this.emit('persisted', consumerState);
140✔
181
            } catch (e) {
140!
UNCOV
182
                /* istanbul ignore next */
×
UNCOV
183
                safeUnlink(tmpFile);
×
UNCOV
184
            }
×
185
        });
140✔
186
    }
140✔
187

4✔
188
    /**
4✔
189
     * Check if this consumer has caught up. If so, register a handler for the stream and emit a 'caught-up' event.
4✔
190
     *
4✔
191
     * @private
4✔
192
     * @returns {boolean} True if this consumer has caught up and can
4✔
193
     */
4✔
194
    checkCaughtUp() {
4✔
195
        if (this.index.length <= this.position) {
244✔
196
            this.handleDocument = false;
128✔
197
            this.storage.on('index-add', this.handler);
128✔
198
            this.emit('caught-up');
128✔
199
            return true;
128✔
200
        }
128✔
201
        return (this.consuming === false);
116✔
202
    }
244✔
203

4✔
204
    /**
4✔
205
     * Consume (push) a number of documents and update the position record.
4✔
206
     *
4✔
207
     * @private
4✔
208
     * @param {Array|Generator} documents The list or a stream of documents to consume
4✔
209
     */
4✔
210
    consumeDocuments(documents) {
4✔
211
        for (let document of documents) {
108✔
212
            if (!this.push(document)) {
200✔
213
                this.stop();
4✔
214
                break;
4✔
215
            }
4✔
216
            ++this.position;
196✔
217
        }
196✔
218
    }
108✔
219

4✔
220
    /**
4✔
221
     * Start consuming documents.
4✔
222
     *
4✔
223
     * This will also catch up from the last position in case new documents were added.
4✔
224
     * @api
4✔
225
     */
4✔
226
    start() {
4✔
227
        if (this.isPaused()) {
264✔
228
            this.resume();
20✔
229
        }
20✔
230
        if (this.consuming) {
264✔
231
            return;
128✔
232
        }
128✔
233
        this.consuming = true;
136✔
234
        this.handleDocument = true;
136✔
235

136✔
236
        // Catch up to current index position
136✔
237
        const catchUpBatch = () => {
136✔
238
            setImmediate(() => {
244✔
239
                if (this.checkCaughtUp()) {
244✔
240
                    return;
136✔
241
                }
136✔
242

108✔
243
                const maxBatchPosition = Math.min(this.position + MAX_CATCHUP_BATCH + 1, this.index.length);
108✔
244
                const documents = this.storage.readRange(this.position + 1, maxBatchPosition, this.index);
108✔
245
                this.consumeDocuments(documents);
108✔
246
                this.once('persisted', () => catchUpBatch());
108✔
247
                this.persist();
108✔
248
            });
244✔
249
        };
136✔
250
        catchUpBatch();
136✔
251
    }
264✔
252

4✔
253
    /**
4✔
254
     * Stop consuming new documents. Consuming can be started again at any time.
4✔
255
     * @api
4✔
256
     */
4✔
257
    stop() {
4✔
258
        if (this.consuming) {
64✔
259
            this.pause();
48✔
260
        }
48✔
261
        this.storage.removeListener('index-add', this.handler);
64✔
262
        this.consuming = false;
64✔
263
        this.handleDocument = false;
64✔
264
    }
64✔
265

4✔
266
    /**
4✔
267
     * Reset this projection to restart processing all documents again.
4✔
268
     * NOTE: This will overwrite the current state of the projection and hence be destructive.
4✔
269
     * @param {object} [initialState] The initial state of the consumer.
4✔
270
     * @param {number} [startFrom] The revision to start from within the index to consume.
4✔
271
     * @api
4✔
272
     */
4✔
273
    reset(initialState = {}, startFrom = 0) {
4✔
274
        if (typeof initialState === 'number') {
20✔
275
            startFrom = initialState;
4✔
276
            initialState = {};
4✔
277
        }
4✔
278
        const restart = this.consuming;
20✔
279
        this.stop();
20✔
280
        this.state = Object.freeze(initialState);
20✔
281
        this.position = startFrom;
20✔
282
        this.persist();
20✔
283
        if (restart) {
20✔
284
            this.start();
16✔
285
        }
16✔
286
    }
20✔
287

4✔
288
    // noinspection JSUnusedGlobalSymbols
4✔
289
    /**
4✔
290
     * Readable stream implementation.
4✔
291
     * @private
4✔
292
     */
4✔
293
    _read() {
4✔
294
        if (this.isPaused()) {
240✔
295
            return;
24✔
296
        }
24✔
297
        this.start();
216✔
298
    }
240✔
299
}
4✔
300

4✔
301
export default Consumer;
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc