• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

restorecommerce / scheduling-srv / 8098985975

29 Feb 2024 04:04PM UTC coverage: 72.518% (+4.9%) from 67.613%
8098985975

push

github

Arun-KumarH
fix: external worker for additional default key

194 of 282 branches covered (68.79%)

Branch coverage included in aggregate %.

0 of 13 new or added lines in 1 file covered. (0.0%)

340 existing lines in 3 files now uncovered.

1574 of 2156 relevant lines covered (73.01%)

9.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.14
/src/schedulingService.ts
1
import * as _ from 'lodash-es';
1✔
2
import { errors } from '@restorecommerce/chassis-srv';
1✔
3
import * as kafkaClient from '@restorecommerce/kafka-client';
1✔
4
import { AuthZAction, ACSAuthZ, updateConfig, DecisionResponse, Operation, PolicySetRQResponse } from '@restorecommerce/acs-client';
1✔
5
import {
1✔
6
  JobServiceImplementation as SchedulingServiceServiceImplementation,
1✔
7
  JobFailed, JobDone, DeepPartial, JobList, JobListResponse, Data,
1✔
8
  Backoff_Type, JobOptions_Priority, JobReadRequest, JobReadRequest_SortOrder
1✔
9
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/job.js';
1✔
10
import { createClient, RedisClientType } from 'redis';
1✔
11
import { NewJob, Priority } from './types.js';
1✔
12
import pkg from 'cron-parser';
1✔
13
import * as crypto from 'node:crypto';
1✔
14
import { _filterJobData, _filterJobOptions, _filterQueuedJob, checkAccessRequest, marshallProtobufAny } from './utilts.js';
1✔
15
import * as uuid from 'uuid';
1✔
16
import { Logger } from 'winston';
1✔
17
import { Response_Decision } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/access_control.js';
1✔
18
import { Attribute } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/attribute.js';
1✔
19
import { DeleteRequest, DeleteResponse } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/resource_base.js';
1✔
20
import { Queue, QueueOptions, Job } from 'bullmq';
1✔
21

1✔
22
const { parseExpression } = pkg;
1✔
23
// Names of the job events the scheduling service subscribes to on the job topic
const JOB_DONE_EVENT = 'jobDone';
1✔
24
const JOB_FAILED_EVENT = 'jobFailed';
1✔
25
const DEFAULT_CLEANUP_COMPLETED_JOBS = 604800000; // 7 days in milliseconds
1✔
26
const COMPLETED_JOB_STATE = 'completed';
1✔
27
const FAILED_JOB_STATE = 'failed';
1✔
28
const QUEUE_CLEANUP = 'queueCleanup';
1✔
29

1✔
30
/**
1✔
31
 * A job scheduling service.
1✔
32
 */
1✔
33
export class SchedulingService implements SchedulingServiceServiceImplementation {
1✔
34

2✔
35
  jobEvents: kafkaClient.Topic;
2✔
36
  logger: Logger;
2✔
37

2✔
38
  queuesConfigList: any;
2✔
39
  queuesList: Queue[];
2✔
40
  defaultQueueName: string;
2✔
41

2✔
42
  redisClient: RedisClientType<any, any>;
2✔
43
  resourceEventsEnabled: boolean;
2✔
44
  canceledJobs: Set<string>;
2✔
45
  bullOptions: any;
2✔
46
  cfg: any;
2✔
47
  authZ: ACSAuthZ;
2✔
48
  authZCheck: boolean;
2✔
49
  repeatJobIdRedisClient: RedisClientType<any, any>;
2✔
50

2✔
51

2✔
52
  /**
   * Stores the collaborators on the instance, connects a dedicated Redis
   * client for the repeatable job ID store and creates one BullMQ queue per
   * entry of the `queue` configuration.
   *
   * @param jobEvents topic used to listen for and emit job events
   * @param redisConfig Redis connection options; its `url` is parsed to derive
   * the host and port handed to every queue connection
   * @param logger logger used by the service
   * @param redisClient Redis client stored on the instance
   * @param bullOptions options stored on the instance for later queue queries
   * @param cfg service configuration provider (read via `cfg.get(...)`)
   * @param authZ access control client stored on the instance
   * @throws Error when the `queue` configuration is empty or no queue is
   * flagged with `default: true`
   */
  constructor(jobEvents: kafkaClient.Topic,
    private redisConfig: any, logger: any, redisClient: RedisClientType<any, any>,
    bullOptions: any, cfg: any, authZ: ACSAuthZ) {
    // Port used when the Redis URL does not carry an explicit one
    const defaultRedisPort = 6379;

    this.jobEvents = jobEvents;
    this.resourceEventsEnabled = true;
    this.bullOptions = bullOptions;
    this.logger = logger;
    this.queuesList = [];
    this.queuesConfigList = [];
    this.redisClient = redisClient;

    // NOTE(review): the object returned by `cfg.get('redis')` is modified in
    // place here; if the config provider hands out a shared reference, the
    // `database` value becomes visible to other readers of that config.
    const repeatJobIdCfg = cfg.get('redis');
    repeatJobIdCfg.database = cfg.get('redis:db-indexes:db-repeatJobId');
    this.repeatJobIdRedisClient = createClient(repeatJobIdCfg);
    this.repeatJobIdRedisClient.on('error', (err) => logger.error('Redis client error in repeatable job store', err));
    this.repeatJobIdRedisClient.connect().then(() => {
      logger.info('Redis client connection for repeatable job store successful');
    }).catch(err => logger.error('Redis client error for repeatable job store', err));

    this.canceledJobs = new Set<string>();
    this.cfg = cfg;
    this.authZ = authZ;
    this.authZCheck = this.cfg.get('authorization:enabled');

    // Read Queue Configuration file and find first queue which has "default": true,
    // then save it to defaultQueueName
    const queuesCfg = this.cfg.get('queue');
    if (_.isEmpty(queuesCfg)) {
      this.logger.error('Queue configuration not found!');
      throw new Error('Queue configuration not found!');
    }
    let defaultTrueExists = false;
    for (const queueCfg of queuesCfg) {
      // Find configuration which has default=true
      if (queueCfg.default == true) {
        defaultTrueExists = true;
        this.defaultQueueName = queueCfg.name;
        break;
      }
    }
    if (!defaultTrueExists) {
      this.logger.error('Queue default configuration not found!');
      throw new Error('Queue default configuration not found!');
    }

    // Create Queues
    for (const queueCfg of queuesCfg) {
      const prefix = queueCfg.name;
      const rateLimiting = queueCfg.rateLimiting;
      const advancedSettings = queueCfg.advancedSettings;

      const queueOptions: QueueOptions = {
        connection: {
          ...redisConfig,
        }
      };

      // Create Queue Configuration - Add Rate Limiting if enabled
      if (!_.isEmpty(rateLimiting) && rateLimiting.enabled == true) {
        this.logger.info(`Queue: ${queueCfg.name} - Rate limiting is ENABLED.`);
      }

      if (!_.isEmpty(advancedSettings)) {
        queueOptions.settings = {
          ...advancedSettings,
        };
      }

      const redisURL = new URL((queueOptions.connection as any).url);

      if ('keyPrefix' in queueOptions.connection) {
        delete queueOptions.connection.keyPrefix;
      }

      // `URL.port` is an empty string when the URL has no explicit port, which
      // would parse to NaN; fall back to the default Redis port in that case.
      const parsedPort = _.parseInt(redisURL.port);
      const queue = new Queue(prefix, {
        ...queueOptions,
        connection: {
          ...queueOptions.connection as any,
          host: redisURL.hostname,
          port: Number.isNaN(parsedPort) ? defaultRedisPort : parsedPort
        }
      });
      this.queuesList.push(queue);

      // Add Queue Configurations
      const queueCfgObj = {
        name: queueCfg.name,
        concurrency: queueCfg.concurrency,
        default: queueCfg.default,
        runMissedScheduled: queueCfg.runMissedScheduled
      };
      this.queuesConfigList.push(queueCfgObj);
    }
  }
2✔
147

2✔
148
  /**
   * Start handling the job queue, job scheduling and
   * managing job events.
   *
   * Subscribes to the `jobDone` and `jobFailed` events of the job topic,
   * attaches logging listeners to every queue and finally reschedules the
   * recurring jobs that were missed (see `_rescheduleMissedJobs`).
   */
  async start(): Promise<any> {
    const logger = this.logger;
    const events = [JOB_DONE_EVENT, JOB_FAILED_EVENT];
    for (const eventName of events) {
      // A Scheduling Service Event Listener
      await this.jobEvents.on(eventName, async (msg: any, ctx: any,
        config: any, eventName: string): Promise<any> => {
        const job = msg;
        // Match Job Type to Queue Name, else use Default Queue
        let queue = _.find(this.queuesList, { name: job.type });
        if (_.isEmpty(queue)) {
          queue = _.find(this.queuesList, { name: this.defaultQueueName });
        }

        if (eventName === JOB_FAILED_EVENT) {
          logger.error(`job@${job.type}#${job.id} failed with error #${job.error}`,
            _filterQueuedJob<JobFailed>(job, this.logger));
        } else if (eventName === JOB_DONE_EVENT) {
          logger.verbose(`job#${job.id} done`, _filterQueuedJob<JobDone>(job, this.logger));
        }

        logger.info('Received Job event', { event: eventName });
        logger.info('Job details', job);
        const jobData: any = await queue.getJob(job.id).catch(error => {
          // template literal so that the job id is actually interpolated
          logger.error(`Error retrieving job ${job.id} from queue`, error);
        });

        if (job?.delete_scheduled) {
          if (jobData) {
            await queue.removeRepeatable(jobData.name, jobData.opts.repeat);
          } else {
            // the job could not be loaded (lookup failed or job is gone), so
            // there is nothing to derive the repeatable key from
            logger.warn(`Unable to remove repeatable job ${job.id}, job data not found in queue`);
          }
        }
      });
    }

    // Initialize Event Listeners for each Queue
    for (const queue of this.queuesList) {
      queue.on('error', (error) => {
        logger.error('queue error', error);
      });
      queue.on('waiting', (job) => {
        logger.verbose(`job#${job.id} scheduled`, job);
      });
      queue.on('removed', (job) => {
        logger.verbose(`job#${job.id} removed`, job);
      });
      queue.on('progress', (job) => {
        logger.verbose(`job#${job.id} progress`, job);
      });
    }

    // If the scheduling service goes down and if there were
    // recurring jobs which have missed schedules then
    // we will need to reschedule it for those missing intervals.
    await this._rescheduleMissedJobs();
  }
2✔
208

2✔
209
  /**
   * To reschedule the missed recurring jobs upon service restart.
   *
   * For every queue whose `runMissedScheduled` setting is enabled or absent,
   * the queued jobs are read and, for each job with a stored last run time and
   * a repeat pattern, one immediate job is created per cron interval that
   * elapsed between the last run time and now. Access control is disabled
   * while these jobs are created and restored once all of them are dispatched.
   */
  async _rescheduleMissedJobs(): Promise<void> {
    // for jobs created via Kafka currently there are no acs checks
    this.disableAC();
    try {
      const createDispatch: Promise<unknown>[] = [];
      let result: Job[] = [];

      // Get the jobs
      for (const queueCfg of this.queuesConfigList) {
        // If enabled in the config, or the config is missing,
        // Reschedule the missed jobs, else skip.
        const queue = _.find(this.queuesList, { name: queueCfg.name });
        const runMissedScheduled = queueCfg.runMissedScheduled;
        if (_.isNil(runMissedScheduled) || runMissedScheduled == true) {
          try {
            const jobs = await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']);
            result = result.concat(jobs);
          } catch (error) {
            this.logger.error('Error reading jobs to reschedule the missed recurring jobs', error);
          }
        }
      }

      for (const job of result) {
        if (!job?.name) {
          continue;
        }
        // get the last run time for the job, we store the last run time only
        // for recurring jobs. Declared per job so that a value read for one
        // job can never be reused for the next one.
        let lastRunTime: any;
        try {
          lastRunTime = await this.redisClient.get(job.name);
        } catch (err) {
          this.logger.error('Error reading the last run time for job type:', { name: job.name, code: err.code, message: err.message, stack: err.stack });
        }
        // we store lastRunTime only for recurring jobs and if it exists check
        // cron interval and schedule immediate jobs for missed intervals
        this.logger.info(`Last run time of ${job.name} Job was:`, lastRunTime);
        if (!lastRunTime) {
          continue;
        }
        // convert redis string value to object and get actual time value
        try {
          lastRunTime = JSON.parse(lastRunTime);
        } catch (error) {
          this.logger.error('Error parsing lastRunTime', {
            code: error.code,
            message: error.message, stack: error.stack
          });
        }

        const pattern = (job?.opts?.repeat as any)?.pattern;
        if (!pattern || !lastRunTime?.time) {
          continue;
        }
        const options = {
          currentDate: new Date(lastRunTime.time),
          endDate: new Date(),
          iterator: true
        };
        let intervalTime;
        try {
          intervalTime = parseExpression(pattern, options);
        } catch (error) {
          this.logger.error('Error parsing cron expression running missed schedules', { code: error.code, message: error.message, stack: error.stack });
        }
        while (intervalTime?.hasNext()) {
          const nextInterval: any = intervalTime.next();
          const nextIntervalTime = nextInterval.value.toString();
          // schedule it as one time job for now or immediately
          const data = {
            payload: marshallProtobufAny({
              value: { time: nextIntervalTime }
            })
          };
          // schedule two seconds from now so that `when` has not already
          // elapsed by the time the job gets validated
          const when = new Date(Date.now() + 2000).toISOString();
          const immediateJob: any = {
            type: job.name,
            data,
            when,
            options: {}
          };
          createDispatch.push(this.create({
            items: [immediateJob],
            total_count: 0,
          }, {}).catch(error => {
            // keep one failing job from aborting the service start
            this.logger.error('Error rescheduling missed recurring job', {
              name: job.name, code: error?.code, message: error?.message, stack: error?.stack
            });
          }));
        }
      }
      // wait for all dispatched jobs while access control is still disabled
      await Promise.all(createDispatch);
    } finally {
      this.restoreAC();
    }
  }
2✔
302

2✔
303
  /**
   * Switches off the emission of resource (CRUD) events by clearing the
   * `resourceEventsEnabled` flag.
   */
  disableEvents(): void {
    this.resourceEventsEnabled = false;
  }
×
309

2✔
310
  /**
   * Switches on the emission of resource (CRUD) events by setting the
   * `resourceEventsEnabled` flag.
   */
  enableEvents(): any {
    this.resourceEventsEnabled = true;
  }
×
316

2✔
317
  /**
   * Validates a job and normalizes its options before it is queued.
   * The given job object is modified in place and returned.
   *
   * - a missing `options` object is initialized
   * - `when` is converted into a relative `options.delay` in milliseconds
   * - a numeric backoff type is mapped to its enum key and lower-cased
   * - a string priority is mapped through `JobOptions_Priority`
   * - `data` is passed through `_filterJobData`
   *
   * @param job the job to validate
   * @returns the same job object, normalized
   * @throws errors.InvalidArgument when the type or data is missing, or when
   * `when` is not a valid date or lies in the past
   */
  _validateJob(job: NewJob): NewJob {
    if (_.isNil(job.type)) {
      throw new errors.InvalidArgument('Job type not specified.');
    }

    if (!job.options) {
      job.options = {};
    }

    if (job.when) {
      const jobScheduleTime = new Date(job.when).getTime();
      // an unparsable date yields NaN, which would slip through the elapsed
      // time comparison below and end up as a NaN delay
      if (Number.isNaN(jobScheduleTime)) {
        throw new errors.InvalidArgument('Invalid job schedule time');
      }
      // If the jobSchedule time has already lapsed then do not schedule the job
      const currentTime = new Date().getTime();
      if (jobScheduleTime < currentTime) {
        throw new errors.InvalidArgument('Cannot schedule a job for an elapsed time');
      }

      job.options.delay = jobScheduleTime - currentTime;
    }

    if (job.options.backoff && typeof job.options.backoff !== 'number') {
      if (typeof job.options.backoff.type === 'number') {
        job.options.backoff.type = Object.keys(Backoff_Type)[job.options.backoff.type];
      }
      job.options.backoff.type = job.options.backoff.type.toLowerCase();
    }

    if (job.options.priority && typeof job.options.priority === 'string') {
      job.options.priority = JobOptions_Priority[job.options.priority] as any;
    }

    if (_.isEmpty(job.data)) {
      throw new errors.InvalidArgument('No job data specified.');
    }

    job.data = _filterJobData(job.data, false, this.logger);

    return job;
  }
18✔
356

2✔
357
  /**
   * Get the next job execution time in milliseconds.
   *
   * With an `every` option the next multiple of that interval after `millis`
   * is returned. Otherwise the `cron` expression is evaluated, starting from
   * `startDate` when it lies after `millis`, else from `millis`.
   *
   * @param millis reference time in milliseconds
   * @param opts repeat options (`every`, or `cron` with an optional
   * `startDate`; the options are also passed on to the cron parser)
   * @returns the next execution time in milliseconds, or `undefined` when it
   * could not be determined (the error is logged)
   */
  getNextMillis(millis: number, opts: any): number | undefined {
    if (opts?.every) {
      return Math.floor(millis / opts.every) * opts.every + opts.every;
    }

    const currentDate =
      opts?.startDate && new Date(opts.startDate) > new Date(millis)
        ? new Date(opts.startDate)
        : new Date(millis);

    try {
      // parsing is part of the guarded section so that an invalid or missing
      // cron expression is logged instead of thrown to the caller
      const interval = parseExpression(
        opts.cron,
        _.defaults(
          {
            currentDate
          },
          opts
        )
      );
      return interval.next().getTime();
    } catch (error) {
      this.logger.error('Error getting next job execution time', {
        code: error?.code, message: error?.message, stack: error?.stack
      });
    }
  }
1✔
387

2✔
388
  /**
   * Computes the MD5 digest of the given value.
   * @param str value to hash
   * @returns the digest as a hex string
   */
  private md5(str) {
    const hash = crypto.createHash('md5');
    hash.update(str);
    return hash.digest('hex');
  }
×
394

2✔
395
  /**
   * Stores the mapping from the repeat key to the external interface SCS job
   * ID, and the reverse mapping from the SCS job ID to the repeat key together
   * with the job options (needed for read operations). Nothing is stored when
   * either identifier is missing; storage errors are logged, not thrown.
   * @param repeatId - repeat key of the queued job
   * @param scsJobId - external SCS job ID
   * @param options - job options stored alongside the repeat key
   */
  async storeRepeatKey(repeatId, scsJobId, options) {
    if (!repeatId || !scsJobId) {
      return;
    }
    try {
      this.logger.info('Repeat key mapped to external SCS JobId', { repeatId, scsJobId });
      await this.repeatJobIdRedisClient.set(repeatId, scsJobId);
      const reverseEntry = JSON.stringify({ repeatId, options });
      await this.repeatJobIdRedisClient.set(scsJobId, reverseEntry);
    } catch (error) {
      const { code, message, stack } = error;
      this.logger.error('Error storing repeatKey to redis', { code, message, stack });
    }
  }
18✔
417

2✔
418
  /**
   * Generates a new job identifier: a v4 UUID with the dashes removed.
   */
  private idGen(): string {
    const dashedId = uuid.v4();
    return dashedId.split('-').join('');
  }
14✔
421

2✔
422
  /**
   * Create and queue jobs.
   *
   * Each item is validated, given an ID when it has none, enriched with meta
   * data and added to the queue matching its `queue_name` (or to the default
   * queue). Items failing validation, or whose ID already exists, are reported
   * through an item status; the remaining items are still queued.
   *
   * @param request list of jobs to create together with the requesting subject
   * @param ctx RPC context, created when missing and extended with the
   * subject and resources for the access control check
   * @returns the created jobs with a status per item and an overall
   * operation status
   */
  async create(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
    const jobListResponse: JobListResponse = { items: [], operation_status: { code: 0, message: '' }, total_count: 0 };
    if (_.isNil(request) || _.isNil(request.items)) {
      return {
        items: [],
        total_count: 0,
        operation_status: {
          code: 400,
          message: 'Missing items in create request'
        }
      };
    }
    // read only after the request has been checked above
    const subject = request.subject;

    await this.createMetadata(request.items, AuthZAction.CREATE, subject);
    let acsResponse: DecisionResponse;
    try {
      if (!ctx) {
        ctx = {};
      }
      ctx.subject = subject;
      ctx.resources = request.items;
      acsResponse = await checkAccessRequest(ctx, [{
        resource: 'job',
        id: request.items.map(item => item.id)
      }], AuthZAction.CREATE, Operation.isAllowed);
    } catch (err) {
      this.logger.error('Error requesting access-control-srv for create meta data', { code: err.code, message: err.message, stack: err.stack });
      return {
        items: [],
        total_count: 0,
        operation_status: {
          code: err.code,
          message: err.message
        }
      };
    }
    if (acsResponse.decision != Response_Decision.PERMIT) {
      return { items: [], total_count: 0, operation_status: acsResponse.operation_status };
    }

    const jobs: NewJob[] = [];
    for (const job of request.items) {
      try {
        jobs.push(this._validateJob(job as any));
      } catch (err) {
        this.logger.error('Error validating job', job);
        jobListResponse.items.push({
          status: {
            id: job.id,
            code: 400,
            message: err.message
          }
        });
      }
    }

    const result: Job[] = [];
    // Scheduling jobs
    for (const job of jobs) {
      // if not jobID is specified generate a UUID
      if (!job.id) {
        job.id = this.idGen();
      }

      // check if jobID already exists then map it as already exists error
      const existingJobId = await this.getRedisValue(job.id);
      if (existingJobId) {
        // read job to check if data exists
        const jobData = await this.read(JobReadRequest.fromPartial({
          filter: {
            job_ids: [job.id]
          },
          subject
        }), ctx);
        // `items` is absent when the read was denied or failed
        if ((jobData?.items as any)?.[0]?.payload) {
          jobListResponse.items.push({
            status: {
              id: job.id,
              code: 403,
              message: `Job with ID ${job.id} already exists`
            }
          });
          continue;
        }
      }

      // map the id to jobId as needed in JobOpts for bull
      if (!job?.options) {
        job.options = { jobId: job.id };
      } else {
        job.options.jobId = job.id;
      }
      if (job?.options?.repeat) {
        (job as any).options.repeat.jobId = job.id;
      }

      if (!job.data.meta) {
        const now = new Date();
        const metaObj = {
          created: now,
          modified: now,
          modified_by: '',
          owners: []
        };
        Object.assign(job.data, { meta: metaObj });
      }
      // if only owners are specified in meta
      if (job.data.meta && (!job.data.meta.created || !job.data.meta.modified)) {
        job.data.meta.created = new Date();
        job.data.meta.modified = new Date();
      }
      if (job?.data?.meta) {
        job.data.meta.created_by = subject?.id;
        job.data.meta.modified_by = subject?.id;
      }

      if (job?.data?.payload?.value) {
        job.data.payload.value = job.data.payload.value.toString() as any;
      }

      // convert enum priority back to number as it's expected by bull
      if (job?.options?.priority) {
        job.options.priority = typeof job.options.priority === 'number' ? job.options.priority : Priority[job.options.priority] as unknown as number;
      }

      // if its a repeat job and tz is empty delete the key (else cron parser throws an error)
      if (job?.options?.repeat?.tz === '') {
        delete job.options.repeat.tz;
      }

      const bullOptions = {
        ...job.options
      };

      if ((bullOptions as any).timeout === 1) {
        delete bullOptions['timeout'];
      }

      // Match the Job Type with the Queue Name and add the Job to this Queue.
      // If there is no match, add the Job to the Default Queue
      let queue = _.find(this.queuesList, { name: job?.queue_name });
      if (_.isEmpty(queue)) {
        queue = _.find(this.queuesList, { name: this.defaultQueueName });
      }
      const submittedJob = await queue.add(job.type, job.data, bullOptions);
      if (submittedJob?.id?.startsWith('repeat:')) {
        const repeatJobId = submittedJob?.id?.split(':')[1];
        await this.storeRepeatKey(repeatJobId, job.id, job.options);
      } else if (submittedJob?.id) {
        // future job with when
        await this.storeRepeatKey(submittedJob.id, job.id, job.options);
      }
      this.logger.verbose(`job@${job.type} created`, job);
      result.push(submittedJob);
    }

    for (const job of result) {
      // repeatable jobs carry the internal repeat key in their ID, map it
      // back to the external SCS job ID before the job is returned
      const jobId = job.id as string;
      if (jobId.startsWith('repeat:')) {
        const repeatKey = jobId.split(':')[1];
        job.id = await this.getRedisValue(repeatKey);
      }

      // NOTE(review): `opts.delay` is the relative delay in milliseconds
      // computed in `_validateJob`, so this renders a date close to the epoch
      // rather than the scheduled time - confirm what consumers expect.
      const when = job?.opts?.delay ? new Date(job?.opts?.delay).toString() : '';
      jobListResponse.items.push({
        payload: {
          id: job.id as string,
          type: job.name,
          queue_name: job?.queueName,
          data: _filterJobData(job.data, true, this.logger),
          options: _filterJobOptions(job.opts) as any,
          when
        },
        status: {
          id: (job?.id)?.toString(),
          code: 200,
          message: 'success'
        }
      });
    }

    const jobList = {
      items: result.map(job => ({
        id: job.id,
        type: job.name,
        data: _filterJobData(job.data, true, this.logger),
        options: _filterJobOptions(job.opts)
      })),
      total_count: result.length
    };

    if (this.resourceEventsEnabled) {
      await this.jobEvents.emit('jobsCreated', jobList);
    }

    jobListResponse.operation_status = { code: 200, message: 'success' };
    return jobListResponse;
  }
12✔
627

2✔
628
  /**
   * Restricts a job list to the jobs owned by one of the owner instances
   * given through the custom query arguments.
   *
   * @param customArgsObj object carrying the `custom_arguments` whose `value`
   * holds a JSON document with `entity` and `instance`
   * @param result jobs to filter
   * @returns `result` unchanged when no custom arguments are given, an empty
   * list when the arguments cannot be parsed, otherwise the jobs having a
   * matching owner entity with a matching owner instance attribute
   */
  private filterByOwnerShip(customArgsObj, result) {
    const rawArgs = customArgsObj?.custom_arguments;
    if (!rawArgs?.value) {
      return result;
    }

    let ownerFilter;
    try {
      ownerFilter = JSON.parse(rawArgs.value.toString());
    } catch (error) {
      this.logger.error('Error parsing custom query arguments', {
        code: error.code,
        message: error.message, stack: error.stack
      });
    }
    if (!ownerFilter) {
      return [];
    }

    const expectedEntity = ownerFilter.entity;
    const allowedInstances = ownerFilter.instance;
    const entityURN = this.cfg.get('authorization:urns:ownerIndicatoryEntity');
    const instanceURN = this.cfg.get('authorization:urns:ownerInstance');

    // an attribute matches when it is an owner instance with an allowed value
    const isAllowedInstance = (attribute) =>
      attribute?.id === instanceURN && attribute?.value && allowedInstances.includes(attribute.value);
    // an owner matches when it is the expected entity and has a matching attribute
    const isMatchingOwner = (owner) =>
      owner?.id === entityURN && owner?.value === expectedEntity
      && owner?.attributes?.length > 0 && owner.attributes.some(isAllowedInstance);

    return result.filter(job => {
      const owners = job?.data?.meta?.owners;
      return owners?.length > 0 && owners.some(isMatchingOwner);
    });
  }
39✔
664

2✔
665
  /**
   * Deletes a key from the repeatable job ID store. Failures are logged and
   * not propagated to the caller.
   * @param key key to delete
   */
  async deleteRedisKey(key: string): Promise<any> {
    try {
      await this.repeatJobIdRedisClient.del(key);
    } catch (error) {
      const details = { key, msg: error.message, stack: error.stack };
      this.logger.error('Error deleting redis key', details);
      return;
    }
    this.logger.debug('Redis Key deleted successfully used for mapping repeatable jobID', { key });
  }
5✔
673

2✔
674
  /**
   * Reads a value from the repeatable job ID store.
   *
   * The store holds both JSON documents (SCS job ID -> repeat key and options)
   * and plain strings (repeat key -> SCS job ID), see `storeRepeatKey`. JSON
   * documents are returned parsed, any other value is returned as stored.
   *
   * @param key key to look up
   * @returns the parsed or raw value, or `undefined` when the key is empty,
   * nothing is stored for it, or the lookup failed (the error is logged)
   */
  async getRedisValue(key: string): Promise<any> {
    if (!key) {
      return;
    }

    let redisValue;
    try {
      redisValue = await this.repeatJobIdRedisClient.get(key);
    } catch (err) {
      this.logger.error('Error reading redis key', { key, msg: err?.message, stack: err?.stack });
      return;
    }
    if (!redisValue) {
      return;
    }

    // Parsing is kept apart from the lookup so that a non JSON value is told
    // from a store failure by where it fails, not by the wording of the
    // parser's error message, which differs between runtime versions.
    try {
      return JSON.parse(redisValue);
    } catch (err) {
      return redisValue;
    }
  }
70✔
693

2✔
694

2✔
695
  /**
2✔
696
   * Retrieve jobs.
2✔
697
   * @param {any} call RPC call argument
2✔
698
   * @param {any} ctx RPC context
2✔
699
   */
2✔
700
  async read(request: JobReadRequest, ctx: any): Promise<DeepPartial<JobListResponse>> {
2✔
701
    let jobListResponse: JobListResponse = { items: [], operation_status: { code: 0, message: '' }, total_count: 0 };
42✔
702
    let subject = request.subject;
42✔
703
    let acsResponse: PolicySetRQResponse;
42✔
704
    try {
42✔
705
      if (!ctx) { ctx = {}; };
42!
706
      ctx.subject = subject;
42✔
707
      ctx.resources = [];
42✔
708
      acsResponse = await checkAccessRequest(ctx, [{ resource: 'job' }], AuthZAction.READ,
42✔
709
        Operation.whatIsAllowed) as PolicySetRQResponse;
42✔
710
    } catch (err) {
42!
711
      this.logger.error('Error requesting access-control-srv for read operation', { code: err.code, message: err.message, stack: err.stack });
×
712
      return {
×
UNCOV
713
        operation_status: {
×
UNCOV
714
          code: err.code,
×
UNCOV
715
          message: err.message
×
UNCOV
716
        }
×
UNCOV
717
      };
×
UNCOV
718
    }
×
719
    if (acsResponse.decision != Response_Decision.PERMIT) {
42✔
720
      return { operation_status: acsResponse.operation_status };
3✔
721
    }
3✔
722

39✔
723
    let result: Job[] = [];
39✔
724
    if (_.isEmpty(request) || _.isEmpty(request.filter)
39✔
725
      && (!request.filter || !request.filter.job_ids
39!
726
        || _.isEmpty(request.filter.job_ids))
24!
727
      && (!request.filter || !request.filter.type ||
39!
728
        _.isEmpty(request.filter.type))) {
42✔
729
      result = await this._getJobList();
24✔
730
      let custom_arguments;
24✔
731
      if (acsResponse?.custom_query_args?.length > 0) {
24!
UNCOV
732
        custom_arguments = acsResponse.custom_query_args[0].custom_arguments;
×
UNCOV
733
      }
×
734
      result = this.filterByOwnerShip({ custom_arguments }, result);
24✔
735
    } else {
42✔
736
      const that = this;
15✔
737
      let jobIDs = request.filter.job_ids || [];
15✔
738
      if (!_.isArray(jobIDs)) {
15!
739
        jobIDs = [jobIDs];
×
UNCOV
740
      }
×
741
      const typeFilterName = request.filter.type;
15✔
742

15✔
743
      // Search in all the queues and retrieve jobs after JobID
15✔
744
      // and add them to the jobIDsCopy list.
15✔
745
      // If the job is not found in any of the queues, continue looking
15✔
746
      // Finally compare the two lists and add an error to status for which
15✔
747
      // job could not be found in any of the queues.
15✔
748
      if (jobIDs.length > 0) {
15✔
749
        // jobIDsCopy should contain the jobIDs duplicate values
14✔
750
        // after the for loop ends
14✔
751
        let jobIDsCopy: string[] = [];
14✔
752
        for (let jobID of jobIDs) {
14✔
753
          const jobIdData = await this.getRedisValue(jobID as string);
14✔
754
          // future jobs scheduled with `when` will have same repeatId as external SCS jobID
14✔
755
          if (jobIdData?.repeatId && (jobIdData.repeatId != jobID)) {
14✔
756
            const repeatId = jobIdData.repeatId;
1✔
757
            if (jobIdData?.options?.repeat?.cron && jobIdData?.options?.repeat?.every) {
1!
758
              jobListResponse.items.push({
×
UNCOV
759
                status: {
×
UNCOV
760
                  id: jobID.toString(),
×
UNCOV
761
                  code: 400,
×
UNCOV
762
                  message: 'Both .cron and .every options are defined for this repeatable job'
×
UNCOV
763
                }
×
UNCOV
764
              });
×
765
              continue;
×
UNCOV
766
            }
×
767
            const nextMillis = this.getNextMillis(Date.now(), jobIdData.options.repeat);
1✔
768
            this.logger.debug('Repeatable job identifier', { id: jobID, repeatId: `repeat:${repeatId}:${nextMillis}` });
1✔
769
            // map the repeatKey with nextmilis for bull repeatable jobID
1✔
770
            jobID = `repeat:${repeatId}:${nextMillis}`;
1✔
771
          }
1✔
772
          for (let queue of this.queuesList) {
14✔
773
            await new Promise((resolve, reject) => {
42✔
774
              // getJob returns job or null
42✔
775
              queue.getJob(jobID).then(job => {
42✔
776
                if (job?.opts?.repeat?.pattern) {
42✔
777
                  (job.opts.repeat as any).cron = job.opts.repeat.pattern;
1✔
778
                  delete job.opts.repeat.pattern;
1✔
779
                }
1✔
780
                resolve(job);
42✔
781
                if (!_.isEmpty(job)) {
42✔
782
                  result.push(job);
13✔
783
                  if ((job as any)?.opts?.repeat?.jobId) {
13✔
784
                    jobIDsCopy.push((job as any).opts.repeat.jobId);
1✔
785
                  } else {
13✔
786
                    jobIDsCopy.push(jobID);
12✔
787
                  }
12✔
788
                }
13✔
789
              }).catch(err => {
42✔
790
                that.logger.error(`Error reading job ${jobID}`, err);
×
791
                if (err?.code && typeof err.code === 'string') {
×
792
                  err.code = 500;
×
UNCOV
793
                }
×
794
                jobListResponse.items.push({
×
UNCOV
795
                  status: {
×
UNCOV
796
                    id: jobID.toString(),
×
UNCOV
797
                    code: err.code,
×
UNCOV
798
                    message: err.message
×
UNCOV
799
                  }
×
UNCOV
800
                });
×
801
              });
42✔
802
            });
42✔
803
          }
42✔
804
        }
14✔
805
        if (!_.isEqual(jobIDs.sort(), jobIDsCopy.sort())) {
14✔
806
          const jobIDsDiff = _.difference(jobIDs, jobIDsCopy);
1✔
807
          for (let jobId of jobIDsDiff) {
1✔
808
            jobListResponse.items.push({
1✔
809
              status: {
1✔
810
                id: jobId.toString(),
1✔
811
                code: 404,
1✔
812
                message: `Job ID ${jobId} not found in any of the queues`
1✔
813
              }
1✔
814
            });
1✔
815
          }
1✔
816
        }
1✔
817
      } else {
15✔
818
        try {
1✔
819
          let jobsList: any[] = [];
1✔
820
          for (let queue of this.queuesList) {
1✔
821
            const getJobsResult = await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']);
3✔
822
            getJobsResult.forEach((job) => {
3✔
823
              if (job?.opts?.repeat?.pattern) {
5!
824
                (job.opts.repeat as any).cron = job.opts.repeat.pattern;
×
825
                delete job.opts.repeat.pattern;
×
UNCOV
826
              }
×
827
            });
3✔
828
            jobsList = jobsList.concat(getJobsResult);
3✔
829
          }
3✔
830
          result = jobsList;
1✔
831
        } catch (err) {
1!
832
          that.logger.error('Error reading jobs', err);
×
833
          if (typeof err.code === 'string') {
×
834
            err.code = 500;
×
UNCOV
835
          }
×
836
          return {
×
UNCOV
837
            operation_status: {
×
UNCOV
838
              code: err.code,
×
UNCOV
839
              message: err.message
×
UNCOV
840
            }
×
UNCOV
841
          };
×
UNCOV
842
        }
×
843
      }
1✔
844

15✔
845
      if (typeFilterName) {
15✔
846
        result = result.filter(job => job.name === typeFilterName);
2✔
847
      }
2✔
848
      let custom_arguments;
15✔
849
      if (acsResponse?.custom_query_args?.length > 0) {
15!
UNCOV
850
        custom_arguments = acsResponse.custom_query_args[0].custom_arguments;
×
UNCOV
851
      }
×
852
      result = this.filterByOwnerShip({ custom_arguments }, result);
15✔
853
    }
15✔
854

39✔
855
    result = result.filter(valid => !!valid);
39✔
856

39✔
857
    if (!_.isEmpty(request) && !_.isEmpty(request.sort)
39✔
858
      && _.includes(['ASCENDING', 'DESCENDING'], request.sort)) {
42✔
859
      let sort;
2✔
860
      switch (request.sort) {
2✔
861
        case JobReadRequest_SortOrder.DESCENDING:
2✔
862
          sort = 'desc';
1✔
863
          break;
1✔
864
        case JobReadRequest_SortOrder.ASCENDING:
2✔
865
          sort = 'asc';
1✔
866
          break;
1✔
867
        default:
2!
868
          this.logger.error(`Unknown sort option ${sort}`);
×
869
      }
2✔
870
      result = _.orderBy(result, ['id'], [sort]);
2✔
871
    }
2✔
872

39✔
873
    for (let job of result) {
42✔
874
      let jobId = job.id as string;
62✔
875
      if (jobId.startsWith('repeat:')) {
62✔
876
        const repeatKey = jobId.split(':')[1];
21✔
877
        // it could be possible the redis repeat key is deleted on index 8 and old completed
21✔
878
        // jobs exist in data_store if delete on complete was not set to true for repeatable jobs
21✔
879
        const jobRedisId = await this.getRedisValue(repeatKey);
21✔
880
        if (jobRedisId) {
21✔
881
          job.id = jobRedisId;
21✔
882
        }
21✔
883
      }
21✔
884
    }
62✔
885

39✔
886
    for (let job of result) {
42✔
887
      let when = job?.opts?.delay ? new Date(job?.opts?.delay).toString() : '';
62!
888
      jobListResponse.items.push({
62✔
889
        payload: {
62✔
890
          id: job.id as string,
62✔
891
          type: job.name,
62✔
892
          queue_name: job.queueName,
62✔
893
          data: _filterJobData(job.data, true, this.logger),
62✔
894
          options: _filterJobOptions(job.opts) as any,
62✔
895
          when
62✔
896
        },
62✔
897
        status: {
62✔
898
          id: job.id.toString(),
62✔
899
          code: 200,
62✔
900
          message: 'success'
62✔
901
        }
62✔
902
      });
62✔
903
    }
62✔
904
    jobListResponse.total_count = jobListResponse?.items?.length;
42✔
905
    jobListResponse.operation_status = {
42✔
906
      code: 200,
42✔
907
      message: 'success'
42✔
908
    };
42✔
909
    return jobListResponse;
42✔
910
  }
42✔
911

2✔
912
  /**
   * Collects the jobs of every queue in `this.queuesList`, asking each queue
   * for the job states configured under `bullOptions['activeAndFutureJobTypes']`.
   *
   * For each returned job a repeat option stored as `opts.repeat.pattern` is
   * moved to `opts.repeat.cron`; the job objects themselves are mutated.
   *
   * @returns the jobs of all queues, in queue order
   */
  async _getJobList(): Promise<Job[]> {
    const collected: any[] = [];
    for (const queue of this.queuesList) {
      const queueJobs = await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']);
      for (const job of queueJobs) {
        const repeatOpts = job?.opts?.repeat;
        if (repeatOpts?.pattern) {
          // expose the repeat pattern under the `cron` key and drop `pattern`
          (repeatOpts as any).cron = repeatOpts.pattern;
          delete repeatOpts.pattern;
        }
        collected.push(job);
      }
    }
    return collected;
  }
28✔
926

2✔
927
  /**
   * Deletes a job by its job instance after processing 'jobDone' / 'jobFailed'.
   * Delegates to `_removeBullJob`.
   *
   * @param jobId identifier of the job instance to remove
   * @param queue queue the job is looked up in
   */
  async _deleteJobInstance(jobId: string, queue: Queue): Promise<void> {
    await this._removeBullJob(jobId, queue);
  }
×
931

2✔
932
  /**
   * Delete jobs from the queues.
   *
   * Two modes are supported:
   *  - `request.collection` is set: every job returned by `_getJobList()` is
   *    removed and the Redis DB holding the repeat job ID mappings is flushed.
   *  - otherwise `request.ids`: each ID is looked up in every queue and removed,
   *    either as a repeatable job (by its repeat key) or as a regular job.
   *
   * Access is checked against access-control-srv first (`DROP` for a
   * collection, `DELETE` for IDs). Every removal is awaited before the response
   * is returned, so `status` holds one entry per removed / failed job and all
   * `jobsDeleted` events have been dispatched.
   *
   * Note that `operation_status` is 200 whenever the request itself was
   * processed; failures of individual jobs are reported in `status` only.
   *
   * @param request delete request with `ids` and / or `collection` and the `subject`
   * @param ctx call context; `subject` and `resources` are set on it for the access check
   * @returns per job `status` entries and the overall `operation_status`
   */
  async delete(request: DeleteRequest, ctx: any): Promise<DeepPartial<DeleteResponse>> {
    const deleteResponse: DeleteResponse = { status: [], operation_status: { code: 0, message: '' } };
    if (_.isEmpty(request)) {
      return {
        operation_status: {
          code: 400,
          message: 'No arguments provided for delete operation'
        }
      };
    }
    const subject = request?.subject;
    const jobIDs = request?.ids;
    let resources = [];
    let action;
    if (jobIDs) {
      action = AuthZAction.DELETE;
      if (_.isArray(jobIDs)) {
        for (const id of jobIDs) {
          resources.push({ id });
        }
      } else {
        resources = [{ id: jobIDs }];
      }
      // attach owners information of the existing jobs for the access check
      await this.createMetadata(resources, action, subject);
    }
    if (request.collection) {
      action = AuthZAction.DROP;
      resources = [{ collection: request.collection }];
    }
    let acsResponse: DecisionResponse;
    try {
      if (!ctx) { ctx = {}; }
      ctx.subject = subject;
      ctx.resources = resources;
      acsResponse = await checkAccessRequest(ctx, [{ resource: 'job', id: jobIDs as string[] }], action,
        Operation.isAllowed);
    } catch (err) {
      this.logger.error('Error requesting access-control-srv for delete operation', { code: err.code, message: err.message, stack: err.stack });
      return {
        operation_status: {
          code: err.code,
          message: err.message
        }
      };
    }
    if (acsResponse.decision != Response_Decision.PERMIT) {
      return { operation_status: acsResponse.operation_status };
    }
    const dispatch = [];
    this.logger.info('Received delete request');
    if ('collection' in request && request.collection) {
      this.logger.verbose('Deleting all jobs');

      const jobs = await this._getJobList();
      for (const job of jobs) {
        await job.remove();
        if (this.resourceEventsEnabled) {
          dispatch.push(this.jobEvents.emit('jobsDeleted', { id: job.id }));
        }
        deleteResponse.status.push({
          id: job.id.toString(),
          code: 200,
          message: 'success'
        });
      }
      // FLUSH redis DB index 8 used for mapping of repeat jobIds (since req is for dropping job collection)
      const delResp = await this.repeatJobIdRedisClient.flushDb();
      if (delResp) {
        this.logger.debug('Mapped keys for repeatable jobs deleted successfully');
      } else {
        this.logger.debug('Could not delete repeatable job keys');
      }
    } else if ('ids' in request) {
      this.logger.verbose('Deleting jobs by their IDs', { id: request.ids });

      for (const queue of this.queuesList) {
        for (const jobDataKey of request.ids) {
          const jobIdData = await this.getRedisValue(jobDataKey as string);
          // future jobs scheduled with `when` will have same repeatId as external SCS jobID
          const isRepeatable = !!(jobIdData && jobIdData.repeatId && (jobIdData.repeatId != jobDataKey));
          const repeatableJob = isRepeatable
            ? (await queue.getRepeatableJobs()).find((job) => job.id === jobDataKey)
            : undefined;
          try {
            if (repeatableJob) {
              this.logger.debug('Removing Repeatable job by key for jobId', { id: repeatableJob.id });
              // wait for the removal so that success is only reported once it happened
              await queue.removeRepeatableByKey(repeatableJob.key);
              deleteResponse.status.push({
                id: repeatableJob.id,
                code: 200,
                message: 'success'
              });
              await this.deleteRedisKey(jobDataKey as string);
              await this.deleteRedisKey(jobIdData.repeatId);
            } else if (!isRepeatable) {
              const jobData = await queue.getJob(jobDataKey);
              if (jobData) {
                await this._removeBullJob(jobData.id, queue);
                await this.deleteRedisKey(jobData.id);
                deleteResponse.status.push({
                  id: jobData.id.toString(),
                  code: 200,
                  message: 'success'
                });
              }
            }
            // NOTE(review): as before, the event is emitted per queue even if the
            // job was not found in this particular queue - confirm this is intended.
            if (this.resourceEventsEnabled) {
              dispatch.push(this.jobEvents.emit(
                'jobsDeleted', { id: jobDataKey })
              );
            }
          } catch (err) {
            this.logger.error('Error deleting job', { id: jobDataKey, code: err?.code, message: err?.message, stack: err?.stack });
            deleteResponse.status.push({
              id: jobDataKey.toString(),
              // status codes are numeric; anything else (e.g. string error codes) maps to 500
              code: typeof err?.code === 'number' ? err.code : 500,
              message: err?.message
            });
          }
        }
      }
    }

    await Promise.all(dispatch);
    deleteResponse.operation_status = { code: 200, message: 'success' };
    return deleteResponse;
  }
8✔
1094

2✔
1095
  /**
   * Clean up queues - removes completed and failed jobs from every queue and
   * stores the time of this run in Redis under the `QUEUE_CLEANUP` key.
   *
   * A failure while cleaning one queue is logged and does not prevent the
   * remaining queues from being cleaned; the success message is only logged
   * when no queue failed.
   *
   * @param ttlAfterFinished first argument handed to `queue.clean`
   * (presumably the minimum age in milliseconds of finished jobs to remove -
   * TODO confirm against the queue library)
   */
  async cleanupJobs(ttlAfterFinished) {
    let failedQueues = 0;
    for (const queue of this.queuesList) {
      try {
        await queue.clean(ttlAfterFinished, 0, COMPLETED_JOB_STATE);
        await queue.clean(ttlAfterFinished, 0, FAILED_JOB_STATE);
      } catch (err) {
        failedQueues++;
        this.logger.error('Error cleaning up jobs', err);
      }
    }
    if (failedQueues === 0) {
      this.logger.info('Jobs cleaned up successfully');
    } else {
      this.logger.warn('Jobs clean up finished with errors', { failedQueues });
    }
    // remember when the clean up ran, it is read again by setupCleanInterval
    const lastExecutedInterval = { lastExecutedInterval: (new Date()).toString() };
    await this.repeatJobIdRedisClient.set(QUEUE_CLEANUP, JSON.stringify(lastExecutedInterval));
  }
×
1112

2✔
1113
  /**
   * Schedules the periodic clean up of the queues (see `cleanupJobs`).
   *
   * The time of the last clean up is read from Redis (`QUEUE_CLEANUP`). If
   * less than `cleanInterval` has passed since then, the first run is delayed
   * by the remaining time and the interval is started afterwards; otherwise
   * the interval is started right away.
   *
   * Failures of a scheduled run are logged instead of surfacing as unhandled
   * promise rejections, and a failed first run no longer prevents the
   * interval from being installed.
   *
   * @param cleanInterval delay in milliseconds between two clean ups
   * @param ttlAfterFinished forwarded to `cleanupJobs`; falls back to
   * `DEFAULT_CLEANUP_COMPLETED_JOBS` when not set
   */
  async setupCleanInterval(cleanInterval: number, ttlAfterFinished: number) {
    if (!ttlAfterFinished) {
      ttlAfterFinished = DEFAULT_CLEANUP_COMPLETED_JOBS;
    }
    // timer callbacks have no caller which could handle a rejection
    const runCleanup = (): Promise<void> =>
      this.cleanupJobs(ttlAfterFinished).catch((err) => {
        this.logger.error('Error running scheduled jobs clean up', { code: err?.code, message: err?.message, stack: err?.stack });
      });

    const intervalData = await this.getRedisValue(QUEUE_CLEANUP);
    let delta: number | undefined;
    const now = new Date().getTime();
    if (intervalData?.lastExecutedInterval && typeof (intervalData.lastExecutedInterval) === 'string') {
      const timeInMs = new Date(intervalData.lastExecutedInterval).getTime();
      this.logger.debug('Previous execution interval', intervalData);
      delta = now - timeInMs;
      this.logger.debug('Clean interval and previous difference', { cleanInterval, difference: delta });
    }

    if (delta && (delta < cleanInterval)) {
      // use setTimeout and then create interval on setTimeout
      this.logger.info('Restoring previous execution interval with set timeout', { time: cleanInterval - delta });
      setTimeout(async () => {
        await runCleanup();
        setInterval(runCleanup, cleanInterval);
      }, cleanInterval - delta);
    } else {
      setInterval(runCleanup, cleanInterval);
      this.logger.info('Clean up job interval set successfully');
    }
  }
×
1139

2✔
1140
  /**
   * Reschedules jobs - reads the existing jobs, deletes them and creates them
   * again with the given changes merged in.
   *
   * For every requested job that `read` returns with a payload, the new job
   * takes `id`, `type` and `when` from the request, `queue_name` from the
   * existing job, the existing options overridden by the requested options and
   * the requested data (falling back to the existing data). Requested jobs that
   * could not be read are neither deleted nor re-created.
   *
   * NOTE(review): the ID of the request item is handed to `create`; whether a
   * new ID is generated there is not visible from this method.
   *
   * @param request list of jobs to update and the `subject`
   * @param ctx call context; `subject` and `resources` are set on it for the access check
   * @returns the response of `create` for the re-created jobs, or only an
   * `operation_status` if the request is invalid or access is not permitted
   */
  async update(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
    // validate before the request is dereferenced below
    if (_.isNil(request) || _.isNil(request.items)) {
      return {
        operation_status: {
          code: 400,
          message: 'Missing items in update request'
        }
      };
    }
    const subject = request.subject;
    // update meta data for owners information
    await this.createMetadata(request.items, AuthZAction.MODIFY, subject);
    let acsResponse: DecisionResponse;
    try {
      if (!ctx) { ctx = {}; }
      ctx.subject = subject;
      ctx.resources = request.items;
      acsResponse = await checkAccessRequest(ctx,
        [{ resource: 'job', id: request.items.map(item => item.id) }],
        AuthZAction.MODIFY, Operation.isAllowed);
    } catch (err) {
      this.logger.error('Error requesting access-control-srv for update operation', { code: err.code, message: err.message, stack: err.stack });
      return {
        operation_status: {
          code: err.code,
          message: err.message
        }
      };
    }
    if (acsResponse.decision != Response_Decision.PERMIT) {
      return { operation_status: acsResponse.operation_status };
    }

    // requested jobs by their ID
    const requestedJobs: Record<string, any> = {};
    for (const item of request.items) {
      requestedJobs[item.id] = item;
    }

    const jobData = await this.read(JobReadRequest.fromPartial(
      {
        filter: {
          job_ids: Object.keys(requestedJobs)
        },
        subject
      }
    ), ctx);

    const result = [];
    for (const job of jobData?.items ?? []) {
      const payload = job?.payload;
      const requestedJob = payload ? requestedJobs[payload.id] : undefined;
      if (!requestedJob) {
        // entries without payload only carry an error status (e.g. job not found)
        continue;
      }
      const endJob = {
        id: requestedJob.id,
        type: requestedJob.type,
        queue_name: payload.queue_name,
        options: {
          ...payload.options,
          ...(requestedJob.options ? requestedJob.options : {})
        },
        data: requestedJob.data || payload.data,
        when: requestedJob.when,
      };

      // a new `when` replaces the delay of the existing job
      if (endJob.when && endJob.options) {
        delete endJob.options['delay'];
      }

      result.push(endJob);
    }

    // only remove the jobs which are re-created below, otherwise a failed read
    // would delete jobs without anything to restore them from
    await this.delete(DeleteRequest.fromPartial({
      ids: _.uniq(result.map((job) => job.id)),
      subject
    }), {});

    return this.create(JobList.fromPartial({
      items: result,
      subject
    }), ctx);
  }
4✔
1223

2✔
1224
  /**
   * Upserts a job - creates a new job if it does not exist or update the
   * existing one if it already exists.
   *
   * A job exists if it is found in one of the queues. For jobs with a repeat
   * mapping stored in Redis the lookup uses the derived
   * `repeat:<repeatId>:<nextMillis>` ID; the job passed on to `update` /
   * `create` always keeps the ID given in the request.
   *
   * @param request list of jobs to upsert and the `subject`
   * @param ctx call context; `subject` and `resources` are set on it for the access check
   * @returns the items returned by `update` / `create` (or an error status for
   * jobs which could not be processed) and the overall `operation_status`
   */
  async upsert(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
    // validate before the request is dereferenced below
    if (_.isNil(request) || _.isNil(request.items)) {
      return { operation_status: { code: 400, message: 'Missing items in upsert request' } };
    }
    const subject = request.subject;
    await this.createMetadata(request.items, AuthZAction.MODIFY, subject);
    let acsResponse: DecisionResponse;
    try {
      if (!ctx) { ctx = {}; }
      ctx.subject = subject;
      ctx.resources = request.items;
      acsResponse = await checkAccessRequest(ctx,
        [{ resource: 'job', id: request.items.map(item => item.id) }],
        AuthZAction.MODIFY, Operation.isAllowed);
    } catch (err) {
      this.logger.error('Error requesting access-control-srv for upsert operation', { code: err.code, message: err.message, stack: err.stack });
      return {
        operation_status: {
          code: err.code,
          message: err.message
        }
      };
    }

    if (acsResponse.decision != Response_Decision.PERMIT) {
      return { operation_status: acsResponse.operation_status };
    }

    const result = [];

    for (const eachJob of request.items) {
      const origJobId = eachJob.id;
      // ID used to look the job up in the queues; the request item itself is not modified
      let lookupId = origJobId;
      const jobIdData = await this.getRedisValue(origJobId as string);
      // future jobs scheduled with `when` will have same repeatId as external SCS jobID
      if (jobIdData?.repeatId && (jobIdData.repeatId != origJobId)) {
        if (jobIdData?.options?.repeat?.cron && jobIdData?.options?.repeat?.every) {
          // invalid repeat options: report once and skip this job
          result.push({
            status: {
              id: origJobId,
              code: 400,
              message: 'Both .cron and .every options are defined for this repeatable job'
            }
          });
          continue;
        }
        const nextMillis = this.getNextMillis(Date.now(), jobIdData.options.repeat);
        // map the repeatKey with nextmilis for bull repeatable jobID
        lookupId = `repeat:${jobIdData.repeatId}:${nextMillis}`;
        this.logger.debug('Repeatable job identifier', { id: origJobId, repeatId: lookupId });
      }

      let jobExists = false;
      for (const queue of this.queuesList) {
        if (await queue.getJob(lookupId)) {
          jobExists = true;
          break;
        }
      }

      const jobRequest = JobList.fromPartial({ items: [eachJob], subject });
      // existing job: update it with the given job identifier, otherwise create it
      const response = jobExists
        ? await this.update(jobRequest, ctx)
        : await this.create(jobRequest, ctx);
      if (response?.items) {
        result.push(...response.items);
      } else {
        // update / create failed as a whole (e.g. access denied), report it for this job
        result.push({
          status: {
            id: origJobId,
            code: response?.operation_status?.code,
            message: response?.operation_status?.message
          }
        });
      }
    }

    return {
      items: result,
      total_count: result.length,
      operation_status: {
        code: 200,
        message: 'success'
      }
    };
  }
1✔
1313

2✔
1314
  /**
   * Clear all job data: fetches the jobs of every queue for the job states
   * configured under `bullOptions['allJobTypes']` and removes them.
   *
   * @returns the results of the individual `job.remove()` calls
   * @throws rethrows the first removal error after logging it
   */
  async clear(): Promise<any> {
    const jobsToRemove: any[] = [];
    for (const queue of this.queuesList) {
      const queueJobs = await queue.getJobs(this.bullOptions['allJobTypes']);
      for (const job of queueJobs) {
        jobsToRemove.push(job);
      }
    }
    const removals = jobsToRemove.map(async (job) => job.remove());
    try {
      return await Promise.all(removals);
    } catch (err) {
      this.logger.error(`Error clearing jobs`, err);
      throw err;
    }
  }
2✔
1327

2✔
1328
  /**
   * Looks up a job in the given queue and removes it if it exists.
   * The removal is logged; a failure is logged and rethrown.
   *
   * @param jobInstID identifier of the job to remove
   * @param queue queue the job is looked up in
   */
  async _removeBullJob(jobInstID: string, queue: Queue): Promise<void> {
    const pendingLookup = queue.getJob(jobInstID);
    try {
      const job = await pendingLookup;
      if (job) {
        await job.remove();
      }
      this.logger.info(`Job#${jobInstID} removed`);
    } catch (error) {
      this.logger.error(`Error removing job ${jobInstID}`, error);
      throw error;
    }
  }
3✔
1340

2✔
1341
  /**
   * Disables access control: sets `authorization:enabled` to `false` and
   * propagates the configuration via `updateConfig`. On failure the setting is
   * reset to `this.authZCheck`.
   */
  disableAC() {
    const authzEnabledKey = 'authorization:enabled';
    try {
      this.cfg.set(authzEnabledKey, false);
      updateConfig(this.cfg);
    } catch (error) {
      this.logger.error('Error caught disabling authorization:', { err: error });
      this.cfg.set(authzEnabledKey, this.authZCheck);
    }
  }
13✔
1353

2✔
1354
  /**
   * Enables access control: sets `authorization:enabled` to `true` and
   * propagates the configuration via `updateConfig`. On failure the setting is
   * reset to `this.authZCheck`.
   */
  enableAC() {
    const authzEnabledKey = 'authorization:enabled';
    try {
      this.cfg.set(authzEnabledKey, true);
      updateConfig(this.cfg);
    } catch (error) {
      this.logger.error('Error caught enabling authorization:', { err: error });
      this.cfg.set(authzEnabledKey, this.authZCheck);
    }
  }
2✔
1366

2✔
1367
  /**
   * Restores the access control state to its previous value
   * (`this.authZCheck`), i.e. the value before enabling or disabling AC, and
   * propagates the configuration via `updateConfig`.
   */
  restoreAC() {
    try {
      this.cfg.set('authorization:enabled', this.authZCheck);
      updateConfig(this.cfg);
    } catch (err) {
      // message names this operation so the log is distinguishable from enableAC
      this.logger.error('Error caught restoring authorization:', { err });
      this.cfg.set('authorization:enabled', this.authZCheck);
    }
  }
2✔
1379

2✔
1380
  /**
   * Adds owners information to the given resources so that it can be used for
   * the access control check. The resources are modified in place.
   *
   *  - MODIFY / DELETE of a resource with an ID: the job is read and its
   *    `data.meta.owners` are copied to the resource. If the job does not exist
   *    and the action is MODIFY, default owners are assigned.
   *  - CREATE (or a resource without ID) without owners: default owners are
   *    assigned.
   *  - CREATE with owners: the given owners are copied to `resource.meta`.
   *
   * Default owners are the subject's scope as organization owner (only if
   * `subject.scope` is set and the action is CREATE or MODIFY) and, for
   * resources with an ID, the subject itself as user owner.
   *
   * @param resources a resource or list of resources
   * @param action resource action (one of the `AuthZAction` values)
   * @param subject subject of the request; its `scope` and `id` are read
   * @returns the resources (wrapped into an array if a single resource was given)
   */
  async createMetadata(resources: any, action: string, subject): Promise<any> {
    if (resources && !_.isArray(resources)) {
      resources = [resources];
    }
    const urns = this.cfg.get('authorization:urns');
    const orgOwnerAttributes: Attribute[] = [];
    if (subject?.scope && (action === AuthZAction.CREATE || action === AuthZAction.MODIFY)) {
      // add subject scope as default owners
      orgOwnerAttributes.push(
        {
          id: urns?.ownerIndicatoryEntity,
          value: urns?.organization,
          attributes: [{
            id: urns?.ownerInstance,
            value: subject?.scope,
            attributes: []
          }]
        });
    }

    // builds a fresh list of default owners, optionally including the user
    const buildDefaultOwners = (includeUser: boolean): Attribute[] => {
      const ownerAttributes = _.cloneDeep(orgOwnerAttributes);
      if (includeUser) {
        // add user as default owners
        ownerAttributes.push(
          {
            id: urns?.ownerIndicatoryEntity,
            value: urns?.user,
            attributes: [{
              id: urns?.ownerInstance,
              value: subject?.id,
              attributes: []
            }]
          });
      }
      return ownerAttributes;
    };

    if (resources?.length > 0) {
      for (const resource of resources) {
        if (!resource.data) {
          resource.data = { meta: {} };
        } else if (!resource.data.meta) {
          resource.data.meta = {};
        }
        if (resource?.id && (action === AuthZAction.MODIFY || action === AuthZAction.DELETE)) {
          let result;
          try {
            result = await this.read(JobReadRequest.fromPartial({
              filter: {
                job_ids: [resource.id]
              },
              subject
            }), {});
          } catch (err) {
            if (err?.message?.startsWith('Error! Jobs not found in any of the queues') && action != AuthZAction.DELETE) {
              this.logger.debug('New job should be created', { jobId: resource.id });
              result = { items: [] };
            } else {
              this.logger.error(`Error reading job with resource ID ${resource.id}`, { code: err?.code, message: err?.message, stack: err?.stack });
            }
          }
          // update owners info
          if (result?.items?.length === 1 && result?.items[0]?.payload) {
            const item = result.items[0].payload;
            resource.data.meta.owners = item.data.meta.owners;
            // adding meta to resource root (needed by access-control-srv for owners information check)
            // meta is inside data of resource since the data is persisted in redis using bull
            resource.meta = { owners: item.data.meta.owners };
          } else if ((!result || !result.items || !result.items[0] || !result.items[0].payload) && action === AuthZAction.MODIFY) {
            // job does not exist - create new job (ex: Upsert with action modify)
            const ownerAttributes = buildDefaultOwners(true);
            resource.data.meta.owners = ownerAttributes;
            resource.meta = { owners: ownerAttributes };
          }
        } else if ((action === AuthZAction.CREATE || !resource.id) && !resource.data.meta.owners) {
          // the user is only added as owner for resources with an ID
          const ownerAttributes = buildDefaultOwners(!!resource.id);
          resource.data.meta.owners = ownerAttributes;
          resource.meta = { owners: ownerAttributes };
        } else if (action === AuthZAction.CREATE && resource?.data?.meta?.owners) {
          resource.meta = { owners: resource.data.meta.owners };
        }
      }
    }
    return resources;
  }
23✔
1478
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc