• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

screwdriver-cd / queue-service / #244

05 Dec 2025 09:27AM UTC coverage: 88.149% (-0.1%) from 88.257%
#244

Pull #81

screwdriver

yk634
fix: Ensure build status is updated to QUEUED before enqueueing
Pull Request #81: fix: Ensure build status is updated to QUEUED before enqueueing

360 of 431 branches covered (83.53%)

Branch coverage included in aggregate %.

14 of 15 new or added lines in 1 file covered. (93.33%)

1 existing line in 1 file now uncovered.

1083 of 1206 relevant lines covered (89.8%)

27.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.03
/plugins/queue/scheduler.js
1
'use strict';
2

3
const Config = require('config');
54✔
4
const logger = require('screwdriver-logger');
54✔
5
const configSchema = require('screwdriver-data-schema').config;
54✔
6
const TOKEN_CONFIG_SCHEMA = configSchema.tokenConfig;
54✔
7
const { merge, reach } = require('@hapi/hoek');
54✔
8
const { MultiWorker, Scheduler, Plugins } = require('node-resque');
54✔
9
const cron = require('./utils/cron');
54✔
10
const helper = require('../helper');
54✔
11
const { timeOutOfWindows } = require('./utils/freezeWindows');
54✔
12
const { queueNamespace } = require('../../config/redis');
54✔
13
// Ecosystem settings from the service configuration; `ecosystem.api` is used as the API base URL
const ecosystem = Config.get('ecosystem');
// Feature flag: when true, periodic build configs are kept in a Redis hash keyed by job ID
const periodicBuildTableEnabled = helper.convertToBool(Config.get('queue').periodicBuildTableEnabled);

// Fallback build timeout when the `screwdriver.cd/timeout` annotation is absent or not a number
// NOTE(review): unit is presumably minutes — confirm against the timeout worker
const DEFAULT_BUILD_TIMEOUT = 90;

// Options handed to the node-resque Retry plugin for the delayed/frozen jobs
const RETRY_LIMIT = 3;
const RETRY_DELAY = 5;

// TTL passed to the Redis `expire` command for the `deleted_<jobId>_<buildId>` marker key
const EXPIRE_TIME = 1800; // 30 mins

// Lifetimes passed to executor.tokenGen for short-lived tokens
const TEMPORAL_TOKEN_TIMEOUT = 12 * 60; // 12 hours in minutes
const TEMPORAL_UNZIP_TOKEN_TIMEOUT = 2 * 60; // 2 hours in minutes

// Default for the `screwdriver.cd/blockedBySameJobWaitTime` annotation
const BLOCKED_BY_SAME_JOB_WAIT_TIME = 5;
22

23
/**
 * Checks whether the job associated with the build is virtual or not
 * @method isVirtualJob
 * @param  {Object} [annotations]  Job annotations; may be null or undefined
 * @return {Boolean}               True only when the `screwdriver.cd/virtualJob` annotation is truthy
 */
function isVirtualJob(annotations) {
    // Coerce explicitly so the documented Boolean contract holds even when
    // annotations is missing or the annotation carries a non-boolean value.
    return Boolean(annotations && annotations['screwdriver.cd/virtualJob']);
}
32

33
/**
 * Posts a new build event to the API on behalf of a pipeline admin.
 *
 * When the API answers 404, the job is notified with a FAILURE status
 * (best effort) before the original error is rethrown.
 * @method postBuildEvent
 * @param {Object} executor                   Executor providing tokenGen and userTokenGen
 * @param {Object} eventConfig                Configuration
 * @param {Number} [eventConfig.eventId]      Optional parent event ID
 * @param {Number} [eventConfig.buildId]      Optional build ID forwarded with the event
 * @param {String} [eventConfig.causeMessage] Optional reason; defaults to the scheduler message
 * @param {Object} eventConfig.pipeline       Pipeline of the job
 * @param {Object} eventConfig.job            Job object to create the build event for
 * @param {String} eventConfig.apiUri         Base URL of the Screwdriver API
 * @return {Promise}                          Rejects with the error raised while posting the event
 */
async function postBuildEvent(executor, eventConfig) {
    const { pipeline, job, apiUri, eventId, causeMessage, buildId } = eventConfig;
    const pipelineId = pipeline.id;
    const jobId = job.id;

    try {
        const token = executor.tokenGen({
            pipelineId,
            service: 'queue',
            jobId,
            scmContext: pipeline.scmContext,
            scope: ['user']
        });

        const admin = await helper.getPipelineAdmin(token, apiUri, pipelineId, helper.requestRetryStrategy);

        logger.info(
            `POST event for pipeline ${pipelineId}:${job.name}:${job.id}:${buildId} using user ${admin.username}`
        );

        const jwt = executor.userTokenGen(admin.username, {}, admin.scmContext);

        const buildEvent = {
            pipelineId,
            startFrom: job.name,
            creator: {
                name: 'Screwdriver scheduler',
                username: 'sd:scheduler'
            },
            causeMessage: causeMessage || 'Automatically started by scheduler'
        };

        if (eventId) {
            buildEvent.parentEventId = eventId;
        }
        if (buildId) {
            buildEvent.buildId = buildId;
        }

        await helper.createBuildEvent(apiUri, jwt, buildEvent, helper.requestRetryStrategyPostEvent);
    } catch (err) {
        if (err.statusCode === 404) {
            logger.error(
                `POST event for pipeline failed as no admin found: ${pipelineId}:${job.name}:${job.id}:${buildId}`
            );

            // The notification is best effort: a failure here must not replace
            // the original error or skip the final error log below.
            try {
                const pipelineToken = executor.tokenGen({
                    pipelineId,
                    service: 'queue',
                    scmContext: pipeline.scmContext,
                    scope: ['pipeline']
                });

                const status = 'FAILURE';
                const message = `Pipeline ${pipelineId} does not have admin, unable to start job ${job.name}.`;

                await helper.notifyJob(
                    {
                        token: pipelineToken,
                        apiUri,
                        jobId,
                        payload: { status, message }
                    },
                    helper.requestRetryStrategyPostEvent
                );
            } catch (notifyErr) {
                logger.error(`Failed to notify job ${jobId} in pipeline ${pipelineId}: ${notifyErr}`);
            }
        }

        logger.error(`Error in post build event function ${buildId} ${err}`);
        throw err;
    }
}
114

115
/**
 * Removes a previously scheduled periodic build from the delayed queue and,
 * when the periodic build table is enabled, from that table as well
 * @async  stopPeriodic
 * @param  {Object}  executor      Executor exposing connect, queueBreaker and redisBreaker
 * @param  {Object}  config        Configuration
 * @param  {Integer} config.jobId  ID of the job with periodic builds
 * @return {Promise}               Result of the `hdel` command, or undefined when the table is disabled
 */
async function stopPeriodic(executor, config) {
    await executor.connect();

    const delayedJobArgs = [{ jobId: config.jobId }];

    await executor.queueBreaker.runCommand('delDelayed', executor.periodicBuildQueue, 'startDelayed', delayedJobArgs);

    return periodicBuildTableEnabled
        ? executor.redisBreaker.runCommand('hdel', executor.periodicBuildTable, config.jobId)
        : undefined;
}
135

136
/**
 * Removes a previously enqueued frozen build from the delayed queue and
 * deletes its stored config from the frozen build table
 * @async  stopFrozen
 * @param  {Object}  executor      Executor exposing connect, queueBreaker and redisBreaker
 * @param  {Object}  config        Configuration
 * @param  {Integer} config.jobId  ID of the job with frozen builds
 * @return {Promise}               Result of the `hdel` command
 */
async function stopFrozen(executor, config) {
    await executor.connect();

    const delayedJobArgs = [{ jobId: config.jobId }];

    await executor.queueBreaker.runCommand('delDelayed', executor.frozenBuildQueue, 'startFrozen', delayedJobArgs);

    const removed = executor.redisBreaker.runCommand('hdel', executor.frozenBuildTable, config.jobId);

    return removed;
}
152

153
/**
 * Calls postBuildEvent() with the stored configuration of a frozen build.
 *
 * Never rejects: a missing config, a disabled/archived job and a failed
 * post are logged and the promise resolves with undefined.
 * @async startFrozen
 * @param {Object}  executor             Executor passed through to postBuildEvent
 * @param {Object}  config               Stored build configuration
 * @param {Number}  config.jobId         ID of the frozen job
 * @param {String}  config.jobName       Name of the frozen job
 * @param {String}  [config.jobState]    ENABLED/DISABLED
 * @param {Boolean} [config.jobArchived] Whether the job is archived
 * @param {Object}  config.pipeline      Pipeline of the job
 * @return {Promise}
 */
async function startFrozen(executor, config) {
    // Without a stored config there is nothing to start
    if (!config) {
        logger.error('frozen builds: no stored config found, skipping');

        return undefined;
    }

    const { jobId, jobName, jobState, jobArchived } = config;

    if (jobState === 'DISABLED' || jobArchived === true) {
        logger.error(`job ${jobName} is disabled or archived`);

        return undefined;
    }

    try {
        // postBuildEvent reads job.id for its token, logs and failure notification,
        // so populate it from the stored jobId. Stored fields still win over defaults.
        const newConfig = {
            job: {
                id: jobId,
                name: jobName
            },
            causeMessage: 'Started by freeze window scheduler',
            ...config
        };

        return await postBuildEvent(executor, newConfig);
    } catch (err) {
        // Read the pipeline ID defensively so the handler itself cannot throw
        const pipelineId = config.pipeline && config.pipeline.id;

        logger.error(`frozen builds: failed to post build event for job ${jobId}:${pipelineId} ${err}`);

        return undefined;
    }
}
183

184
/**
 * Schedules the next run of a periodic build and optionally triggers a build right away.
 *
 * Scheduling only happens when the job has a `screwdriver.cd/buildPeriodically`
 * annotation, is ENABLED and is not archived. Failures to store the config,
 * to enqueue, or to post the build event are logged and never rejected.
 * @method startPeriodic
 * @param {Object}   executor            Executor exposing connect, queue and redisBreaker
 * @param {Object}   config              Configuration (mutated: causeMessage, isUpdate, triggerBuild)
 * @param {Object}   config.pipeline     Pipeline of the job
 * @param {Object}   config.job          Job object to create periodic builds for
 * @param {String}   config.apiUri       Base URL of the Screwdriver API
 * @param {Boolean}  config.isUpdate     Boolean to determine if updating existing periodic build
 * @param {Boolean}  config.triggerBuild Flag to post new build event
 * @return {Promise}
 */
async function startPeriodic(executor, config) {
    const { pipeline, job, isUpdate, triggerBuild } = config;
    // eslint-disable-next-line max-len
    const buildCron = reach(job, 'permutations>0>annotations>screwdriver.cd/buildPeriodically', { separator: '>' });

    if (isUpdate) {
        await stopPeriodic(executor, { jobId: job.id });
    }

    if (triggerBuild) {
        config.causeMessage = 'Started by periodic build scheduler';
    }

    const isSchedulable = Boolean(buildCron) && job.state === 'ENABLED' && !job.archived;

    if (isSchedulable) {
        await executor.connect();

        const next = cron.next(cron.transform(buildCron, job.id));

        // Note: arguments to enqueueAt are [timestamp, queue name, job name, array of args]
        const enqueueNextRun = () =>
            executor.queue.enqueueAt(next, executor.periodicBuildQueue, 'startDelayed', [{ jobId: job.id }]);

        if (periodicBuildTableEnabled) {
            // Store the config in redis with both flags reset
            config.isUpdate = false;
            config.triggerBuild = false;

            try {
                await executor.redisBreaker.runCommand(
                    'hset',
                    executor.periodicBuildTable,
                    job.id,
                    JSON.stringify(config)
                );
            } catch (err) {
                logger.error(`failed to store the config in Redis for job ${job.id}: ${err}`);
            }
        }

        const nextDate = new Date(next);
        let shouldRetry = false;

        logger.info(`Enqueued periodic job ${job.id} to be executed at ${nextDate}`);

        try {
            await enqueueNextRun();
        } catch (err) {
            // Error thrown by node-resque if there is duplicate: https://github.com/taskrabbit/node-resque/blob/master/lib/queue.js#L65
            const isDuplicate = !err || err.message === 'Job already enqueued at this time with same arguments';

            if (isDuplicate) {
                logger.warn(`duplicate build: failed to enqueue for job ${job.id}: ${err}`);
            } else {
                shouldRetry = true;
                logger.error(`failed to enqueue for job ${job.id}: ${err}`);
            }
        }

        if (!shouldRetry) {
            logger.info(`successfully added to queue for job ${job.id}`);
        } else {
            // A single additional attempt; its failure is only logged
            try {
                await enqueueNextRun();
            } catch (err) {
                logger.error(`failed to add to delayed queue for job ${job.id}: ${err}`);
            }
        }
    }
    logger.info(`added to delayed queue for job ${job.id}`);

    if (triggerBuild && !job.archived) {
        try {
            await postBuildEvent(executor, config);
        } catch (err) {
            logger.error(
                `periodic builds: failed to post build event for job ${job.id} in pipeline ${pipeline.id}: ${err}`
            );
        }
    }
}
275

276
/**
 * Starts a new build in an executor.
 *
 * Depending on the freeze windows and annotations the build is either
 * re-enqueued as a frozen build, completed immediately (virtual job), or
 * marked QUEUED and pushed onto the build queue.
 * @async  start
 * @param  {Object} executor             Executor exposing connect, tokenGen, queueBreaker and redisBreaker
 * @param  {Object} config               Configuration (mutated: build and causeMessage removed, token added)
 * @param  {Object} [config.annotations] Optional key/value object
 * @param  {Number} [config.eventId]     Optional eventID that this build belongs to
 * @param  {String} config.build         Build object
 * @param  {Array}  config.blockedBy     Array of job IDs that this job is blocked by. Always blockedby itself
 * @param  {String} config.causeMessage  Reason the event is run
 * @param  {Array}  config.freezeWindows Array of cron expressions that this job cannot run during
 * @param  {String} config.apiUri        Screwdriver's API
 * @param  {String} config.jobId         JobID that this build belongs to
 * @param  {String} config.jobName       Name of job that this build belongs to
 * @param  {String} config.jobState      ENABLED/DISABLED
 * @param  {String} config.jobArchived   Boolean value of whether job is archived
 * @param  {String} config.buildId       Unique ID for a build
 * @param  {Object} config.pipeline      Pipeline of the job
 * @param  {Fn}     config.tokenGen      Function to generate JWT from username, scope and scmContext
 * @param  {String} config.container     Container for the build to run in
 * @param  {String} config.token         JWT to act on behalf of the build
 * @return {Promise}                     Result of the enqueue command; undefined when the job is
 *                                       disabled, archived or virtual
 */
async function start(executor, config) {
    await executor.connect();

    const {
        build,
        buildId,
        causeMessage,
        jobId,
        jobState,
        jobArchived,
        blockedBy,
        freezeWindows,
        apiUri,
        pipeline,
        isPR,
        prParentJobId,
        annotations
    } = config;
    const forceStart = /\[(force start)\]/.test(causeMessage);

    // Both fields were captured above; drop them so they are not serialized with the config
    delete config.build;
    delete config.causeMessage;

    await stopFrozen(executor, { jobId });

    // Skip if job is disabled or archived
    if (jobState === 'DISABLED' || jobArchived === true) {
        return undefined;
    }

    const releaseTime = new Date();
    const requestTime = new Date(releaseTime.getTime());

    // NOTE(review): presumably moves releaseTime past any active freeze window — confirm in utils/freezeWindows
    timeOutOfWindows(freezeWindows, releaseTime);

    const buildStats = build && build.stats;

    // for backward compatibility
    if (buildStats) {
        // need to reassign so the field can be dirty
        build.stats = merge(build.stats, {
            queueEnterTime: new Date().toISOString()
        });
    }

    const tokenConfig = {
        username: buildId,
        buildId,
        jobId,
        eventId: build && build.eventId,
        isPR
    };

    if (pipeline) {
        tokenConfig.pipelineId = pipeline.id;
        tokenConfig.scmContext = pipeline.scmContext;
        tokenConfig.configPipelineId = pipeline.configPipelineId;
    }
    if (prParentJobId) {
        tokenConfig.prParentJobId = prParentJobId;
    }

    const buildToken = executor.tokenGen({ ...tokenConfig, scope: ['build'] });

    // Sends a build update; failures are logged with the given prefix and rethrown
    const sendBuildUpdate = (payload, failurePrefix) =>
        helper
            .updateBuild(
                {
                    buildId,
                    token: buildToken,
                    apiUri,
                    payload
                },
                helper.requestRetryStrategy
            )
            .catch(err => {
                logger.error(`${failurePrefix} for build ${buildId}: ${err}`);

                throw err;
            });

    // Check freeze window
    const isFrozen = releaseTime.getTime() > requestTime.getTime() && !forceStart;

    if (isFrozen) {
        const frozenPayload = {
            status: 'FROZEN',
            statusMessage: `Blocked by freeze window, re-enqueued to ${releaseTime}`
        };

        if (buildStats) {
            frozenPayload.stats = build.stats;
        }

        await sendBuildUpdate(frozenPayload, 'frozenBuilds: failed to update build status');

        // Remove old job from queue to collapse builds
        await executor.queueBreaker.runCommand('delDelayed', executor.frozenBuildQueue, 'startFrozen', [{ jobId }]);

        await executor.redisBreaker.runCommand('hset', executor.frozenBuildTable, jobId, JSON.stringify(config));

        // Add new job back to queue
        return executor.queueBreaker.runCommand(
            'enqueueAt',
            releaseTime.getTime(),
            executor.frozenBuildQueue,
            'startFrozen',
            [{ jobId }]
        );
    }

    // validate schema for temporal token
    const validation = TOKEN_CONFIG_SCHEMA.validate(tokenConfig);

    if (validation.error) {
        logger.error('Failed to validate token config schema %s %s', buildId, jobId);

        throw validation.error;
    }

    if (isVirtualJob(annotations)) {
        // Bypass execution of the build if the job is virtual
        await sendBuildUpdate(
            {
                status: 'SUCCESS',
                statusMessage: 'Skipped execution of the virtual job',
                statusMessageType: 'INFO'
            },
            'Failed to update virtual build status'
        );

        return undefined;
    }

    // The status must be QUEUED before the build becomes visible to workers
    if (buildStats) {
        await sendBuildUpdate({ stats: build.stats, status: 'QUEUED' }, 'Failed to update build status to QUEUED');
    }

    try {
        tokenConfig.scope = ['temporal'];
        config.token = executor.tokenGen(tokenConfig, TEMPORAL_TOKEN_TIMEOUT);

        // Store the config in redis
        await executor.redisBreaker.runCommand('hset', executor.buildConfigTable, buildId, JSON.stringify(config));

        const blockedBySameJob = reach(config, 'annotations>screwdriver.cd/blockedBySameJob', {
            separator: '>',
            default: true
        });
        const blockedBySameJobWaitTime = reach(config, 'annotations>screwdriver.cd/blockedBySameJobWaitTime', {
            separator: '>',
            default: BLOCKED_BY_SAME_JOB_WAIT_TIME
        });

        // Note: arguments to enqueue are [queue name, job name, array of args]
        return await executor.queueBreaker.runCommand('enqueue', executor.buildQueue, 'start', [
            {
                buildId,
                jobId,
                blockedBy: blockedBy.toString(),
                blockedBySameJob,
                blockedBySameJobWaitTime
            }
        ]);
    } catch (err) {
        logger.error(`Redis enqueue failed for build ${buildId}: ${err}`);

        throw err;
    }
}
511

512
/**
 * Initializes the scheduler and multiworker.
 *
 * Registers the `startDelayed` (periodic builds) and `startFrozen` (frozen
 * builds) jobs, wires logging for worker and scheduler events, then starts both.
 * Calling it again on an executor that already has a multiWorker is a no-op.
 * @async  init
 * @param {Object} executor  Executor to attach `multiWorker` and `scheduler` to
 * @return {Promise<String>} 'Scheduler running' when already initialized, otherwise 'Scheduler started'
 */
async function init(executor) {
    if (executor.multiWorker) return 'Scheduler running';

    const resqueConnection = { redis: executor.redis, namespace: queueNamespace };

    // Builds the periodic build config for a job from the API
    const fetchPeriodicConfig = async jobId => {
        const apiUri = ecosystem.api;

        const buildToken = executor.tokenGen({
            jobId,
            service: 'queue',
            scope: ['build']
        });

        const job = await helper.getJobConfig({
            jobId,
            token: buildToken,
            apiUri
        });

        const pipelineToken = executor.tokenGen({
            pipelineId: job.pipelineId,
            service: 'queue',
            scope: ['pipeline']
        });

        const pipeline = await helper.getPipelineConfig({
            pipelineId: job.pipelineId,
            token: pipelineToken,
            apiUri
        });

        return {
            pipeline,
            job,
            apiUri,
            isUpdate: false,
            triggerBuild: true
        };
    };

    // Jobs object to register the worker with
    const jobs = {
        startDelayed: {
            perform: async config => {
                try {
                    const { jobId } = config;

                    logger.info(`Started processing periodic job ${jobId}`);

                    let fullConfig;

                    if (periodicBuildTableEnabled) {
                        const periodicBuildConfig = await executor.redisBreaker.runCommand(
                            'hget',
                            executor.periodicBuildTable,
                            jobId
                        );
                        const storedConfig = periodicBuildConfig ? JSON.parse(periodicBuildConfig) : null;

                        if (storedConfig) {
                            fullConfig = Object.assign(storedConfig, {
                                triggerBuild: true
                            });
                        } else {
                            // startPeriodic still enqueues when storing the config fails,
                            // so a missing entry is expected and must not crash the job.
                            logger.warn(`periodic build config for job ${jobId} not found in Redis, using the API`);
                        }
                    }

                    if (!fullConfig) {
                        fullConfig = await fetchPeriodicConfig(jobId);
                    }

                    await startPeriodic(executor, fullConfig);
                } catch (err) {
                    logger.error(`err in startDelayed job: ${err}`);
                    throw err;
                }
            },
            plugins: [Plugins.Retry, Plugins.JobLock, Plugins.DelayQueueLock, Plugins.QueueLock],
            pluginOptions: {
                JobLock: {
                    reEnqueue: false
                },
                Retry: {
                    retryLimit: RETRY_LIMIT,
                    retryDelay: RETRY_DELAY
                }
            }
        },
        startFrozen: {
            perform: async jobConfig => {
                try {
                    logger.info(`Started processing frozen job ${jobConfig.jobId}`);

                    const fullConfig = await executor.redisBreaker.runCommand(
                        'hget',
                        executor.frozenBuildTable,
                        jobConfig.jobId
                    );

                    // No stored config means there is nothing left to start;
                    // failing here would only trigger pointless retries.
                    if (!fullConfig) {
                        logger.warn(`frozen build config for job ${jobConfig.jobId} not found, skipping`);

                        return undefined;
                    }

                    return await startFrozen(executor, JSON.parse(fullConfig));
                } catch (err) {
                    logger.error(`err in startFrozen job: ${err}`);
                    throw err;
                }
            },
            plugins: [Plugins.Retry],
            pluginOptions: {
                Retry: {
                    retryLimit: RETRY_LIMIT,
                    retryDelay: RETRY_DELAY
                }
            }
        }
    };

    executor.multiWorker = new MultiWorker(
        {
            connection: resqueConnection,
            queues: [executor.periodicBuildQueue, executor.frozenBuildQueue],
            minTaskProcessors: 1,
            maxTaskProcessors: 10,
            checkTimeout: 1000,
            maxEventLoopDelay: 10,
            toDisconnectProcessors: true
        },
        jobs
    );

    executor.scheduler = new Scheduler({ connection: resqueConnection });

    executor.multiWorker.on('start', workerId => logger.info(`worker[${workerId}] started`));
    executor.multiWorker.on('end', workerId => logger.info(`worker[${workerId}] ended`));
    executor.multiWorker.on('cleaning_worker', (workerId, worker, pid) =>
        logger.info(`cleaning old worker ${worker} pid ${pid}`)
    );
    executor.multiWorker.on('job', (workerId, queue, job) =>
        logger.info(`worker[${workerId}] working job ${queue} ${JSON.stringify(job)}`)
    );
    executor.multiWorker.on('reEnqueue', (workerId, queue, job, plugin) =>
        logger.info(`worker[${workerId}] reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`)
    );
    executor.multiWorker.on('success', (workerId, queue, job, result, duration) =>
        logger.info(`worker[${workerId}] job success ${queue} ${JSON.stringify(job)} >> ${result} (${duration}ms)`)
    );
    executor.multiWorker.on('failure', (workerId, queue, job, failure, duration) =>
        logger.info(`worker[${workerId}] job failure ${queue} ${JSON.stringify(job)} >> ${failure} (${duration}ms)`)
    );
    executor.multiWorker.on('error', (workerId, queue, job, error) =>
        logger.error(`worker[${workerId}] error ${queue} ${JSON.stringify(job)} >> ${error}`)
    );

    executor.scheduler.on('start', () => logger.info('scheduler started'));
    executor.scheduler.on('end', () => logger.info('scheduler ended'));
    executor.scheduler.on('leader', () => logger.info(`scheduler became leader`));
    // Errors are logged at error level, consistent with the multiWorker 'error' handler
    executor.scheduler.on('error', error => logger.error(`scheduler error >> ${error}`));
    executor.scheduler.on('workingTimestamp', timestamp => logger.info(`scheduler working timestamp ${timestamp}`));
    executor.scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) =>
        logger.info(`scheduler failing ${workerName} (stuck for ${delta}s) and failing job ${errorPayload}`)
    );
    executor.scheduler.on('transferredJob', (timestamp, job) =>
        logger.info(`scheduler enqueuing job ${timestamp}  >>  ${JSON.stringify(job)}`)
    );

    await executor.multiWorker.start();
    await executor.scheduler.connect();
    await executor.scheduler.start();

    return 'Scheduler started';
}
677

678
/**
 * Records the start time of a RUNNING build in the timeout queue.
 *
 * Nothing is written when the build is not RUNNING or an entry already
 * exists. Errors are logged and swallowed.
 * @method startTimer
 * @param  {Object} executor             Executor exposing connect and redisBreaker
 * @param  {Object} config               Configuration
 * @param  {String} config.buildId       Unique ID for a build
 * @param  {String} config.jobId         ID of the job the build belongs to
 * @param  {String} config.startTime     Start time of build
 * @param  {String} config.buildStatus   Status of build
 * @return {Promise}                     Result of the `hset` command, or undefined when nothing was stored
 */
async function startTimer(executor, config) {
    try {
        await executor.connect();
        const { buildId, jobId, buildStatus, startTime } = config;

        if (buildStatus !== 'RUNNING') {
            return undefined;
        }

        const annotatedTimeout = reach(config, 'annotations>screwdriver.cd/timeout', { separator: '>' });
        const parsedTimeout = parseInt(annotatedTimeout, 10);
        let timeout = parsedTimeout;

        if (Number.isNaN(parsedTimeout)) {
            timeout = DEFAULT_BUILD_TIMEOUT;
        }

        const existingEntry = await executor.redisBreaker.runCommand('hget', executor.timeoutQueue, buildId);

        // Keep the first recorded start time
        if (existingEntry) {
            return undefined;
        }

        const entry = JSON.stringify({
            jobId,
            startTime,
            timeout
        });

        return await executor.redisBreaker.runCommand('hset', executor.timeoutQueue, buildId, entry);
    } catch (err) {
        logger.error(`Error occurred while saving to timeout queue ${err}`);

        return undefined;
    }
}
723

724
/**
 * Removes the start time entry of a build from the timeout queue.
 * Errors are logged and swallowed.
 * @method stopTimer
 * @param  {Object} executor             Executor exposing connect and redisBreaker
 * @param  {Object} config               Configuration
 * @param  {String} config.buildId       Unique ID for a build
 * @return {Promise}                     Result of the `hdel` command, or undefined when no entry existed
 */
async function stopTimer(executor, config) {
    try {
        await executor.connect();

        const existingEntry = await executor.redisBreaker.runCommand('hget', executor.timeoutQueue, config.buildId);

        if (existingEntry) {
            return await executor.redisBreaker.runCommand('hdel', executor.timeoutQueue, config.buildId);
        }

        return undefined;
    } catch (err) {
        logger.error(`Error occurred while removing from timeout queue ${err}`);

        return undefined;
    }
}
748

749
/**
 * Stop a running or finished build.
 *
 * Removes the pending `start` job from the build queue, sets a temporary
 * `deleted_<jobId>_<buildId>` marker key and enqueues a `stop` job.
 * @async  stop
 * @param  {Object} executor             Executor exposing connect, queueBreaker and redisBreaker
 * @param  {Object} config               Configuration
 * @param  {Array}  config.blockedBy     Array of job IDs that this job is blocked by. Always blockedby itself
 * @param  {String} config.buildId       Unique ID for a build
 * @param  {String} config.jobId         JobID that this build belongs to
 * @return {Promise}                     Result of enqueueing the `stop` job
 */
async function stop(executor, config) {
    await executor.connect();

    const { buildId, jobId } = config; // in case config contains something else
    const blockedBy = config.blockedBy !== undefined ? config.blockedBy.toString() : undefined;

    const numDeleted = await executor.queueBreaker.runCommand('del', executor.buildQueue, 'start', [
        { buildId, jobId, blockedBy }
    ]);
    const deleteKey = `deleted_${jobId}_${buildId}`;

    // This is to prevent the case where a build is aborted while still in buildQueue
    // The job might be picked up by the worker, so it's not deleted from buildQueue here
    // Immediately after, the job gets put back to the queue, so it's always inside buildQueue
    // This key will be cleaned up automatically or when it's picked up by the worker
    await executor.redisBreaker.runCommand('set', deleteKey, '');
    await executor.redisBreaker.runCommand('expire', deleteKey, EXPIRE_TIME);

    // Nothing removed from the queue means the build has already started
    const started = numDeleted === 0;

    return executor.queueBreaker.runCommand('enqueue', executor.buildQueue, 'stop', [
        {
            buildId,
            jobId,
            blockedBy,
            started // call executor.stop if the job already started
        }
    ]);
}
800

801
/**
 * Cleanup any related processing.
 *
 * Ends the multiWorker, the scheduler and the queue in that order. Each
 * component is ended independently, so one failing or absent component
 * does not keep the remaining ones from being shut down.
 * @async  cleanUp
 * @param  {Object} executor  Executor that may hold `multiWorker`, `scheduler` and `queue`
 * @return {Promise}          Never rejects; failures are logged
 */
async function cleanUp(executor) {
    // Ends a single component, logging the failure instead of propagating it
    const endQuietly = async name => {
        const component = executor[name];

        if (!component) {
            return;
        }

        try {
            await component.end();
        } catch (err) {
            logger.error(`failed to end executor ${name}: ${err}`);
        }
    };

    await endQuietly('multiWorker');
    await endQuietly('scheduler');
    await endQuietly('queue');
}
813

814
/**
 * Pushes a `clear` message to the cache queue.
 *
 * The message defaults to deleting the `caches` resource under the
 * executor prefix; any field in config overrides those defaults.
 * @async  clearCache
 * @param  {Object} executor  Executor exposing connect, queueBreaker, cacheQueue and prefix
 * @param  {Object} config    Extra message fields merged over the defaults
 * @return {Promise}          Result of the enqueue command; rejects with the logged error on failure
 */
async function clearCache(executor, config) {
    try {
        await executor.connect();

        const message = {
            resource: 'caches',
            action: 'delete',
            prefix: executor.prefix,
            ...config
        };

        return await executor.queueBreaker.runCommand('enqueue', executor.cacheQueue, 'clear', [message]);
    } catch (err) {
        logger.error(`Error occurred while saving to cache queue ${err}`);

        throw err;
    }
}
838

839
/**
 * Pushes a `start` message to the unzip queue so the artifacts of a build get unzipped
 * @async  unzipArtifacts
 * @param  {Object} executor             Executor exposing connect, tokenGen and queueBreaker
 * @param  {Object} config               Configuration
 * @param  {String} config.buildId       Unique ID for a build
 * @return {Promise}                     Result of the enqueue command
 */
async function unzipArtifacts(executor, config) {
    await executor.connect();

    const { buildId } = config;
    // Short-lived token issued with the `unzip_worker` scope
    const token = executor.tokenGen({ username: buildId, scope: 'unzip_worker' }, TEMPORAL_UNZIP_TOKEN_TIMEOUT);

    return executor.queueBreaker.runCommand('enqueue', executor.unzipQueue, 'start', [{ buildId, token }]);
}
865

866
/**
 * Pushes a webhook to the webhook queue as a `sendWebhook` job.
 *
 * The job argument is a JSON string holding the webhook config and a
 * token issued with the `webhook_worker` scope.
 * @async  queueWebhook
 * @param  {Object} executor       Executor exposing connect, tokenGen and queueBreaker
 * @param  {Object} webhookConfig  Webhook payload to deliver
 * @return {Promise}               Result of the enqueue command
 */
async function queueWebhook(executor, webhookConfig) {
    await executor.connect();

    const { webhookQueue } = executor;
    const token = executor.tokenGen({
        service: 'queue',
        scope: ['webhook_worker']
    });
    const message = JSON.stringify({ webhookConfig, token });

    return executor.queueBreaker.runCommand('enqueue', webhookQueue, 'sendWebhook', message);
}
889

890
module.exports = {
54✔
891
    init,
892
    start,
893
    stop,
894
    startPeriodic,
895
    stopPeriodic,
896
    startFrozen,
897
    stopFrozen,
898
    startTimer,
899
    stopTimer,
900
    cleanUp,
901
    clearCache,
902
    unzipArtifacts,
903
    queueWebhook
904
};
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc