• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

suculent / thinx-device-api / #252646833

01 Dec 2025 10:23AM UTC coverage: 71.912% (-0.01%) from 71.926%
#252646833

push

suculent
base update

1878 of 3548 branches covered (52.93%)

Branch coverage included in aggregate %.

8222 of 10497 relevant lines covered (78.33%)

13.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.73
/lib/thinx/queue.js
1
// Build Queue Manager
2

3
const Globals = require("./globals");
1✔
4
const Notifier = require("./notifier");
1✔
5
const Action = require("./queue_action");
1✔
6

7
const schedule = require('node-schedule');
1✔
8
const io = require('socket.io-client');
1✔
9

10
const express = require("express");
1✔
11
let app = express(); // may be replaced by application's main Express instance, this is for stand-alone testing only
1✔
12
app.disable('x-powered-by');
1✔
13
const helmet = require("helmet");
1✔
14
app.use(helmet.frameguard());
1✔
15
module.exports = class Queue {
1✔
16

17
    checkSocketIoConnect(url, timeout) {
18
        return new Promise(function (resolve, reject) {
18✔
19
            timeout = timeout || 5000;
18✔
20
            let socket = io(url, { reconnection: false, timeout: timeout });
18✔
21
            let timer;
22
            let errAlready = false;
18✔
23

24
            // common error handler
25
            function error(data) {
26
                if (timer) {
18!
27
                    clearTimeout(timer);
18✔
28
                    timer = null;
18✔
29
                }
30
                if (!errAlready) {
18!
31
                    errAlready = true;
18✔
32
                    reject(data);
18✔
33
                    socket.disconnect();
18✔
34
                }
35
            }
36

37
            // set our own timeout in case the socket ends some other way than what we are listening for
38
            timer = setTimeout(function () {
18✔
39
                timer = null;
×
40
                error("local timeout");
×
41
            }, timeout);
42

43
            // success
44
            socket.on("connect", function () {
18✔
45
                clearTimeout(timer);
17✔
46
                resolve();
17✔
47
                socket.close();
17✔
48
            });
49

50
            // errors
51
            socket.on("connect_error", error);
18✔
52
            socket.on("connect_timeout", error);
18✔
53
            socket.on("error", error);
18✔
54
            socket.on("disconnect", error);
18✔
55

56
        });
57
    }
58

59

60
    constructor(redis, builder, di_app, ssl_options, opt_thx) {
61

62
        if (typeof (redis) === "undefined") throw new Error("Queue now requires connected Redis.");
18!
63

64
        this.thx = opt_thx;
18✔
65

66
        this.port = 4000;
18✔
67

68
        if (typeof (Globals.app_config().builder) !== "undefined") {
18!
69
            this.maxRunningBuilds = Globals.app_config().builder.concurrency;
18✔
70
        } else {
71
            console.log("⚠️ [warning] builder.concurrency integer not set in config.json, falling back to default 1 (disabled)");
×
72
            this.maxRunningBuilds = 1;
×
73
        }
74

75
        this.redis = redis;
18✔
76
        this.notifier = new Notifier();
18✔
77

78
        if ((typeof (di_app) !== "undefined") && (di_app !== null)) app = di_app;
18✔
79

80
        if (builder !== null) {
18!
81

82
            this.builder = builder;
18✔
83

84
            try {
18✔
85

86
                // HTTP Fallback is for testing only
87
                if ((typeof (ssl_options) !== "undefined") && (ssl_options !== null)) {
18!
88
                    this.https = require('https').Server(ssl_options, di_app);
×
89
                } else {
90
                    this.https = require('http').Server(di_app);
18✔
91
                }
92

93
                this.initloop(this.https, this.port, true);
18✔
94
                this.io = require('socket.io')(this.https,
18✔
95
                    {
96
                        //transports: ['websocket'],
97
                        //allowUpgrades: false,
98
                        pingTimeout: 300000
99
                    }
100
                );
101
                this.setupIo(this.io);
18✔
102
                this.builder.io = this.io;
18✔
103

104
            } catch (e) {
105
                console.log("[queue] server init failed, eating exception, this Queue is initialized WITHOUT listener...");
×
106
            }
107

108
        } else {
109
            console.log("☣️ [error] No builder defined for queue, not setting up IO!");
×
110
        }
111

112
        this.workers = [];
18✔
113
    }
114

115
    initloop(server, port, firstRun) {
116

117
        // if (server.isRunning) return;
118

119
        let sock_url = "http://localhost:" + port;
18✔
120

121
        console.log(`ℹ️ [warning] [queue] checking socket port availability at ${sock_url} seems to destroy existing test socket`);
18✔
122

123
        this.checkSocketIoConnect(sock_url).then(function () {
18✔
124
            console.log(`ℹ️ [info] BuildServer Manager already listening on port ${port}`);
17✔
125
        }, function (reason) {
126
            if (reason.toString().indexOf("Error: xhr poll error") === -1) {
1!
127
                console.log(`[error] [queue] initloop unexpected reason: ${reason}`);
×
128
                return;
×
129
            }
130
            try {
1✔
131
                server.listen(port, function () {
1✔
132
                    console.log(`ℹ️ [info] BuildServer Manager listening on port ${port}`);
1✔
133
                });
134
            } catch (e) {
135
                if (firstRun) {
×
136
                    port += 1;
×
137
                    console.log(`ℹ️ [info] BuildServer Manager restarting on port ${port}`);
×
138
                    this.initloop(server, port, false);
×
139
                }
140
            }
141
        });
142
    }
143

144

145
    getWorkers() {
146
        return this.workers || [];
2!
147
    }
148

149
    cron() {
150
        let cron_rule = "*/5 * * * *";
16✔
151
        this.schedule = schedule.scheduleJob(cron_rule, () => {
16✔
152
            this.loop();
×
153
        });
154
    }
155

156
    add(udid, source, owner_id, opt_callback) {
157
        let action = new Action(udid, this.redis);
4✔
158
        action.queueWithSource(source, owner_id, opt_callback);
4✔
159
    }
160

161
    pruneIfCompleted(action) {
162
        if ((action.status == "success") || (action.status == "error")) {
23!
163
            console.log("ℹ️ [info] Pruning completed build action...");
×
164
            action.delete();
×
165
            return true;
×
166
        }
167
        return false;
23✔
168
    }
169

170
    // TODO: FIXME: This is ugly and wrong and needs refactoring. There cannot be external variable "limit"
171
    // to the async callback that controls it... the callback should just report true and action (is running, is waiting)
172
    // the limit must be held for actions that are waiting only so this could be filtered first
173
    // there is a side effect of pruning...
174
    async findNext() {
175
        let action_keys = await this.redis.v4.keys("queue:*");
12✔
176
        if ((typeof(action_keys) === "undefined") || (action_keys === null)) {
12!
177
            return Promise.resolve(false);
×
178
        }
179

180
        //console.log("[DEBUG] findNext action_keys:", action_keys);
181
        
182
        let limit = Math.abs(this.maxRunningBuilds); // abs to force copy
12✔
183

184
        for (let i = 0, len = action_keys.length; i < len; i++) {
12✔
185

186
            if (limit < 1) continue;
23!
187
            let action_key = action_keys[i];
23✔
188
            let uaid = action_key.replace("queue:", "");
23✔
189

190
            let contents = await this.redis.v4.get(action_key);
23✔
191

192
            let action = new Action(uaid, this.redis).withString(contents);
23✔
193

194
            // Prune completed actions instead of running...
195
            this.pruneIfCompleted(action);
23✔
196

197
            // Count-in running actions
198
            if (action && action.isRunning()) {
23!
199
                limit--;
×
200
            }
201

202
            // Return next waiting action
203
            if (action.isWaiting() && (limit > 0)) {
23✔
204
                let a = action.action;
12✔
205
                console.log(`ℹ️ [info] Scheduling waiting build action ${a.build_id} with remaining concurrent job limit ${limit}`);
12✔
206
                limit--;
12✔
207
                return Promise.resolve(action);
12✔
208
            }
209
        }
210
        return Promise.resolve(false); // added to complete (in tests)
×
211
    }
212

213
    actionWorkerValid(action, worker) {
214
        let valid = true;
11✔
215
        if ((typeof (action) === "undefined") || (action === null) || (action === false)) {
11!
216
            console.log("☣️ [error] actionWorkerValid called with empty action, skipping...");
×
217
            valid = false;
×
218
        }
219
        if ((typeof (worker) === "undefined") || (worker === null) || (worker === false)) {
11!
220
            if (action) {
11!
221
                try {
11✔
222
                    console.log("☣️ [error] actionWorkerValid called with empty worker, skipping, will set error for action", action.action);
11✔
223
                    action.setError();
11✔
224
                } catch (e) {
225
                    console.log("☣️ [error] actionWorkerValid exception", e);
×
226
                }
227
            }
228
            valid = false;
11✔
229
        }
230
        return valid;
11✔
231
    }
232

233
    runNext(action, worker) {
234

235
        if (!this.actionWorkerValid(action, worker)) {
11!
236
            console.log(`☣️ [error] runNext failed, skipping ${action}...`);
11✔
237
            return;
11✔
238
        }
239

240
        // Scheduler runs actions one after each (FIFO), about once a minute.
241
        console.log("ℹ️ [info] runNext:", JSON.stringify(action));
×
242

243
        if (typeof (action.setStarted) === "function") {
×
244
            action.setStarted();
×
245
        } else {
246
            console.log("☣️ [error] Edge case: action has no functions!" + JSON.stringify(action));
×
247
            return;
×
248
        }
249

250
        let source_id = action.action.source;
×
251

252
        const build = {
×
253
            udid: action.action.udid,
254
            source_id: source_id,
255
            dryrun: false
256
        };
257

258
        if ((typeof(worker) !== "undefined") && (worker !== null) && (worker !== false)) worker.running = true;
×
259

260
        this.builder.build(
×
261
            action.action.owner_id,
262
            build,
263
            [], // notifiers
264
            (success, message) => {
265
                console.log("ℹ️ [info] 1 - Build exit state", success, message);
×
266
                console.log("ℹ️ [info] 2 - Deleting action after build request completed, set worker to not running...");
×
267
                action.delete();
×
268
                if ((typeof(worker) !== "undefined") && (worker !== null) && (worker !== false)) {
×
269
                    worker.running = false;
×
270
                }
271
            }, // callback
272
            worker // worker
273
        );
274
    }
275

276
    // should be called using scheduler; can be async! and findNext too!
277
    async loop() {
278
        // check events in queue and schedule one if eligible
279
        let next = await this.findNext();
10✔
280
        if (next) {
10!
281
            let workerAvailable = this.nextAvailableWorker();
10✔
282
            if (workerAvailable !== null) {
10!
283
                this.runNext(next, workerAvailable);
10✔
284
            }
285
        }
286
    }
287

288
    //
289

290
    //
291
    // WebSocket Build Server Extension
292
    //
293

294
    // Interface to Builder
295

296
    nextAvailableWorker() {
297
        for (let index in this.workers) {
13✔
298
            if (
×
299
                (this.workers[index].connected === true) &&
×
300
                (this.workers[index].running === false)
301
            ) {
302
                console.log("ℹ️ [info] Queue found available worker", index);
×
303
                return this.workers[index];
×
304
            }
305
        }
306
        return false;
13✔
307
    }
308

309
    // Internals
310

311
    parseSocketMessage(socket, msg) {
312
        // STATUS
313
        if (typeof (msg.status) === "undefined") return;
1!
314

315
        // Assign client id to new workers...
316
        if (0 === msg.status.indexOf("Hello")) {
1!
317
            socket.emit('client id', socket.id); // important, assigns socket ID to worker
1✔
318
            let previous_id = msg.id || null;
1✔
319
            let running = msg.running || false;
1✔
320
            // tracking
321
            this.workers[socket.id] = {
1✔
322
                previous_id: previous_id,
323
                running: running,
324
                connected: true,
325
                socket: socket
326
            };
327
        }
328
    }
329

330
    setupSocket(socket) {
331

332
        let that = this;
18✔
333

334
        socket.on('connect', () => {
18✔
335
            console.log(`ℹ️ [info] Worker connected: ${socket.id}`);
×
336
            that.workers[socket.id].connected = true;
×
337
            //
338
            if (typeof (this.thx) !== "undefined") this.thx.emit("workerReady");
×
339
        });
340

341
        socket.on('disconnect', () => {
18✔
342
            console.log(`ℹ️ [info] [queue] Unregistering disconnected worker ${socket.id}.`);
17✔
343
            if (typeof (socket.id) !== "undefined") {
17!
344
                delete that.workers[socket.id];
17✔
345
            } else {
346
                console.log("Socket ID undefined on disconnect.");
×
347
            }
348
        });
349

350
        // either by directly modifying the `auth` attribute
351
        socket.on("connect_error", () => {
18✔
352
            if ((typeof (process.env.WORKER_SECRET) !== "undefined")) {
×
353
                socket.auth.token = process.env.WORKER_SECRET;
×
354
                console.log("connect_error attempt to resolve using WORKER_SECRET");
×
355
                socket.connect();
×
356
            }
357
            console.log("onerror workers", that.workers);
×
358
        });
359

360
        // Business Logic events
361

362
        socket.on('register', (msg) => {
18✔
363
            if (typeof (that.workers[socket.id]) === "undefined") {
1!
364
                that.workers[socket.id] = {
1✔
365
                    connected: true,
366
                    socket: socket,
367
                    running: false
368
                };
369
            }
370
            this.parseSocketMessage(socket, msg);
1✔
371

372
            console.log("ℹ️ [info] Currently registered workers", Object.keys(that.workers));
1✔
373
        });
374

375
        socket.on('poll', async (msg) => {
18✔
376
            console.log("ℹ️ [info] Worker is polling...", msg);
×
377
            let next = await this.findNext();
×
378
            if (next) this.runNext(next, socket.id);
×
379
        });
380

381
        socket.on('job-status', (job_status) => {
18✔
382
            this.notifier.process(job_status, (result) => {
×
383
                console.log("ℹ️ [info] [queue] Notifier's Processing result:", result);
×
384
            });
385
            if ((typeof (this.workers[socket.id]) !== "undefined") && (this.workers[socket.id] !== null)) {
×
386
                this.workers[socket.id].running = false;
×
387
            }
388
        });
389
    }
390

391
    setupIo(dio) {
392
        dio.on('connection', (socket) => {
18✔
393
            this.setupSocket(socket);
18✔
394
        });
395
    }
396

397
};
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc