• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Inist-CNRS / ezs / 16826319737

08 Aug 2025 08:56AM UTC coverage: 95.548%. First build
16826319737

Pull #461

github

web-flow
Merge 7ce1aa035 into caf9e19d3
Pull Request #461: refactor: 💡 avoid dynamic require

2207 of 2385 branches covered (92.54%)

Branch coverage included in aggregate %.

50 of 52 new or added lines in 3 files covered. (96.15%)

4553 of 4690 relevant lines covered (97.08%)

79285.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.1
/packages/core/src/engine.js
1
/* eslint no-underscore-dangle: ["error", { "allow": ["_readableState"] }] */
2
import debug from 'debug';
3
import { types } from 'util';
4
import queue from 'concurrent-queue';
5
import { hrtime } from 'process';
6
import eos from 'end-of-stream';
7
import pWaitFor from 'p-wait-for';
8
import stringify from 'json-stringify-safe';
9
import Feed from './feed.js';
10
import Shell from './shell.js';
11

12
import SafeTransform from './SafeTransform.js';
13

14
/**
15
 * Engine scope object type
16
 * @private
17
 * @typedef {Object} EngineScope
18
 *
19
 * @property {Engine} ezs
20
 * @property {(d: unknown, c: unknown) => void} emit
21
 * @property {() => any} getParams
22
 * @property {() => boolean} isFirst
23
 * @property {() => number} getIndex
24
 * @property {() => boolean} isLast
25
 * @property {() => string} getCumulativeTime
26
 * @property {() => number} getCumulativeTimeMS
27
 * @property {() => number} getCounter
28
 * @property {(name: string, defval?: string | string[], chunk?: unknown) => string | string[] | undefined} getParam
29
 */
30

31
const nanoZero = () => BigInt(0);
59,281✔
32

33
const nano2sec = (ns) => {
1,237✔
34
    const sec = ns / BigInt(1e9);
2,324✔
35
    const msec = (ns / BigInt(1e6)) - (sec * BigInt(1e3));
2,324✔
36
    const time = Number(sec) + (Number(msec) / 10000);
2,324✔
37
    return Number(time).toFixed(4);
2,324✔
38
};
39
let counter = 0;
1,237✔
40
function increaseCounter() {
41
    counter += 1;
59,281✔
42
}
43
function decreaseCounter() {
44
    counter -= 1;
56,369✔
45
}
46

47
function createErrorWith(error, index, funcName, funcParams, chunk) {
48
    const stk = String(error.stack).split('\n');
73,645✔
49
    const prefix = `item #${index} `;
73,645✔
50
    const erm = stk.shift().replace(prefix, '');
73,645✔
51
    const msg = `${prefix}[${funcName}] <${erm}>\n\t${stk.slice(0, 10).join('\n\t')}`;
73,645✔
52
    const err = Error(msg);
73,645✔
53
    err.sourceError = error;
73,645✔
54
    err.sourceChunk = stringify(chunk);
73,645✔
55
    err.type = error.type || 'Standard error';
73,645✔
56
    err.scope = error.scope || 'code';
73,645✔
57
    err.date = error.date || new Date();
73,645✔
58
    err.message = msg.split('\n').shift();
73,645✔
59
    err.func = funcName;
73,645✔
60
    err.params = funcParams;
73,645✔
61
    err.traceback = stk.slice(0,10);
73,645✔
62
    err.index = index;
73,645✔
63
    Error.captureStackTrace(err, createErrorWith);
73,645✔
64
    return err;
73,645✔
65
}
66

67
export default class Engine extends SafeTransform {
68
    constructor(ezs, func, params, environment) {
69
        super(ezs.objectMode());
59,281✔
70
        this.funcName = 'Not yet defined';
59,281✔
71
        this.funcPromise = types.isPromise(func) ? func : Promise.resolve(func);
59,281!
72
        this.index = 0;
59,281✔
73
        this.ttime = nanoZero();
59,281✔
74
        this.stime = hrtime.bigint();
59,281✔
75
        this.params = params || {};
59,281✔
76
        this.ezs = ezs;
59,281✔
77
        this.environment = environment || {};
59,281✔
78
        this.errorWasSent = false;
59,281✔
79
        this.nullWasSent = false;
59,281✔
80
        this.queue = queue().limit(ezs.settings.queue).process((task, cb) => {
59,281✔
81
            this.execWith(task, cb);
4,016,741✔
82
        });
83
        this.on('pipe', (src) => {
59,281✔
84
            this.parentStream = src;
59,159✔
85
        });
86
        increaseCounter();
59,281✔
87
        eos(this, decreaseCounter);
59,281✔
88
        this.shell = new Shell(ezs, this.environment);
59,281✔
89
        this.chunk = {};
59,281✔
90
        /**
91
         * @private
92
         * @type {EngineScope}
93
         */
94
        this.scope = {};
59,281✔
95
        this.scope.getEnv = (name) => (name === undefined ? this.environment : this.environment[name]);
68,002✔
96
        this.scope.ezs = this.ezs;
59,281✔
97
        this.scope.emit = (d, c) => this.emit(d, c);
59,281✔
98
        this.scope.getParams = () => this.params;
59,281✔
99
        this.scope.isFirst = () => (this.index === 1);
1,401,448✔
100
        this.scope.getIndex = () => this.index;
59,281✔
101
        this.scope.isLast = () => (this.chunk === null);
3,541,154✔
102
        this.scope.getCumulativeTime = () => nano2sec(hrtime.bigint() - this.stime);
59,281✔
103
        this.scope.getCumulativeTimeMS = () => nano2sec(hrtime.bigint() - this.stime) * 1000;
59,281✔
104
        this.scope.getCounter = () => counter;
59,281✔
105
        this.scope.getParam = (name, defval, chunk) => {
59,281✔
106
            if (this.params[name] !== undefined) {
3,846,235✔
107
                return this.shell.run(this.params[name], chunk || this.chunk);
3,362,613✔
108
            }
109
            return defval;
483,622✔
110
        };
111
    }
112

113
    _transform(chunk, encoding, done) {
114
        const start = hrtime.bigint();
3,966,676✔
115
        const next = () => {
3,966,676✔
116
            if (debug.enabled('ezs:trace')) {
3,966,351✔
117
                this.ttime += (hrtime.bigint() - start);
9,331✔
118
            }
119
            done();
3,966,351✔
120
        };
121
        if (this.nullWasSent) {
3,966,676✔
122
            if (this.parentStream && this.parentStream.unpipe) {
1,884✔
123
                this.parentStream.unpipe(this);
1,296✔
124
            }
125
            return next();
1,884✔
126
        }
127
        this.index += 1;
3,964,792✔
128
        if (chunk instanceof Error) {
3,964,792✔
129
            this.push(chunk);
3,126✔
130
            return next();
3,126✔
131
        }
132
        this.queuing(chunk, next);
3,961,666✔
133
    }
134

135
    _flush(done) {
136
        if (this.nullWasSent) {
55,868✔
137
            return done();
655✔
138
        }
139
        this.index += 1;
55,213✔
140
        return this.queuing(null, () => {
55,213✔
141
            const stop = hrtime.bigint();
52,976✔
142
            if (debug.enabled('ezs:trace')) {
52,976✔
143
                const cumulative = nano2sec(stop - this.stime);
1,113✔
144
                const elapsed = nano2sec(this.ttime);
1,113✔
145
                debug('ezs:trace')(`${cumulative}s cumulative ${elapsed}s elapsed for [${this.funcName}]`);
1,113✔
146
            }
147
            done();
52,976✔
148
        });
149
    }
150

151
    queuing(chunk, next) {
152
        if (this.func) {
4,016,879✔
153
            return this.queue(chunk, next);
3,958,010✔
154
        } else {
155
            this.funcPromise
58,869✔
156
                .then((func) => {
157
                    if (typeof func != 'function'){
58,869✔
158
                        this.emit('error', new Error(
84✔
159
                            `'  ${func}' is not loaded. It's not a valid statement function.`,
160
                        ));
161
                        return next();
84✔
162
                    }
163
                    this.func = func;
58,785✔
164
                    this.funcName = String(func.name || 'unamed');
58,785✔
165
                    return this.queue(chunk, next);
58,785✔
166
                })
167
                .catch( (e) => {
NEW
168
                    this.emit('error', e);
×
NEW
169
                    return next();
×
170
                });
171
        }
172
    }
173

174
    isReady() {
175
        return ((!this._readableState.ended
17,636✔
176
            && (this._readableState.length < this._readableState.highWaterMark
177
                || this._readableState.length === 0))
178
            || this.nullWasSent);
179
    }
180

181
    execWith(chunk, done) {
182
        if (this.errorWasSent || this.nullWasSent) {
4,016,741✔
183
            return done();
43✔
184
        }
185
        const currentIndex = this.index;
4,016,698✔
186
        if (chunk === null && currentIndex === 1) {
4,016,698✔
187
            this.nullWasSent = true;
885✔
188
            this.push(null);
885✔
189
            return done();
885✔
190
        }
191
        const warn = (error) => {
4,015,813✔
192
            if (!this.errorWasSent) {
2,452!
193
              this.errorWasSent = true;
2,452✔
194
              const warnErr = createErrorWith(error, currentIndex, this.funcName, this.params, chunk);
2,452✔
195
              debug('ezs:warn')('ezs engine emit an', this.ezs.serializeError(warnErr));
2,452✔
196
              this.emit('error', warnErr);
2,452✔
197
            }
198
        };
199
        const push = (data) => {
4,015,813✔
200
            if (data === null) {
4,392,904✔
201
                this.nullWasSent = true;
60,280✔
202
                this.nullWasSentError = createErrorWith(new Error('As a reminder, the end was recorded at this point'), currentIndex, this.funcName, this.params, chunk);
60,280✔
203
            } else if (this.nullWasSent && !this.errorWasSent) {
4,332,624✔
204
                debug('ezs:warn')('Unstable state', this.ezs.serializeError(createErrorWith(new Error('Oops, that\'s going to crash ?'), currentIndex, this.funcName, this.params, chunk)));
7✔
205
                return warn(this.nullWasSentError);
7✔
206
            }
207
            if (!this.nullWasSent && this._readableState.ended) {
4,392,897!
208
                return warn(new Error('No back pressure control ?'));
×
209
            }
210
            if (data instanceof Error) {
4,392,897✔
211
                const ignoreErr = createErrorWith(data, currentIndex, this.funcName, this.params, chunk);
8,351✔
212
                debug('ezs:info')(`Ignoring error at item #${currentIndex}`, this.ezs.serializeError(ignoreErr));
8,351✔
213
                return this.push(ignoreErr);
8,351✔
214
            }
215
            if (!this.errorWasSent) {
4,384,546✔
216
                return this.push(data);
4,382,130✔
217
            }
218
        };
219
        const wait = async () => {
4,015,813✔
220
            this.pause();
8,720✔
221
            await pWaitFor(() => (this.isReady()), { interval: 20 });
17,636✔
222
            return this.resume();
8,720✔
223
        };
224
        const feed = new Feed(this.ezs, push, done, warn, wait);
4,015,813✔
225
        feed.engine = this;
4,015,813✔
226
        try {
4,015,813✔
227
            this.chunk = chunk;
4,015,813✔
228
            return Promise.resolve(this.func.call(this.scope, chunk, feed, this.scope)).catch((e) => {
4,015,813✔
229
                const asyncErr = createErrorWith(e, currentIndex, this.funcName, this.params, chunk);
168✔
230
                debug('ezs:error')(`Async error thrown at item #${currentIndex}, pipeline is broken`, this.ezs.serializeError(asyncErr));
168✔
231
                this.emit('error', asyncErr);
168✔
232
                done();
168✔
233
            });
234
        } catch (e) {
235
            const syncErr = createErrorWith(e, currentIndex, this.funcName, this.params, chunk);
2,387✔
236
            debug('ezs:error')(`Sync error thrown at item #${currentIndex}, pipeline carries errors`, this.ezs.serializeError(syncErr));
2,387✔
237
            this.push(syncErr);
2,387✔
238
            return done();
2,380✔
239
        }
240
    }
241
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc