• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albe / node-event-storage / 26331781152

23 May 2026 11:43AM UTC coverage: 98.028% (-0.08%) from 98.106%
26331781152

Pull #316

github

web-flow
Merge 88893fee7 into 03ae7bb7f
Pull Request #316: Add `event-storage-http` REST API layer with NDJSON reads, consumer endpoints, and raw buffer streaming

1188 of 1244 branches covered (95.5%)

Branch coverage included in aggregate %.

582 of 604 new or added lines in 11 files covered. (96.36%)

10 existing lines in 2 files now uncovered.

5771 of 5855 relevant lines covered (98.57%)

841.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.34
/src/Consumer.js
1
import stream from 'stream';
4✔
2
import fs from 'fs';
4✔
3
import path from 'path';
4✔
4
import { assert } from './util.js';
4✔
5
import { ensureDirectory } from './fsUtil.js';
4✔
6
import Storage from './Storage/ReadableStorage.js';
4✔
7
// Bounds how far past the consumer's current position a single catch-up batch reads
// (used when computing the upper range position in Consumer#start).
// NOTE(review): whether the resulting range is inclusive depends on Storage#readRange — confirm the exact batch size.
const MAX_CATCHUP_BATCH = 10;
4✔
8

4✔
9
/**
 * Safely unlink a file and ignore if it doesn't exist.
 *
 * Only a missing file (`ENOENT`) is tolerated; every other failure
 * (e.g. permission errors or the path being a directory) is rethrown unchanged.
 *
 * @param {string} filename The path of the file to remove.
 * @throws {Error} Any error raised by `fs.unlinkSync` whose `code` is not `ENOENT`.
 */
const safeUnlink = (filename) => {
    /* istanbul ignore next */
    try {
        fs.unlinkSync(filename);
    } catch (e) {
        // A file that is already gone is the desired end state, anything else is a real failure.
        if (e.code !== "ENOENT") {
            throw e;
        }
    }
};
4✔
23

4✔
24
/**
 * Implements an event-driven durable Consumer that provides at-least-once delivery semantics or exactly-once processing semantics if only using setState().
 *
 * The consumer is an object-mode Readable stream: documents of the consumed index are pushed
 * downstream, while the consumer position and state are persisted to a file inside the
 * `consumers` sub-directory of the storage's index directory.
 */
class Consumer extends stream.Readable {

    /**
     * @param {Storage} storage The storage to create the consumer for.
     * @param {string} indexName The name of the index to consume.
     * @param {string} identifier The unique name to identify this consumer.
     * @param {object} [initialState={}] The initial state of the consumer.
     * @param {number} [startFrom=0] The revision to start from within the index to consume.
     */
    constructor(storage, indexName, identifier, initialState = {}, startFrom = 0) {
        super({ objectMode: true });

        assert(storage instanceof Storage, 'Must provide a storage for the consumer.');
        assert(typeof indexName === 'string' && indexName !== '', 'Must specify an index name for the consumer.');
        assert(typeof identifier === 'string' && identifier !== '', 'Must specify an identifier name for the consumer.');

        this.initializeStorage(storage, indexName, identifier);
        this.restoreState(initialState, startFrom);
        // Bound once so the very same function reference can be added to and removed from the storage.
        this.handler = this.handleNewDocument.bind(this);
        // After a stream error setState() must no longer be accepted.
        this.on('error', () => (this.handleDocument = false));
    }

    /**
     * Open the index to consume and derive the file name the consumer state is persisted to.
     * If `ensureDirectory` reports a truthy result for the consumers directory, leftover
     * temporary files of this consumer are removed.
     *
     * @private
     * @param {Storage} storage The storage to create the consumer for.
     * @param {string} indexName The name of the index to consume.
     * @param {string} identifier The unique name to identify this consumer.
     */
    initializeStorage(storage, indexName, identifier) {
        this.storage = storage;
        this.index = this.storage.openIndex(indexName);
        this.indexName = indexName;
        const consumerDirectory = path.join(this.storage.indexDirectory, 'consumers');
        this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' + identifier);
        if (ensureDirectory(consumerDirectory)) {
            this.cleanUpFailedWrites();
        }
    }

    /**
     * Iterate over all files in the directory of this consumer and unlink any file that starts with the filename followed by a dot.
     * Those are the temporary files written by persist() (`<fileName>.<position>`).
     *
     * @private
     */
    cleanUpFailedWrites() {
        const consumerNamePrefix = path.basename(this.fileName) + '.';
        const consumerDirectory = path.dirname(this.fileName);
        const files = fs.readdirSync(consumerDirectory);
        for (const file of files) {
            if (file.startsWith(consumerNamePrefix)) {
                safeUnlink(path.join(consumerDirectory, file));
            }
        }
    }

    /**
     * Restore position and state from the consumer file. If the file can not be read or parsed,
     * fall back to the given initial state and start position.
     *
     * File layout: a 32 bit little endian signed integer position, followed by the UTF-8 encoded JSON state.
     *
     * @private
     * @param {object} initialState The initial state if no persisted state exists.
     * @param {number} startFrom The revision to start from within the index to consume.
     */
    restoreState(initialState, startFrom) {
        /* istanbul ignore if */
        if (!this.fileName) {
            return;
        }
        // Allow the short form restoreState(startFrom), i.e. a number in place of the state.
        if (typeof initialState === 'number') {
            startFrom = initialState;
            initialState = {};
        }
        try {
            const consumerData = fs.readFileSync(this.fileName);
            this.position = consumerData.readInt32LE(0);
            this.state = JSON.parse(consumerData.toString('utf8', 4));
        } catch (e) {
            // Any failure (missing, truncated or unparsable file) results in a fresh start.
            this.position = startFrom;
            this.state = initialState;
        }
        Object.freeze(this.state);

        this.persisting = null;
        this.consuming = false;
    }

    /**
     * Update the state of this consumer transactionally with the position.
     * May only be called from within the document handling callback.
     *
     * @param {object|function(object):object} newState The new state, or a function that receives the current state and returns the new one.
     * @param {boolean} [persist=true] Set to false if this state update should not be persisted yet
     * @api
     */
    setState(newState, persist = true) {
        assert(this.handleDocument, 'Called setState outside of document handler!');

        if (typeof newState === 'function') {
            newState = newState(this.state);
        }
        this.state = Object.freeze(newState);
        this.doPersist = persist;
    }

    /**
     * Handler method that is supposed to be triggered for each new document in the storage.
     *
     * @private
     * @param {string} name The name of the index the document was added for.
     * @param {number} position The 1-based position inside the index that the document was added to.
     * @param {object} document The document that was added.
     */
    handleNewDocument(name, position, document) {
        if (name !== this.indexName) {
            return;
        }

        // Only accept the document that directly follows the current position.
        /* istanbul ignore if */
        if (this.position !== position - 1) {
            return;
        }

        // setState() is only permitted until the document was delivered to the 'data' listeners.
        this.handleDocument = true;
        this.once('data', () => (this.handleDocument = false));
        if (!this.push(document)) {
            this.stop();
        }
        this.position = position;
        if (this.doPersist) {
            this.persist();
        }
    }

    /**
     * Persist current state of this consumer.
     * This will write the current position and state to the consumer storage file.
     *
     * The write is deferred with setImmediate and coalesced: while a write is scheduled, further
     * calls are no-ops. On success a 'persisted' event is emitted with the serialized state.
     * If anything inside the write/rename/emit sequence throws, the temporary file is removed and
     * the error is not propagated.
     *
     * @private
     */
    persist() {
        if (this.persisting) {
            return;
        }
        this.persisting = setImmediate(() => {
            const consumerState = JSON.stringify(this.state);
            // Size the buffer by the encoded byte count: String#length counts UTF-16 code units and
            // would truncate any state that contains non-ASCII characters.
            const stateByteLength = Buffer.byteLength(consumerState, 'utf8');
            const consumerData = Buffer.allocUnsafe(4 + stateByteLength);
            consumerData.writeInt32LE(this.position, 0);
            consumerData.write(consumerState, 4, stateByteLength, 'utf8');
            const tmpFile = this.fileName + '.' + this.position;
            this.persisting = null;
            /* istanbul ignore if */
            if (fs.existsSync(tmpFile)) {
                // `name` is not set by this class, hence fall back to the consumer file name.
                throw new Error(`Trying to update consumer ${this.name || this.fileName} concurrently. Keep each single consumer within a single process.`);
            }
            try {
                fs.writeFileSync(tmpFile, consumerData);
                // If the write fails (half-way), the consumer state file will not be corrupted
                fs.renameSync(tmpFile, this.fileName);
                this.emit('persisted', consumerState);
            } catch (e) {
                /* istanbul ignore next */
                safeUnlink(tmpFile);
            }
        });
    }

    /**
     * Check if this consumer has caught up. If so, register a handler for the stream and emit a 'caught-up' event.
     *
     * @private
     * @returns {boolean} True if the catch-up loop should end, i.e. the consumer has caught up with the index or is no longer consuming.
     */
    checkCaughtUp() {
        if (this.index.length <= this.position) {
            this.handleDocument = false;
            this.storage.on('index-add', this.handler);
            this.emit('caught-up');
            return true;
        }
        return (this.consuming === false);
    }

    /**
     * Consume (push) a number of documents and update the position record.
     * Stops early, without advancing the position for that document, as soon as push() returns false.
     *
     * @private
     * @param {Array|Generator} documents The list or a stream of documents to consume
     */
    consumeDocuments(documents) {
        for (const document of documents) {
            if (!this.push(document)) {
                this.stop();
                break;
            }
            ++this.position;
        }
    }

    /**
     * Start consuming documents.
     *
     * This will also catch up from the last position in case new documents were added.
     * Catching up happens in batches; the next batch is only scheduled after the previous one was persisted.
     * @api
     */
    start() {
        if (this.isPaused()) {
            this.resume();
        }
        if (this.consuming) {
            return;
        }
        this.consuming = true;
        this.handleDocument = true;

        // Catch up to current index position
        const catchUpBatch = () => {
            setImmediate(() => {
                if (this.checkCaughtUp()) {
                    return;
                }

                const maxBatchPosition = Math.min(this.position + MAX_CATCHUP_BATCH + 1, this.index.length);
                const documents = this.storage.readRange(this.position + 1, maxBatchPosition, this.index);
                this.consumeDocuments(documents);
                this.once('persisted', () => catchUpBatch());
                this.persist();
            });
        };
        catchUpBatch();
    }

    /**
     * Stop consuming new documents. Consuming can be started again at any time.
     * @api
     */
    stop() {
        if (this.consuming) {
            this.pause();
        }
        this.storage.removeListener('index-add', this.handler);
        this.consuming = false;
        this.handleDocument = false;
    }

    /**
     * Reset this projection to restart processing all documents again.
     * NOTE: This will overwrite the current state of the projection and hence be destructive.
     * @param {object} [initialState={}] The initial state of the consumer.
     * @param {number} [startFrom=0] The revision to start from within the index to consume.
     * @api
     */
    reset(initialState = {}, startFrom = 0) {
        // Allow the short form reset(startFrom), i.e. a number in place of the state.
        if (typeof initialState === 'number') {
            startFrom = initialState;
            initialState = {};
        }
        const restart = this.consuming;
        this.stop();
        this.state = Object.freeze(initialState);
        this.position = startFrom;
        this.persist();
        if (restart) {
            this.start();
        }
    }

    // noinspection JSUnusedGlobalSymbols
    /**
     * Readable stream implementation.
     * Starts consuming on demand unless the stream is paused.
     * @private
     */
    _read() {
        if (this.isPaused()) {
            return;
        }
        this.start();
    }
}
4✔
301

4✔
302
export default Consumer;
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc