• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

suculent / thinx-device-api / #252646958

14 Mar 2026 04:20PM UTC coverage: 62.123% (-0.1%) from 62.27%
#252646958

push

suculent
Share database init across test bootstraps

1739 of 3722 branches covered (46.72%)

Branch coverage included in aggregate %.

34 of 53 new or added lines in 1 file covered. (64.15%)

20 existing lines in 3 files now uncovered.

7208 of 10680 relevant lines covered (67.49%)

8.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.98
/lib/thinx/queue.js
1
// Build Queue Manager
2

3
const Globals = require("./globals");
1✔
4
const Notifier = require("./notifier");
1✔
5
const Action = require("./queue_action");
1✔
6

7
const schedule = require('node-schedule');
1✔
8
const io = require('socket.io-client');
1✔
9

10
const express = require("express");
1✔
11
// Module-level fallback Express application. The Queue constructor overwrites this
// binding with the injected application (di_app) whenever one is supplied.
let app = express(); // may be replaced by application's main Express instance, this is for stand-alone testing only
1✔
12
// Turn off Express' 'x-powered-by' setting on the fallback application.
app.disable('x-powered-by');
1✔
13
const helmet = require("helmet");
1✔
14
// Install helmet's frameguard middleware on the fallback application.
app.use(helmet.frameguard());
1✔
15
module.exports = class Queue {
1✔
16

17
    checkSocketIoConnect(url, timeout) {
18
        return new Promise(function (resolve, reject) {
3✔
19
            timeout = timeout || 5000;
3✔
20
            let socket = io(url, { reconnection: false, timeout: timeout });
3✔
21
            let timer;
22
            let errAlready = false;
3✔
23

24
            // common error handler
25
            function error(data) {
26
                if (timer) {
3!
27
                    clearTimeout(timer);
3✔
28
                    timer = null;
3✔
29
                }
30
                if (!errAlready) {
3!
31
                    errAlready = true;
3✔
32
                    reject(data);
3✔
33
                    socket.disconnect();
3✔
34
                }
35
            }
36

37
            // set our own timeout in case the socket ends some other way than what we are listening for
38
            timer = setTimeout(function () {
3✔
39
                timer = null;
×
40
                error("local timeout");
×
41
            }, timeout);
42

43
            // success
44
            socket.on("connect", function () {
3✔
45
                clearTimeout(timer);
2✔
46
                resolve();
2✔
47
                socket.close();
2✔
48
            });
49

50
            // errors
51
            socket.on("connect_error", error);
3✔
52
            socket.on("connect_timeout", error);
3✔
53
            socket.on("error", error);
3✔
54
            socket.on("disconnect", error);
3✔
55

56
        });
57
    }
58

59

60
    constructor(redis, builder, di_app, ssl_options, opt_thx) {
61

62
        if (typeof (redis) === "undefined") throw new Error("Queue now requires connected Redis.");
18!
63

64
        this.thx = opt_thx;
18✔
65

66
        this.port = 4000;
18✔
67

68
        if (typeof (Globals.app_config().builder) !== "undefined") {
18!
69
            this.maxRunningBuilds = Globals.app_config().builder.concurrency;
18✔
70
        } else {
71
            console.log("⚠️ [warning] builder.concurrency integer not set in config.json, falling back to default 1 (disabled)");
×
72
            this.maxRunningBuilds = 1;
×
73
        }
74

75
        this.redis = redis;
18✔
76
        this.notifier = new Notifier();
18✔
77

78
        if ((typeof (di_app) !== "undefined") && (di_app !== null)) app = di_app;
18✔
79

80
        if (builder !== null) {
18!
81

82
            this.builder = builder;
18✔
83

84
            const skipServerBootstrap = process.env.ENVIRONMENT === "test" && typeof (opt_thx) !== "undefined" && opt_thx !== null;
18✔
85

86
            if (skipServerBootstrap) {
18✔
87
                console.log("ℹ️ [info] Skipping Queue listener bootstrap for THiNX test instance.");
15✔
88
                this.io = null;
15✔
89
                this.builder.io = null;
15✔
90
                this.workers = [];
15✔
91
                return;
15✔
92
            }
93

94
            try {
3✔
95

96
                // HTTP Fallback is for testing only
97
                if ((typeof (ssl_options) !== "undefined") && (ssl_options !== null)) {
3!
98
                    this.https = require('https').Server(ssl_options, di_app);
×
99
                } else {
100
                    this.https = require('http').Server(di_app);
3✔
101
                }
102

103
                this.initloop(this.https, this.port, true);
3✔
104
                this.io = require('socket.io')(this.https,
3✔
105
                    {
106
                        //transports: ['websocket'],
107
                        //allowUpgrades: false,
108
                        pingTimeout: 300000
109
                    }
110
                );
111
                this.setupIo(this.io);
3✔
112
                this.builder.io = this.io;
3✔
113

114
            } catch (e) {
115
                console.log("[queue] server init failed, eating exception, this Queue is initialized WITHOUT listener...");
×
116
            }
117

118
        } else {
119
            console.log("☣️ [error] No builder defined for queue, not setting up IO!");
×
120
        }
121

122
        this.workers = [];
3✔
123
    }
124

125
    initloop(server, port, firstRun) {
126

127
        // if (server.isRunning) return;
128

129
        let sock_url = "http://localhost:" + port;
3✔
130

131
        console.log(`ℹ️ [warning] [queue] checking socket port availability at ${sock_url} seems to destroy existing test socket`);
3✔
132

133
        this.checkSocketIoConnect(sock_url).then(function () {
3✔
134
            console.log(`ℹ️ [info] BuildServer Manager already listening on port ${port}`);
2✔
135
        }, function (reason) {
136
            if (reason.toString().indexOf("Error: xhr poll error") === -1) {
1!
137
                console.log(`[error] [queue] initloop unexpected reason: ${reason}`);
×
138
                return;
×
139
            }
140
            try {
1✔
141
                server.listen(port, function () {
1✔
142
                    console.log(`ℹ️ [info] BuildServer Manager listening on port ${port}`);
1✔
143
                });
144
            } catch (e) {
145
                if (firstRun) {
×
146
                    port += 1;
×
147
                    console.log(`ℹ️ [info] BuildServer Manager restarting on port ${port}`);
×
148
                    this.initloop(server, port, false);
×
149
                }
150
            }
151
        });
152
    }
153

154

155
    getWorkers() {
156
        return this.workers || [];
2!
157
    }
158

159
    cron() {
160
        let cron_rule = "*/5 * * * *";
1✔
161
        this.schedule = schedule.scheduleJob(cron_rule, () => {
1✔
UNCOV
162
            this.loop();
×
163
        });
164
    }
165

166
    add(udid, source, owner_id, opt_callback) {
167
        let action = new Action(udid, this.redis);
4✔
168
        action.queueWithSource(source, owner_id, opt_callback);
4✔
169
    }
170

171
    pruneIfCompleted(action) {
172
        if ((action.status == "success") || (action.status == "error")) {
12!
173
            console.log("ℹ️ [info] Pruning completed build action...");
×
174
            action.delete();
×
175
            return true;
×
176
        }
177
        return false;
12✔
178
    }
179

180
    // TODO: FIXME: This is ugly and wrong and needs refactoring. There cannot be external variable "limit"
181
    // to the async callback that controls it... the callback should just report true and action (is running, is waiting)
182
    // the limit must be held for actions that are waiting only so this could be filtered first
183
    // there is a side effect of pruning...
184
    async findNext() {
185
        let action_keys = await this.redis.v4.keys("queue:*");
12✔
186
        if ((typeof(action_keys) === "undefined") || (action_keys === null)) {
12!
187
            return Promise.resolve(false);
×
188
        }
189

190
        //console.log("[DEBUG] findNext action_keys:", action_keys);
191
        
192
        let limit = Math.abs(this.maxRunningBuilds); // abs to force copy
12✔
193

194
        for (let i = 0, len = action_keys.length; i < len; i++) {
12✔
195

196
            if (limit < 1) continue;
12!
197
            let action_key = action_keys[i];
12✔
198
            let uaid = action_key.replace("queue:", "");
12✔
199

200
            let contents = await this.redis.v4.get(action_key);
12✔
201

202
            let action = new Action(uaid, this.redis).withString(contents);
12✔
203

204
            // Prune completed actions instead of running...
205
            this.pruneIfCompleted(action);
12✔
206

207
            // Count-in running actions
208
            if (action && action.isRunning()) {
12!
209
                limit--;
×
210
            }
211

212
            // Return next waiting action
213
            if (action.isWaiting() && (limit > 0)) {
12!
214
                let a = action.action;
12✔
215
                console.log(`ℹ️ [info] Scheduling waiting build action ${a.build_id} with remaining concurrent job limit ${limit}`);
12✔
216
                limit--;
12✔
217
                return Promise.resolve(action);
12✔
218
            }
219
        }
220
        return Promise.resolve(false); // added to complete (in tests)
×
221
    }
222

223
    actionWorkerValid(action, worker) {
224
        let valid = true;
11✔
225
        if ((typeof (action) === "undefined") || (action === null) || (action === false)) {
11!
226
            console.log("☣️ [error] actionWorkerValid called with empty action, skipping...");
×
227
            valid = false;
×
228
        }
229
        if ((typeof (worker) === "undefined") || (worker === null) || (worker === false)) {
11!
230
            if (action) {
11!
231
                try {
11✔
232
                    console.log("☣️ [error] actionWorkerValid called with empty worker, skipping, will set error for action", action.action);
11✔
233
                    action.setError();
11✔
234
                } catch (e) {
235
                    console.log("☣️ [error] actionWorkerValid exception", e);
×
236
                }
237
            }
238
            valid = false;
11✔
239
        }
240
        return valid;
11✔
241
    }
242

243
    runNext(action, worker) {
244

245
        if (!this.actionWorkerValid(action, worker)) {
11!
246
            console.log(`☣️ [error] runNext failed, skipping ${action}...`);
11✔
247
            return;
11✔
248
        }
249

250
        // Scheduler runs actions one after each (FIFO), about once a minute.
UNCOV
251
        console.log("ℹ️ [info] runNext:", JSON.stringify(action));
×
252

UNCOV
253
        if (typeof (action.setStarted) === "function") {
×
UNCOV
254
            action.setStarted();
×
255
        } else {
256
            console.log("☣️ [error] Edge case: action has no functions!" + JSON.stringify(action));
×
257
            return;
×
258
        }
259

UNCOV
260
        let source_id = action.action.source;
×
261

UNCOV
262
        const build = {
×
263
            udid: action.action.udid,
264
            source_id: source_id,
265
            dryrun: false
266
        };
267

UNCOV
268
        if ((typeof(worker) !== "undefined") && (worker !== null) && (worker !== false)) worker.running = true;
×
269

UNCOV
270
        this.builder.build(
×
271
            action.action.owner_id,
272
            build,
273
            [], // notifiers
274
            (success, message) => {
UNCOV
275
                console.log("ℹ️ [info] 1 - Build exit state", success, message);
×
UNCOV
276
                console.log("ℹ️ [info] 2 - Deleting action after build request completed, set worker to not running...");
×
UNCOV
277
                action.delete();
×
UNCOV
278
                if ((typeof(worker) !== "undefined") && (worker !== null) && (worker !== false)) {
×
UNCOV
279
                    worker.running = false;
×
280
                }
281
            }, // callback
282
            worker // worker
283
        );
284
    }
285

286
    // should be called using scheduler; can be async! and findNext too!
287
    async loop() {
288
        // check events in queue and schedule one if eligible
289
        let next = await this.findNext();
10✔
290
        if (next) {
10!
291
            let workerAvailable = this.nextAvailableWorker();
10✔
292
            if (workerAvailable !== null) {
10!
293
                this.runNext(next, workerAvailable);
10✔
294
            }
295
        }
296
    }
297

298
    //
299

300
    //
301
    // WebSocket Build Server Extension
302
    //
303

304
    // Interface to Builder
305

306
    nextAvailableWorker() {
307
        for (let index in this.workers) {
13✔
UNCOV
308
            if (
×
309
                (this.workers[index].connected === true) &&
×
310
                (this.workers[index].running === false)
311
            ) {
UNCOV
312
                console.log("ℹ️ [info] Queue found available worker", index);
×
UNCOV
313
                return this.workers[index];
×
314
            }
315
        }
316
        return false;
13✔
317
    }
318

319
    // Internals
320

321
    parseSocketMessage(socket, msg) {
322
        // STATUS
323
        if (typeof (msg.status) === "undefined") return;
1!
324

325
        // Assign client id to new workers...
326
        if (0 === msg.status.indexOf("Hello")) {
1!
327
            socket.emit('client id', socket.id); // important, assigns socket ID to worker
1✔
328
            let previous_id = msg.id || null;
1✔
329
            let running = msg.running || false;
1✔
330
            // tracking
331
            this.workers[socket.id] = {
1✔
332
                previous_id: previous_id,
333
                running: running,
334
                connected: true,
335
                socket: socket
336
            };
337
        }
338
    }
339

340
    setupSocket(socket) {
341

342
        let that = this;
3✔
343

344
        socket.on('connect', () => {
3✔
345
            console.log(`ℹ️ [info] Worker connected: ${socket.id}`);
×
346
            that.workers[socket.id].connected = true;
×
347
            //
348
            if (typeof (this.thx) !== "undefined") this.thx.emit("workerReady");
×
349
        });
350

351
        socket.on('disconnect', () => {
3✔
352
            console.log(`ℹ️ [info] [queue] Unregistering disconnected worker ${socket.id}.`);
2✔
353
            if (typeof (socket.id) !== "undefined") {
2!
354
                delete that.workers[socket.id];
2✔
355
            } else {
356
                console.log("Socket ID undefined on disconnect.");
×
357
            }
358
        });
359

360
        // either by directly modifying the `auth` attribute
361
        socket.on("connect_error", () => {
3✔
362
            if ((typeof (process.env.WORKER_SECRET) !== "undefined")) {
×
363
                socket.auth.token = process.env.WORKER_SECRET;
×
364
                console.log("connect_error attempt to resolve using WORKER_SECRET");
×
365
                socket.connect();
×
366
            }
367
            console.log("onerror workers", that.workers);
×
368
        });
369

370
        // Business Logic events
371

372
        socket.on('register', (msg) => {
3✔
373
            if (typeof (that.workers[socket.id]) === "undefined") {
1!
374
                that.workers[socket.id] = {
1✔
375
                    connected: true,
376
                    socket: socket,
377
                    running: false
378
                };
379
            }
380
            this.parseSocketMessage(socket, msg);
1✔
381

382
            console.log("ℹ️ [info] Currently registered workers", Object.keys(that.workers));
1✔
383
        });
384

385
        socket.on('poll', async (msg) => {
3✔
386
            console.log("ℹ️ [info] Worker is polling...", msg);
×
387
            let next = await this.findNext();
×
388
            if (next) this.runNext(next, socket.id);
×
389
        });
390

391
        socket.on('job-status', (job_status) => {
3✔
392
            this.notifier.process(job_status, (result) => {
×
393
                console.log("ℹ️ [info] [queue] Notifier's Processing result:", result);
×
394
            });
395
            if ((typeof (this.workers[socket.id]) !== "undefined") && (this.workers[socket.id] !== null)) {
×
396
                this.workers[socket.id].running = false;
×
397
            }
398
        });
399
    }
400

401
    setupIo(dio) {
402
        dio.on('connection', (socket) => {
3✔
403
            this.setupSocket(socket);
3✔
404
        });
405
    }
406

407
};
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc