• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Inist-CNRS / ezs / 16829733528

08 Aug 2025 11:52AM UTC coverage: 95.519% (-0.01%) from 95.531%
16829733528

push

github

web-flow
Merge pull request #461 from Inist-CNRS/import-sync

refactor: 💡 avoid dynamic require

2206 of 2385 branches covered (92.49%)

Branch coverage included in aggregate %.

50 of 52 new or added lines in 3 files covered. (96.15%)

1 existing line in 1 file now uncovered.

4552 of 4690 relevant lines covered (97.06%)

79287.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.1
/packages/core/src/engine.js
1
/* eslint no-underscore-dangle: ["error", { "allow": ["_readableState"] }] */
2
import debug from 'debug';
3
import { types } from 'util';
4
import queue from 'concurrent-queue';
5
import { hrtime } from 'process';
6
import eos from 'end-of-stream';
7
import pWaitFor from 'p-wait-for';
8
import stringify from 'json-stringify-safe';
9
import Feed from './feed.js';
10
import Shell from './shell.js';
11

12
import SafeTransform from './SafeTransform.js';
13

14
/**
15
 * Engine scope object type
16
 * @private
17
 * @typedef {Object} EngineScope
18
 *
19
 * @property {Engine} ezs
20
 * @property {(d: unknown, c: unknown) => void} emit
21
 * @property {() => any} getParams
22
 * @property {() => boolean} isFirst
23
 * @property {() => number} getIndex
24
 * @property {() => boolean} isLast
25
 * @property {() => string} getCumulativeTime
26
 * @property {() => number} getCumulativeTimeMS
27
 * @property {() => number} getCounter
28
 * @property {(name: string, defval?: string | string[], chunk?: unknown) => string | string[] | undefined} getParam
29
 * @property {(name?: string) => any} getEnv
 */
30

31
/**
 * Produce the zero value used to initialise a nanosecond accumulator.
 *
 * @returns {bigint} A BigInt equal to zero.
 */
const nanoZero = () => {
    const zero = BigInt(0);
    return zero;
};
59,290✔
32

33
/**
 * Convert a duration expressed in nanoseconds into seconds, formatted with
 * four decimals.
 *
 * The duration is truncated to millisecond resolution (BigInt division drops
 * the sub-millisecond remainder) before being formatted.
 *
 * @param {bigint} ns Duration in nanoseconds.
 * @returns {string} The duration in seconds as a fixed-point string,
 *                   e.g. `"1.5000"` for 1 500 000 000 ns.
 */
const nano2sec = (ns) => {
    const sec = ns / BigInt(1e9);
    const msec = (ns / BigInt(1e6)) - (sec * BigInt(1e3));
    // `msec` is the sub-second remainder in milliseconds (0..999): it must be
    // divided by 1000 (not 10000) to become a fraction of a second.
    const time = Number(sec) + (Number(msec) / 1000);
    return time.toFixed(4);
};
39
// Module-wide tally shared by every Engine created from this module:
// the Engine constructor calls increaseCounter() and registers
// decreaseCounter() as its end-of-stream callback.
let counter = 0;

/** Add one to the module-wide engine tally. */
function increaseCounter() {
    counter = counter + 1;
}

/** Remove one from the module-wide engine tally. */
function decreaseCounter() {
    counter = counter - 1;
}
46

47
/**
 * Wrap whatever was thrown or reported while processing an item into a new
 * Error annotated with the processing context.
 *
 * The returned error carries:
 *  - `message`: `item #<index> [<funcName>] <<first line of the source stack>>`
 *  - `sourceError`: the value received as `error`, untouched
 *  - `sourceChunk`: the chunk serialised with `stringify`
 *  - `type`, `scope`, `date`: copied from the source when present, otherwise
 *    `'Standard error'`, `'code'` and the current date
 *  - `func`, `params`, `index`: the context given by the caller
 *  - `traceback`: up to ten lines of the source stack (header excluded)
 *
 * Anything can be thrown or used as a rejection reason in JavaScript, so
 * `error` is not assumed to be an Error: `null`, `undefined` and primitives
 * are accepted and described through their string form instead of crashing
 * while the error is being reported.
 *
 * @param {*} error Source error (usually an Error instance).
 * @param {number} index Index of the item being processed.
 * @param {string} funcName Name of the statement function involved.
 * @param {Object} funcParams Parameters of the statement function.
 * @param {*} chunk The item being processed when the error occurred.
 * @returns {Error} The annotated error.
 */
function createErrorWith(error, index, funcName, funcParams, chunk) {
    const isObjectLike = error !== null
        && (typeof error === 'object' || typeof error === 'function');
    // Property lookups below go through `details` so that null, undefined
    // and primitives never trigger a TypeError.
    const details = isObjectLike ? error : {};
    const description = isObjectLike ? (details.stack || details.message) : error;
    const stk = String(description).split('\n');
    const prefix = `item #${index} `;
    // Drop the prefix from the source header so that re-wrapping an already
    // wrapped error does not repeat it.
    const erm = stk.shift().replace(prefix, '');
    const msg = `${prefix}[${funcName}] <${erm}>\n\t${stk.slice(0, 10).join('\n\t')}`;
    const err = new Error(msg);
    err.sourceError = error;
    err.sourceChunk = stringify(chunk);
    err.type = details.type || 'Standard error';
    err.scope = details.scope || 'code';
    err.date = details.date || new Date();
    // Only the first line is kept as message; the source frames stay
    // available through `traceback`.
    err.message = msg.split('\n').shift();
    err.func = funcName;
    err.params = funcParams;
    err.traceback = stk.slice(0, 10);
    err.index = index;
    // Recompute the stack with the shortened message and without this helper's frame.
    Error.captureStackTrace(err, createErrorWith);
    return err;
}
66

67
/**
 * Stream stage that applies one statement function to every chunk it
 * receives.
 *
 * The function may be given directly or as a promise; it is resolved on the
 * first chunk. Each chunk is then handed to a concurrency-limited queue whose
 * worker (`execWith`) calls the function with a `Feed` used to send results,
 * report errors and signal completion. When the input ends, the function is
 * called one last time with `null` as chunk.
 */
export default class Engine extends SafeTransform {
    /**
     * @param {Object} ezs ezs instance; `objectMode()`, `settings.queue` and
     *                     `serializeError()` are used by this class.
     * @param {Function|Promise<Function>} func Statement function, or a
     *                     promise resolving to it.
     * @param {Object} [params] Statement parameters (defaults to `{}`).
     * @param {Object} [environment] Environment exposed through
     *                     `scope.getEnv` and given to the Shell (defaults to `{}`).
     */
    constructor(ezs, func, params, environment) {
        super(ezs.objectMode());
        this.funcName = 'Not yet defined';
        this.funcPromise = types.isPromise(func) ? func : Promise.resolve(func);
        // Number of chunks seen so far (the final `null` chunk included).
        this.index = 0;
        // Time spent in this stage, accumulated only when 'ezs:trace' is enabled.
        this.ttime = nanoZero();
        // Creation time, reference for the cumulative timings.
        this.stime = hrtime.bigint();
        this.params = params || {};
        this.ezs = ezs;
        this.environment = environment || {};
        this.errorWasSent = false;
        this.nullWasSent = false;
        this.queue = queue().limit(ezs.settings.queue).process((task, cb) => {
            this.execWith(task, cb);
        });
        // Remember the upstream stream so that it can be unpiped once this
        // stage has sent its end marker.
        this.on('pipe', (src) => {
            this.parentStream = src;
        });
        increaseCounter();
        eos(this, decreaseCounter);
        this.shell = new Shell(ezs, this.environment);
        this.chunk = {};
        /**
         * Object used both as `this` and as third argument when the
         * statement function is called.
         * @private
         * @type {EngineScope}
         */
        this.scope = {};
        this.scope.getEnv = (name) => (name === undefined ? this.environment : this.environment[name]);
        this.scope.ezs = this.ezs;
        this.scope.emit = (d, c) => this.emit(d, c);
        this.scope.getParams = () => this.params;
        this.scope.isFirst = () => (this.index === 1);
        this.scope.getIndex = () => this.index;
        // The chunk is `null` only for the final call made from `_flush`.
        this.scope.isLast = () => (this.chunk === null);
        this.scope.getCumulativeTime = () => nano2sec(hrtime.bigint() - this.stime);
        // Computed straight from the nanosecond delta: returns whole
        // milliseconds as a number, without going through the formatted
        // seconds string.
        this.scope.getCumulativeTimeMS = () => Number((hrtime.bigint() - this.stime) / BigInt(1e6));
        this.scope.getCounter = () => counter;
        this.scope.getParam = (name, defval, chunk) => {
            if (this.params[name] !== undefined) {
                // The parameter value is evaluated by the shell against the
                // given chunk, or against the chunk currently processed.
                return this.shell.run(this.params[name], chunk || this.chunk);
            }
            return defval;
        };
    }

    /**
     * Handle one incoming chunk.
     *
     * Chunks received after the end marker was sent are dropped (and the
     * upstream stream is unpiped). Error chunks are forwarded as-is without
     * calling the statement function. Any other chunk is queued.
     *
     * @param {*} chunk Incoming chunk.
     * @param {string} encoding Unused.
     * @param {Function} done Callback signalling that the chunk was handled.
     */
    _transform(chunk, encoding, done) {
        const start = hrtime.bigint();
        const next = () => {
            if (debug.enabled('ezs:trace')) {
                this.ttime += (hrtime.bigint() - start);
            }
            done();
        };
        if (this.nullWasSent) {
            if (this.parentStream && this.parentStream.unpipe) {
                this.parentStream.unpipe(this);
            }
            return next();
        }
        this.index += 1;
        if (chunk instanceof Error) {
            this.push(chunk);
            return next();
        }
        this.queuing(chunk, next);
    }

    /**
     * Handle the end of the input: unless the end marker was already sent,
     * queue a final `null` chunk for the statement function, then log the
     * timings when 'ezs:trace' is enabled.
     *
     * @param {Function} done Callback signalling that flushing is complete.
     */
    _flush(done) {
        if (this.nullWasSent) {
            return done();
        }
        this.index += 1;
        return this.queuing(null, () => {
            const stop = hrtime.bigint();
            if (debug.enabled('ezs:trace')) {
                const cumulative = nano2sec(stop - this.stime);
                const elapsed = nano2sec(this.ttime);
                debug('ezs:trace')(`${cumulative}s cumulative ${elapsed}s elapsed for [${this.funcName}]`);
            }
            done();
        });
    }

    /**
     * Send a chunk to the processing queue, resolving the statement function
     * first when this has not been done yet.
     *
     * When the resolved value is not a function, or when the promise is
     * rejected, an 'error' event is emitted and `next` is called without
     * processing the chunk.
     *
     * @param {*} chunk Chunk to process (`null` for the final call).
     * @param {Function} next Callback invoked once the chunk was handled.
     */
    queuing(chunk, next) {
        if (this.func) {
            return this.queue(chunk, next);
        } else {
            this.funcPromise
                .then((func) => {
                    if (typeof func !== 'function') {
                        this.emit('error', new Error(
                            `'  ${func}' is not loaded. It's not a valid statement function.`,
                        ));
                        return next();
                    }
                    this.func = func;
                    this.funcName = String(func.name || 'unamed');
                    return this.queue(chunk, next);
                })
                .catch((e) => {
                    this.emit('error', e);
                    return next();
                });
        }
    }

    /**
     * Tell whether the readable side can accept more data: it has not ended
     * and its buffer is below the high water mark (or empty), or the end
     * marker was already sent.
     *
     * @returns {boolean}
     */
    isReady() {
        return ((!this._readableState.ended
            && (this._readableState.length < this._readableState.highWaterMark
                || this._readableState.length === 0))
            || this.nullWasSent);
    }

    /**
     * Queue worker: call the statement function for one chunk.
     *
     * The function receives the chunk, a `Feed` built from the local `push`,
     * `done`, `warn` and `wait` helpers, and the engine scope.
     *
     * @param {*} chunk Chunk to process (`null` for the final call).
     * @param {Function} done Callback signalling that the chunk was handled.
     */
    execWith(chunk, done) {
        if (this.errorWasSent || this.nullWasSent) {
            return done();
        }
        const currentIndex = this.index;
        // The input ended before any chunk was received: end the output
        // without calling the statement function.
        if (chunk === null && currentIndex === 1) {
            this.nullWasSent = true;
            this.push(null);
            return done();
        }
        // Emit an 'error' event, at most once per engine.
        const warn = (error) => {
            if (!this.errorWasSent) {
                this.errorWasSent = true;
                const warnErr = createErrorWith(error, currentIndex, this.funcName, this.params, chunk);
                debug('ezs:warn')('ezs engine emit an', this.ezs.serializeError(warnErr));
                this.emit('error', warnErr);
            }
        };
        // Send one value downstream on behalf of the statement function.
        const push = (data) => {
            if (data === null) {
                this.nullWasSent = true;
                // Keep a trace of where the end was sent, to report it if
                // data is sent afterwards.
                this.nullWasSentError = createErrorWith(new Error('As a reminder, the end was recorded at this point'), currentIndex, this.funcName, this.params, chunk);
            } else if (this.nullWasSent && !this.errorWasSent) {
                debug('ezs:warn')('Unstable state', this.ezs.serializeError(createErrorWith(new Error('Oops, that\'s going to crash ?'), currentIndex, this.funcName, this.params, chunk)));
                return warn(this.nullWasSentError);
            }
            if (!this.nullWasSent && this._readableState.ended) {
                return warn(new Error('No back pressure control ?'));
            }
            // Errors sent by the statement function travel in the stream as
            // data; they do not raise an 'error' event.
            if (data instanceof Error) {
                const ignoreErr = createErrorWith(data, currentIndex, this.funcName, this.params, chunk);
                debug('ezs:info')(`Ignoring error at item #${currentIndex}`, this.ezs.serializeError(ignoreErr));
                return this.push(ignoreErr);
            }
            if (!this.errorWasSent) {
                return this.push(data);
            }
        };
        // Pause this stream and poll every 20 ms until `isReady()` is true.
        const wait = async () => {
            this.pause();
            await pWaitFor(() => (this.isReady()), { interval: 20 });
            return this.resume();
        };
        const feed = new Feed(this.ezs, push, done, warn, wait);
        feed.engine = this;
        try {
            this.chunk = chunk;
            // A rejected promise returned by the statement function raises
            // an 'error' event.
            return Promise.resolve(this.func.call(this.scope, chunk, feed, this.scope)).catch((e) => {
                const asyncErr = createErrorWith(e, currentIndex, this.funcName, this.params, chunk);
                debug('ezs:error')(`Async error thrown at item #${currentIndex}, pipeline is broken`, this.ezs.serializeError(asyncErr));
                this.emit('error', asyncErr);
                done();
            });
        } catch (e) {
            // A synchronous throw is pushed downstream as data instead.
            const syncErr = createErrorWith(e, currentIndex, this.funcName, this.params, chunk);
            debug('ezs:error')(`Sync error thrown at item #${currentIndex}, pipeline carries errors`, this.ezs.serializeError(syncErr));
            this.push(syncErr);
            return done();
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc