• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

suculent / thinx-device-api / #252646983

21 Mar 2022 12:58PM UTC coverage: 12.085% (+0.6%) from 11.517%
#252646983

push

suculent
submodule sync

111 of 2797 branches covered (3.97%)

Branch coverage included in aggregate %.

973 of 6173 relevant lines covered (15.76%)

0.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

24.39
/lib/thinx/queue.js
1
// Build Queue Manager
2

3
var Globals = require("./globals.js");
1✔
4
var redis = require("redis");
1✔
5
var Notifier = require("./notifier");
1✔
6
var Action = require("./queue_action");
1✔
7

8
var schedule = require('node-schedule');
1✔
9

10
var io = require('socket.io-client');
1✔
11

12
// WebSocket Build Server Extension
13
let app = require('express')();
1✔
14
module.exports = class Queue {
1✔
15

16
    checkSocketIoConnect(url, timeout) {
17
        return new Promise(function(resolve, reject) {            
1✔
18
            timeout = timeout || 5000;
1✔
19
            var socket = io(url, {reconnection: false, timeout: timeout});
1✔
20
            var timer;
21
            var errAlready = false;
1✔
22

23
            // common error handler
24
            function error(data) {
25
                if (timer) {
1!
26
                    clearTimeout(timer);
1✔
27
                    timer = null;
1✔
28
                }
29
                if (!errAlready) {
1!
30
                    errAlready = true;
1✔
31
                    reject(data);
1✔
32
                    socket.disconnect();
1✔
33
                }
34
            }
35

36
            // set our own timeout in case the socket ends some other way than what we are listening for
37
            timer = setTimeout(function() {
1✔
38
                timer = null;
×
39
                error("local timeout");
×
40
            }, timeout);
41

42
            // success
43
            socket.on("connect", function() {
1✔
44
                clearTimeout(timer);
×
45
                resolve();
×
46
                socket.close();
×
47
            });
48
    
49
            // errors
50
            socket.on("connect_error", error);
1✔
51
            socket.on("connect_timeout", error);
1✔
52
            socket.on("error", error);
1✔
53
            socket.on("disconnect", error);
1✔
54
    
55
        });
56
    }
57

58
    constructor(builder, di_app) {
59
        
60
        this.port = process.env.PORT || 3000;
1✔
61

62
        if (process.env.environment == "test") {
1!
63
            this.port += 1;
×
64
        }
65

66
        if (typeof(Globals.app_config().builder) !== "undefined") {
1!
67
            this.maxRunningBuilds = Globals.app_config().builder.concurrency;
1✔
68
        } else {
69
            console.log("⚠️ [warning] builder.concurrency integer not set in config.json, falling back to default 1 (disabled)");
×
70
            this.maxRunningBuilds = 1;
×
71
        }
72

73
        // dependency injection for the app server
74
        if (typeof(di_app) !== "undefined") app = di_app;
1!
75
        
76
        this.client = redis.createClient(Globals.redis_options());
1✔
77
        this.notifier = new Notifier();
1✔
78

79
        if (builder !== null) {
1!
80

81
            this.builder = builder;
1✔
82

83
            // TODO (21): Make this a HTTPS server, use existing certificate to secure the socket.
84
            // https://github.com/suculent/thinx-device-api/issues/314
85
            this.http = require('http').Server(app); 
1✔
86
            this.initloop(this.http, this.port, true);
1✔
87
            this.io = require('socket.io')(this.http,
1✔
88
                {
89
                    //transports: ['websocket'],
90
                    //allowUpgrades: false,
91
                    pingTimeout: 300000
92
                }
93
            );
94
            this.setupIo(this.io);
1✔
95
            this.builder.io = this.io;
1✔
96
        } else {
97
            console.log("☣️ [error] No builder defined for queue, not setting up IO!");
×
98
        }
99
        
100
        this.workers = [];
1✔
101

102
        console.log("✅ [info] Loaded module: Queue");
1✔
103
    }
104

105
    initloop(server, port, firstRun) {
106

107
        let sock_url = "http://localhost:" + port;
1✔
108

109
        console.log(`ℹ️ [info] [queue] checking socket port availability at ${sock_url}`);
1✔
110

111
        this.checkSocketIoConnect(sock_url).then(function () {
1✔
112
            console.log(`ℹ️ [info] BuildServer Manager already listening on port ${port}`);
×
113
        }, function (reason) {
114
            if (reason.toString().indexOf("Error: xhr poll error") === -1) {
1!
115
                console.log(`[error] [queue] initloop unexpected reason: ${reason}`);
×
116
                return;
×
117
            }
118
            try {
1✔
119
                server.listen(port, function () {
1✔
120
                    console.log(`ℹ️ [info] BuildServer Manager listening on port ${port}`);
1✔
121
                });
122
            } catch (e) {
123
                if (firstRun) {
×
124
                    port += 1;
×
125
                    console.log(`[info] BuildServer Manager restarting on port ${port}`);
×
126
                    this.initloop(server, port, false);
×
127
                }
128
            }
129
        });
130
    }
131

132

133
    getWorkers() {
134
        return this.workers || [];
×
135
    }
136

137
    cron() {
138
        var cron_rule = "*/5 * * * *";
×
139
        this.schedule = schedule.scheduleJob(cron_rule, () => {
×
140
            this.loop();
×
141
        });
142
    }
143

144
    add(udid, source, owner_id) {
145
        let action = new Action(udid);
×
146
        action.queueWithSource(source, owner_id);
×
147
    }
148

149
    pruneIfCompleted(action) {
150
        if ((action.status == "success") || (action.status == "error")) {
×
151
            console.log("ℹ️ [info] Pruning completed build action...");
×
152
            action.delete();
×
153
            return true;
×
154
        }
155
        return false;
×
156
    }
157

158
    findNext(callback) {
159
        this.client.keys("queue:*", (error, action_keys) => {
×
160
            if (error) {
×
161
                callback(false, error);
×
162
                return;
×
163
            }
164
            let limit = Math.abs(this.maxRunningBuilds); // abs to force copy
×
165
            for (var i = 0, len = action_keys.length; i < len; i++) {
×
166
                if (limit < 1) continue;
×
167
                let action_key = action_keys[i];
×
168
                let uaid = action_key.replace("queue:", "");
×
169
                this.client.get(action_key, (job_error, contents) => {
×
170
                    if (job_error) {
×
171
                        console.log(`☣️ [error] findNext job error ${job_error}`);
×
172
                    }
173
                    let action = new Action(uaid).withString(contents);
×
174
                    
175
                    // Count-in running actions
176
                    if (action && action.isRunning()) {
×
177
                        limit--;
×
178
                    }
179
                    // Return next running action
180
                    if (action.isWaiting() && (limit > 0)) {
×
181
                        let a = action.action;
×
182
                        console.log(`ℹ️ [info] Scheduling build action ${a.build_id} with remaining concurrent job limit ${limit}`);
×
183
                        limit--;
×
184
                        callback(action);
×
185
                    }
186
                    // Prune completed actions
187
                    this.pruneIfCompleted(action);
×
188
                });
189
            }
190
            callback(null); // added to complete (in tests)
×
191
        });
192
    }
193

194
    runNext(action, worker) {
195

196
        if ((typeof(worker) === "undefined") || (worker === null) || (worker === false)) {
×
197
            console.log("⚠️ [warning] runNext called with empty worker, skipping...");
×
198
            return true;
×
199
        }
200

201
        if ((typeof(action) === "undefined") || (action === null) || (action === false)) {
×
202
            console.log("⚠️ [warning] runNext called with empty action, skipping...");
×
203
            return false;
×
204
        }
205

206
        // Scheduler runs actions one after each (FIFO), about once a minute.
207
        console.log("runNext:", action.toString());
×
208

209
        if (typeof(action.setStarted) === "function") {
×
210
            action.setStarted();
×
211
        } else {
212
            console.log("☣️ [error] Edge case: action has no functions:" + JSON.stringify(action));
×
213
            return;
×
214
        }
215

216
        var build = {
×
217
            udid: action.action.udid,
218
            source_id: action.action.source,
219
            dryrun: false
220
        };
221

222
        worker.running = true;
×
223

224
        this.builder.build(
×
225
            action.action.owner_id,
226
            build,
227
            [], // notifiers
228
            (success, message) => {
229
                console.log("ℹ️ [info] 1 - Build exit state", success, message);
×
230
                console.log("ℹ️ [info] 2 - Deleting action after build request completed, set worker to not running...");
×
231
                action.delete();
×
232
                if (worker !== null) {
×
233
                    worker.running = false;
×
234
                }
235
            }, // callback
236
            worker // worker
237
        );
238
    }
239

240
    // should be called using scheduler
241
    loop() {
242
        // check events in queue and schedule one if eligible
243
        this.findNext((next) => {
×
244
            if (next !== null) {
×
245
                let workerAvailable = this.nextAvailableWorker();
×
246
                if (workerAvailable !== null) {
×
247
                    this.runNext(next, workerAvailable);
×
248
                }
249
            }
250
        });
251
    }
252

253
    //
254

255
    //
256
        // WebSocket Build Server Extension
257
        //
258

259
        // Interface to Builder
260

261
        nextAvailableWorker() {
262
                for (let index in this.workers) {
×
263
            if (
×
264
                (this.workers[index].connected === true) &&
×
265
                (this.workers[index].running === false) 
266
            ) {
267
                console.log("ℹ️ [info] Queue found available worker", index);
×
268
                                return this.workers[index];
×
269
                        }
270
                }
271
                return false;
×
272
        }
273

274
        // Internals
275

276
        parseSocketMessage(socket, msg) {
277
        // STATUS
278
        if (typeof(msg.status) === "undefined") return;
×
279
        
280
        // Assign client id to new workers...
281
        if (0 === msg.status.indexOf("Hello")) {
×
282
            socket.emit('client id', socket.id); // important, assigns socket ID to worker
×
283
            let previous_id = msg.id || null;
×
284
            let running = msg.running || false;
×
285
            // tracking
286
            this.workers[socket.id] = {
×
287
                previous_id: previous_id,
288
                running: running,
289
                connected: true,
290
                socket: socket
291
            };
292
        }
293
    }
294

295
    setupSocket(socket) {
296

297
        let that = this;
×
298

299
        socket.on('connect', () => {
×
300
            console.log(`ℹ️ [info] Worker connected: ${socket.id}`);
×
301
            that.workers[socket.id].connected = true;
×
302
        });
303

304
        socket.on('disconnect', () => {
×
305
            console.log(`ℹ️ [info] Unregistering disconnected worker ${socket.id}.`);
×
306
            if (typeof(socket.id) !== "undefined") {
×
307
                delete that.workers[socket.id];
×
308
            } else {
309
                console.log("Socket ID undefined on disconnect.");
×
310
            }
311
        });
312

313
        // either by directly modifying the `auth` attribute
314
        socket.on("connect_error", () => {
×
315
            if ((typeof(process.env.WORKER_SECRET) !== "undefined")) {
×
316
                socket.auth.token = process.env.WORKER_SECRET;
×
317
                console.log("connect_error attempt to resolve using WORKER_SECRET");
×
318
                socket.connect();
×
319
            }
320
            console.log("onerror workers", that.workers);
×
321
        });
322

323
        // Business Logic events
324

325
        socket.on('register', (msg) => {
×
326
            if (typeof(that.workers[socket.id]) === "undefined") {
×
327
                that.workers[socket.id] = {
×
328
                    connected: true,
329
                    socket: socket,
330
                    running: false
331
                };
332
            }
333
            this.parseSocketMessage(socket, msg);
×
334
            
335
            console.log("ℹ️ [info] Currently registered workers", Object.keys(that.workers));
×
336
        });
337

338
        socket.on('poll', (msg) => {
×
339
            console.log("ℹ️ [info] Worker is polling...", msg);
×
340
            this.findNext((next) => {
×
341
                if (next !== null) {
×
342
                    this.runNext(next, socket.id);
×
343
                }
344
            });
345
        });
346

347
        socket.on('job-status', (job_status) => {
×
348
            this.notifier.process(job_status, (result) => {
×
349
                console.log("ℹ️ [info] Notifier's Processing result:", result);
×
350
            });
351
            if ((typeof(this.workers[socket.id]) !== "undefined") && (this.workers[socket.id] !== null)) {
×
352
                this.workers[socket.id].running = false;
×
353
            }
354
        });
355
    }
356

357
    setupIo(dio) {
358
        dio.on('connection', (socket) => {
1✔
359
            this.setupSocket(socket);
×
360
        });
361
    }
362

363
};
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc