• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

restorecommerce / scheduling-srv / 6430972057

06 Oct 2023 11:20AM UTC coverage: 67.471% (-0.04%) from 67.506%
6430972057

push

github

Arun-KumarH
fix: SCS fix cron for read operation as its stored as pattern in Redis by bullmq

269 of 449 branches covered (0.0%)

Branch coverage included in aggregate %.

11 of 11 new or added lines in 1 file covered. (100.0%)

654 of 919 relevant lines covered (71.16%)

6.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.68
/src/schedulingService.ts
1
import * as _ from 'lodash';
1✔
2
import { errors } from '@restorecommerce/chassis-srv';
1✔
3
import * as kafkaClient from '@restorecommerce/kafka-client';
4
import { AuthZAction, ACSAuthZ, updateConfig, DecisionResponse, Operation, PolicySetRQResponse } from '@restorecommerce/acs-client';
1✔
5
import {
1✔
6
  JobServiceImplementation as SchedulingServiceServiceImplementation,
7
  JobFailed, JobDone, DeepPartial, JobList, JobListResponse, Data,
8
  Backoff_Type, JobOptions_Priority, JobReadRequest, JobReadRequest_SortOrder
9
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/job';
10
import { createClient, RedisClientType } from 'redis';
1✔
11
import { NewJob, Priority } from './types';
1✔
12
import { parseExpression } from 'cron-parser';
1✔
13
import * as crypto from 'crypto';
1✔
14
import { _filterJobData, _filterJobOptions, _filterQueuedJob, checkAccessRequest, marshallProtobufAny } from './utilts';
1✔
15
import * as uuid from 'uuid';
1✔
16
import { Logger } from 'winston';
17
import { Response_Decision } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/access_control';
1✔
18
import { Attribute } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/attribute';
19
import { DeleteRequest, DeleteResponse } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/resource_base';
1✔
20
import { Queue, QueueOptions, Job } from 'bullmq';
1✔
21
import { parseInt } from 'lodash';
1✔
22

23
const JOB_DONE_EVENT = 'jobDone';
1✔
24
const JOB_FAILED_EVENT = 'jobFailed';
1✔
25
const DEFAULT_CLEANUP_COMPLETED_JOBS = 604800000; // 7 days in miliseconds
1✔
26
const COMPLETED_JOB_STATE = 'completed';
1✔
27
const FAILED_JOB_STATE = 'failed';
1✔
28
const QUEUE_CLEANUP = 'queueCleanup';
1✔
29

30
/**
31
 * A job scheduling service.
32
 */
33
export class SchedulingService implements SchedulingServiceServiceImplementation {
1✔
34

35
  jobEvents: kafkaClient.Topic;
36
  logger: Logger;
37

38
  queuesConfigList: any;
39
  queuesList: Queue[];
40
  defaultQueueName: string;
41

42
  redisClient: RedisClientType<any, any>;
43
  resourceEventsEnabled: boolean;
44
  canceledJobs: Set<string>;
45
  bullOptions: any;
46
  cfg: any;
47
  authZ: ACSAuthZ;
48
  authZCheck: boolean;
49
  repeatJobIdRedisClient: RedisClientType<any, any>;
50

51

52
  constructor(jobEvents: kafkaClient.Topic,
53
    private redisConfig: any, logger: any, redisClient: RedisClientType<any, any>,
1✔
54
    bullOptions: any, cfg: any, authZ: ACSAuthZ) {
55
    this.jobEvents = jobEvents;
1✔
56
    this.resourceEventsEnabled = true;
1✔
57
    this.bullOptions = bullOptions;
1✔
58
    this.logger = logger;
1✔
59
    this.queuesList = [];
1✔
60
    this.queuesConfigList = [];
1✔
61
    this.redisClient = redisClient;
1✔
62

63
    const repeatJobIdCfg = cfg.get('redis');
1✔
64
    repeatJobIdCfg.database = cfg.get('redis:db-indexes:db-repeatJobId');
1✔
65
    this.repeatJobIdRedisClient = createClient(repeatJobIdCfg);
1✔
66
    this.repeatJobIdRedisClient.on('error', (err) => logger.error('Redis client error in repeatable job store', err));
1✔
67
    this.repeatJobIdRedisClient.connect().then((data) => {
1✔
68
      logger.info('Redis client connection for repeatable job store successful');
1✔
69
    }).catch(err => logger.error('Redis client error for repeatable job store', err));
×
70

71
    this.canceledJobs = new Set<string>();
1✔
72
    this.cfg = cfg;
1✔
73
    this.authZ = authZ;
1✔
74
    this.authZCheck = this.cfg.get('authorization:enabled');
1✔
75

76
    // Read Queue Configuration file and find first queue which has "default": true,
77
    // then save it to defaultQueueName
78
    const queuesCfg = this.cfg.get('queue');
1✔
79
    if (_.isEmpty(queuesCfg)) {
1!
80
      this.logger.error('Queue configuration not found!');
×
81
      throw new Error('Queue configuration not found!');
×
82
    }
83
    let defaultTrueExists = false;
1✔
84
    for (let queueCfg of queuesCfg) {
1✔
85
      // Find configuration which has default=true
86
      if (queueCfg.default == true) {
1!
87
        defaultTrueExists = true;
1✔
88
        this.defaultQueueName = queueCfg.name;
1✔
89
        break;
1✔
90
      }
91
    }
92
    if (!defaultTrueExists) {
1!
93
      this.logger.error('Queue default configuration not found!');
×
94
      throw new Error('Queue default configuration not found!');
×
95
    }
96

97
    // Create Queues
98
    for (let queueCfg of queuesCfg) {
1✔
99
      let queueOptions: QueueOptions;
100
      const prefix = queueCfg.name;
3✔
101
      const rateLimiting = queueCfg.rateLimiting;
3✔
102
      const advancedSettings = queueCfg.advancedSettings;
3✔
103

104
      queueOptions = {
3✔
105
        connection: {
106
          ...redisConfig,
107
        }
108
      };
109

110
      // Create Queue Configuration - Add Rate Limiting if enabled
111
      if (!_.isEmpty(rateLimiting) && rateLimiting.enabled == true) {
3!
112
        this.logger.info(`Queue: ${queueCfg.name} - Rate limiting is ENABLED.`);
×
113
      }
114

115
      if (!_.isEmpty(advancedSettings)) {
3!
116
        queueOptions.settings = {
3✔
117
          ...advancedSettings,
118
        };
119
      }
120

121
      const redisURL = new URL((queueOptions.connection as any).url);
3✔
122

123
      if ('keyPrefix' in queueOptions.connection) {
3!
124
        delete queueOptions.connection.keyPrefix;
×
125
      }
126

127
      let queue = new Queue(prefix, {
3✔
128
        ...queueOptions,
129
        connection: {
130
          ...queueOptions.connection as any,
131
          host: redisURL.hostname,
132
          port: parseInt(redisURL.port)
133
        }
134
      });
135
      this.queuesList.push(queue);
3✔
136

137
      // Add Queue Configurations
138
      let queueCfgObj = {
3✔
139
        name: queueCfg.name,
140
        concurrency: queueCfg.concurrency,
141
        default: queueCfg.default,
142
        runMissedScheduled: queueCfg.runMissedScheduled
143
      };
144
      this.queuesConfigList.push(queueCfgObj);
3✔
145
    }
146
  }
147

148
  /**
149
   * Start handling the job queue, job scheduling and
150
   * managing job events.
151
   */
152
  async start(): Promise<any> {
153
    const logger = this.logger;
1✔
154
    const that = this;
1✔
155
    const events = [JOB_DONE_EVENT, JOB_FAILED_EVENT];
1✔
156
    for (let eventName of events) {
1✔
157
      // A Scheduling Service Event Listener
158
      await this.jobEvents.on(eventName, async (msg: any, ctx: any,
2✔
159
        config: any, eventName: string): Promise<any> => {
160
        let job = msg;
6✔
161
        // Match Job Type to Queue Name, else use Default Queue
162
        let queue = _.find(this.queuesList, { name: job.type });
6✔
163
        let defaultQueue = _.find(this.queuesList, { name: this.defaultQueueName });
6✔
164
        if (_.isEmpty(queue)) {
6!
165
          queue = defaultQueue;
×
166
        }
167

168
        if (eventName === JOB_FAILED_EVENT) {
6!
169
          logger.error(`job@${job.type}#${job.id} failed with error #${job.error}`,
×
170
            _filterQueuedJob<JobFailed>(job, this.logger));
171
        } else if (eventName === JOB_DONE_EVENT) {
6!
172
          logger.verbose(`job#${job.id} done`, _filterQueuedJob<JobDone>(job, this.logger));
6✔
173
        }
174

175
        logger.info('Received Job event', { event: eventName });
6✔
176
        logger.info('Job details', job);
6✔
177
        const jobData: any = await queue.getJob(job.id).catch(error => {
6✔
178
          that.logger.error('Error retrieving job ${job.id} from queue', error);
×
179
        });
180

181
        if (job?.delete_scheduled) {
6✔
182
          await queue.removeRepeatable(jobData.name, jobData.opts.repeat);
1✔
183
        }
184
      });
185
    }
186

187
    // Initialize Event Listeners for each Queue
188
    for (let queue of this.queuesList) {
1✔
189
      queue.on('error', (error) => {
3✔
190
        logger.error('queue error', error);
×
191
      });
192
      queue.on('waiting', (job) => {
3✔
193
        logger.verbose(`job#${job.id} scheduled`, job);
9✔
194
      });
195
      queue.on('removed', (job) => {
3✔
196
        logger.verbose(`job#${job.id} removed`, job);
13✔
197
      });
198
      queue.on('progress', (job) => {
3✔
199
        logger.verbose(`job#${job.id} progress`, job);
×
200
      });
201
    }
202

203
    // If the scheduling service goes down and if there were
204
    // recurring jobs which have missed schedules then
205
    // we will need to reschedule it for those missing intervals.
206
    await this._rescheduleMissedJobs();
1✔
207
  }
208

209
  /**
210
   * To reschedule the missed recurring jobs upon service restart
211
   */
212
  async _rescheduleMissedJobs(): Promise<void> {
213
    // for jobs created via Kafka currently there are no acs checks
214
    this.disableAC();
1✔
215
    const createDispatch = [];
1✔
216
    let result: Job[] = [];
1✔
217
    let thiz = this;
1✔
218

219
    // Get the jobs
220
    for (let queueCfg of this.queuesConfigList) {
1✔
221
      // If enabled in the config, or the config is missing,b
222
      // Reschedule the missed jobs, else skip.
223
      let queue = _.find(this.queuesList, { name: queueCfg.name });
3✔
224
      let runMissedScheduled = queueCfg.runMissedScheduled;
3✔
225
      if (_.isNil(runMissedScheduled) ||
3!
226
        (!_.isNil(runMissedScheduled) && runMissedScheduled == true)) {
227
        await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']).then(jobs => {
3✔
228
          result = result.concat(jobs);
3✔
229
        }).catch(error => {
230
          thiz.logger.error('Error reading jobs to reschedule the missed recurring jobs', error);
×
231
        });
232
      }
233
    }
234
    let lastRunTime;
235
    for (let job of result) {
1✔
236
      // get the last run time for the job, we store the last run time only
237
      // for recurring jobs
238
      if (job?.name) {
×
239
        try {
×
240
          lastRunTime = await this.redisClient.get(job.name);
×
241
        } catch (err) {
242
          this.logger.error(
×
243
            'Error occurred reading the last run time for job type:',
244
            { name: job.name, err });
245
        }
246
      }
247
      // we store lastRunTime only for recurring jobs and if it exists check
248
      // cron interval and schedule immediate jobs for missed intervals
249
      this.logger.info(`Last run time of ${job.name} Job was:`, lastRunTime);
×
250
      if (lastRunTime) {
×
251
        // convert redis string value to object and get actual time value
252
        try {
×
253
          lastRunTime = JSON.parse(lastRunTime);
×
254
        } catch (error) {
255
          this.logger.error('Error parsing lastRunTime', {
×
256
            code: error.code,
257
            message: error.message, stack: error.stack
258
          });
259
        }
260

261
        if ((job?.opts?.repeat as any)?.pattern && lastRunTime?.time) {
×
262
          let options = {
×
263
            currentDate: new Date(lastRunTime.time),
264
            endDate: new Date(),
265
            iterator: true
266
          };
267
          let intervalTime;
268
          try {
×
269
            intervalTime =
×
270
              parseExpression((job.opts.repeat as any).pattern, options);
271
          } catch (error) {
272
            this.logger.error('Error parsing cron expression running missed schedules', { code: error.code, message: error.message, stack: error.stack });
×
273
          }
274
          while (intervalTime?.hasNext()) {
×
275
            let nextInterval: any = intervalTime.next();
×
276
            const nextIntervalTime = nextInterval.value.toString();
×
277
            // schedule it as one time job for now or immediately
278
            const data = {
×
279
              payload: marshallProtobufAny({
280
                value: { time: nextIntervalTime }
281
              })
282
            };
283
            const currentTime = new Date();
×
284
            const when = new Date(currentTime.setSeconds(currentTime.getSeconds() + 2)).toISOString();
×
285
            const immediateJob: any = {
×
286
              type: job.name,
287
              data,
288
              // give a delay of 2 sec between each job
289
              // to avoid time out of queued jobs
290
              when,
291
              options: {}
292
            };
293
            createDispatch.push(thiz.create({
×
294
              items: [immediateJob],
295
              total_count: 0,
296
            }, {}));
297
          }
298
        }
299
      }
300
    }
301
    this.restoreAC();
1✔
302
    await createDispatch;
1✔
303
  }
304

305
  /**
306
   * Disabling of CRUD events.
307
   */
308
  disableEvents(): void {
309
    this.resourceEventsEnabled = false;
×
310
  }
311

312
  /**
313
   * Enabling of CRUD events.
314
   */
315
  enableEvents(): any {
316
    this.resourceEventsEnabled = true;
×
317
  }
318

319
  _validateJob(job: NewJob): NewJob {
320
    if (_.isNil(job.type)) {
11!
321
      throw new errors.InvalidArgument('Job type not specified.');
×
322
    }
323

324
    if (!job.options) {
11!
325
      job.options = {};
×
326
    }
327

328
    if (job.when) {
11✔
329
      // If the jobSchedule time has already lapsed then do not schedule the job
330
      const jobScheduleTime = new Date(job.when).getTime();
7✔
331
      const currentTime = new Date().getTime();
7✔
332
      if (jobScheduleTime < currentTime) {
7!
333
        throw new errors.InvalidArgument('Cannot schedule a job for an elapsed time');
×
334
      }
335

336
      job.options.delay = jobScheduleTime - currentTime;
7✔
337
    }
338

339
    if (job.options.backoff && typeof job.options.backoff !== 'number') {
11!
340
      if (typeof job.options.backoff.type === 'number') {
11!
341
        job.options.backoff.type = Object.keys(Backoff_Type)[job.options.backoff.type];
×
342
      }
343
      job.options.backoff.type = job.options.backoff.type.toLowerCase();
11✔
344
    }
345

346
    if (job.options.priority && typeof job.options.priority === 'string') {
11!
347
      job.options.priority = JobOptions_Priority[job.options.priority] as any;
11✔
348
    }
349

350
    if (_.isEmpty(job.data)) {
11!
351
      throw new errors.InvalidArgument('No job data specified.');
×
352
    }
353

354
    job.data = _filterJobData(job.data, false, this.logger);
11✔
355

356
    return job;
11✔
357
  }
358

359
  /**
360
   * get next job execution time in mili seconds
361
   * @param millis
362
   * @param opts
363
   */
364
  getNextMillis(millis, opts) {
365
    if (opts?.every) {
1!
366
      return Math.floor(millis / opts.every) * opts.every + opts.every;
×
367
    }
368

369
    const currentDate =
370
      opts?.startDate && new Date(opts.startDate) > new Date(millis)
1!
371
        ? new Date(opts.startDate)
372
        : new Date(millis);
373
    const interval = parseExpression(
1✔
374
      opts.cron,
375
      _.defaults(
376
        {
377
          currentDate
378
        },
379
        opts
380
      )
381
    );
382

383
    try {
1✔
384
      return interval.next().getTime();
1✔
385
    } catch (e) {
386
      this.logger.error('Error getting next job execution time');
×
387
    }
388
  }
389

390
  private md5(str) {
391
    return crypto
×
392
      .createHash('md5')
393
      .update(str)
394
      .digest('hex');
395
  }
396

397
  /**
398
   * store the mapping from repeateKey to external interface SCS job Id, and
399
   * also the mapping other way around i.e. from SCS job Id to repeatKey (needed for read operations)
400
   * @param name - job name
401
   * @param repeat - job repeate options
402
   * @param jobId - job id
403
   */
404
  async storeRepeatKey(repeatId, scsJobId, options) {
405
    try {
11✔
406
      if (repeatId && scsJobId) {
11!
407
        this.logger.info('Repeat key mapped to external SCS JobId', { repeatId, scsJobId });
11✔
408
        await this.repeatJobIdRedisClient.set(repeatId, scsJobId);
11✔
409
        const jobIdData = { repeatId, options };
11✔
410
        await this.repeatJobIdRedisClient.set(scsJobId, JSON.stringify(jobIdData));
11✔
411
      }
412
    } catch (error) {
413
      this.logger.error('Error storing repeatKey to redis', {
×
414
        code: error.code,
415
        message: error.message, stack: error.stack
416
      });
417
    }
418
  }
419

420
  private idGen(): string {
421
    return uuid.v4().replace(/-/g, '');
8✔
422
  }
423

424
  /**
425
   * Create and queue jobs.
426
   * @param {any} call RPC call argument
427
   * @param {any} ctx RPC context
428
   */
429
  async create(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
430
    let jobListResponse: JobListResponse = { items: [], operation_status: { code: 0, message: '' }, total_count: 0 };
9✔
431
    let subject = request.subject;
9✔
432
    if (_.isNil(request) || _.isNil(request.items)) {
9!
433
      return {
×
434
        items: [],
435
        total_count: 0,
436
        operation_status: {
437
          code: 400,
438
          message: 'Missing items in create request'
439
        }
440
      };
441
    }
442

443
    await this.createMetadata(request.items, AuthZAction.CREATE, subject);
9✔
444
    let acsResponse: DecisionResponse;
445
    try {
9✔
446
      if (!ctx) { ctx = {}; };
9!
447
      ctx.subject = subject;
9✔
448
      ctx.resources = request?.items;
9✔
449
      acsResponse = await checkAccessRequest(ctx, [{
9✔
450
        resource: 'job',
451
        id: request.items.map(item => item.id)
12✔
452
      }], AuthZAction.CREATE, Operation.isAllowed);
453
    } catch (err) {
454
      this.logger.error('Error occurred requesting access-control-srv:', err);
×
455
      return {
×
456
        items: [],
457
        total_count: 0,
458
        operation_status: {
459
          code: err.code,
460
          message: err.message
461
        }
462
      };
463
    }
464
    if (acsResponse.decision != Response_Decision.PERMIT) {
9✔
465
      return { items: [], total_count: 0, operation_status: acsResponse.operation_status };
1✔
466
    }
467

468
    let jobs: NewJob[] = [];
8✔
469
    for (let job of request?.items || []) {
8!
470
      try {
11✔
471
        jobs.push(this._validateJob(job as any));
11✔
472
      } catch (err) {
473
        this.logger.error('Error validating job', job);
×
474
        jobListResponse.items.push({
×
475
          status: {
476
            id: job.id,
477
            code: 400,
478
            message: err.message
479
          }
480
        });
481
      }
482
    }
483

484
    let result: Job[] = [];
8✔
485
    // Scheduling jobs
486
    for (let i = 0; i < jobs.length; i += 1) {
8✔
487
      let job = jobs[i];
11✔
488
      // if not jobID is specified generate a UUID
489
      if (!job.id) {
11✔
490
        job.id = this.idGen();
8✔
491
      }
492

493
      // map the id to jobId as needed in JobOpts for bull
494
      if (job?.id) {
11!
495
        // check if jobID already exists then map it as already exists error
496
        const existingJobId = await this.getRedisValue(job.id);
11✔
497
        if (existingJobId) {
11!
498
          // read job to check if data exists
499
          const jobData = await this.read(JobReadRequest.fromPartial({
×
500
            filter: {
501
              job_ids: [job.id]
502
            },
503
            subject
504
          }), ctx);
505
          if ((jobData?.items as any)[0]?.payload) {
×
506
            jobListResponse.items.push({
×
507
              status: {
508
                id: job.id,
509
                code: 403,
510
                message: `Job with ID ${job.id} already exists`
511
              }
512
            });
513
            continue;
×
514
          }
515
        }
516
        if (!job?.options) {
11!
517
          job.options = { jobId: job.id };
×
518
        } else {
519
          job.options.jobId = job.id;
11✔
520
        }
521
        if (job?.options?.repeat) {
11✔
522
          (job as any).options.repeat.jobId = job.id;
2✔
523
        }
524
      }
525

526
      if (!job.data.meta) {
11!
527
        const now = new Date();
×
528
        const metaObj = {
×
529
          created: now,
530
          modified: now,
531
          modified_by: '',
532
          owners: []
533
        };
534
        Object.assign(job.data, { meta: metaObj });
×
535
      }
536
      // if only owners are specified in meta
537
      if (job.data.meta && (!job.data.meta.created || !job.data.meta.modified)) {
11✔
538
        job.data.meta.created = new Date();
9✔
539
        job.data.meta.modified = new Date();
9✔
540
      }
541

542
      if (job?.data?.payload?.value) {
11!
543
        job.data.payload.value = job.data.payload.value.toString() as any;
11✔
544
      }
545

546
      // convert enum priority back to number as it's expected by bull
547
      if (job?.options?.priority) {
11!
548
        job.options.priority = typeof job.options.priority === 'number' ? job.options.priority : Priority[job.options.priority] as unknown as number;
11!
549
      }
550

551
      // if its a repeat job and tz is empty delete the key (else cron parser throws an error)
552
      if (job?.options?.repeat?.tz === '') {
11!
553
        delete job.options.repeat.tz;
×
554
      }
555

556
      const bullOptions = {
11✔
557
        ...job.options
558
      };
559

560
      if ((bullOptions as any).timeout === 1) {
11✔
561
        delete bullOptions['timeout'];
2✔
562
      }
563

564
      // Match the Job Type with the Queue Name and add the Job to this Queue.
565
      // If there is no match, add the Job to the Default Queue
566
      let queue = _.find(this.queuesList, { name: job.type });
11✔
567
      if (_.isEmpty(queue)) {
11!
568
        queue = _.find(this.queuesList, { name: this.defaultQueueName });
×
569
      }
570
      const submittedJob = await queue.add(job.type, job.data, bullOptions);
11✔
571
      if (submittedJob?.id?.startsWith('repeat:')) {
11✔
572
        const repeatJobId = submittedJob?.id?.split(':')[1];
2✔
573
        await this.storeRepeatKey(repeatJobId, job.id, job.options);
2✔
574
      } else if (submittedJob?.id) {
9!
575
        // future job with when
576
        await this.storeRepeatKey(submittedJob.id, job.id, job.options);
9✔
577
      }
578
      this.logger.verbose(`job@${job.type} created`, job);
11✔
579
      result.push(submittedJob);
11✔
580
    }
581

582
    for (let job of result) {
8✔
583
      let jobId = job.id as string;
11✔
584
      if (jobId.startsWith('repeat:')) {
11✔
585
        const repeatKey = jobId.split(':')[1];
2✔
586
        job.id = await this.getRedisValue(repeatKey);
2✔
587
      }
588
    }
589

590
    for (let job of result) {
8✔
591
      let when = job?.opts?.delay ? new Date(job?.opts?.delay).toString() : '';
11✔
592
      jobListResponse.items.push({
11✔
593
        payload: {
594
          id: job.id as string,
595
          type: job.name,
596
          data: _filterJobData(job.data, true, this.logger),
597
          options: _filterJobOptions(job.opts) as any,
598
          when
599
        },
600
        status: {
601
          id: (job?.id)?.toString(),
602
          code: 200,
603
          message: 'success'
604
        }
605
      });
606
    }
607
    const jobList = {
8✔
608
      items: result.map(job => ({
11✔
609
        id: job.id,
610
        type: job.name,
611
        data: _filterJobData(job.data, true, this.logger),
612
        options: _filterJobOptions(job.opts)
613
      })),
614
      total_count: result.length
615
    };
616

617
    if (this.resourceEventsEnabled) {
8!
618
      await this.jobEvents.emit('jobsCreated', jobList);
8✔
619
    }
620

621
    jobListResponse.operation_status = { code: 200, message: 'success' };
8✔
622
    return jobListResponse;
8✔
623
  }
624

625
  private filterByOwnerShip(customArgsObj, result) {
626
    // applying filter based on custom arguments (filterByOwnerShip)
627
    let customArgs = (customArgsObj)?.custom_arguments;
24✔
628
    if (customArgs?.value) {
24!
629
      let customArgsFilter;
630
      try {
24✔
631
        customArgsFilter = JSON.parse(customArgs.value.toString());
24✔
632
      } catch (error) {
633
        this.logger.error('Error parsing custom query arguments', {
×
634
          code: error.code,
635
          message: error.message, stack: error.stack
636
        });
637
      }
638
      if (!customArgsFilter) {
24!
639
        return [];
×
640
      }
641
      const ownerIndicatorEntity = customArgsFilter.entity;
24✔
642
      const ownerValues = customArgsFilter.instance;
24✔
643
      const ownerIndictaorEntURN = this.cfg.get('authorization:urns:ownerIndicatoryEntity');
24✔
644
      const ownerInstanceURN = this.cfg.get('authorization:urns:ownerInstance');
24✔
645
      result = result.filter(job => {
24✔
646
        if (job?.data?.meta?.owners?.length > 0) {
36!
647
          for (let owner of job.data.meta.owners) {
36✔
648
            if (owner?.id === ownerIndictaorEntURN && owner?.value === ownerIndicatorEntity && owner?.attributes?.length > 0) {
36!
649
              for (let ownerInstObj of owner.attributes) {
36✔
650
                if (ownerInstObj?.id === ownerInstanceURN && ownerInstObj?.value && ownerValues.includes(ownerInstObj.value)) {
36!
651
                  return job;
36✔
652
                }
653
              }
654
            }
655
          }
656
        }
657
      });
658
    }
659
    return result;
24✔
660
  }
661

662
  async deleteRedisKey(key: string): Promise<any> {
663
    try {
4✔
664
      await this.repeatJobIdRedisClient.del(key);
4✔
665
      this.logger.debug('Redis Key deleted successfully used for mapping repeatable jobID', { key });
4✔
666
    } catch (err) {
667
      this.logger.error(
×
668
        'Error occurred deleting redis key',
669
        { key, msg: err.message });
670
    }
671
  }
672

673
  async getRedisValue(key: string): Promise<any> {
674
    let redisValue;
675
    try {
46✔
676
      if (key) {
46!
677
        redisValue = await this.repeatJobIdRedisClient.get(key);
46✔
678
      }
679
      if (redisValue) {
46✔
680
        return JSON.parse(redisValue);
33✔
681
      } else {
682
        return;
13✔
683
      }
684
    } catch (err) {
685
      if (err?.message?.startsWith('Unexpected token') || err.message.startsWith('Unexpected number')) {
13!
686
        return redisValue;
13✔
687
      } else {
688
        this.logger.error(
×
689
          'Error occurred reading redis key',
690
          { key, msg: err.message });
691
      }
692
    }
693
  }
694

695

696
  /**
697
   * Retrieve jobs.
698
   * @param {any} call RPC call argument
699
   * @param {any} ctx RPC context
700
   */
701
  async read(request: JobReadRequest, ctx: any): Promise<DeepPartial<JobListResponse>> {
702
    let jobListResponse: JobListResponse = { items: [], operation_status: { code: 0, message: '' }, total_count: 0 };
27✔
703
    let subject = request.subject;
27✔
704
    let acsResponse: PolicySetRQResponse;
705
    try {
27✔
706
      if (!ctx) { ctx = {}; };
27!
707
      ctx.subject = subject;
27✔
708
      ctx.resources = [];
27✔
709
      acsResponse = await checkAccessRequest(ctx, [{ resource: 'job' }], AuthZAction.READ,
27✔
710
        Operation.whatIsAllowed) as PolicySetRQResponse;
711
    } catch (err) {
712
      this.logger.error('Error occurred requesting access-control-srv:', err);
×
713
      return {
×
714
        operation_status: {
715
          code: err.code,
716
          message: err.message
717
        }
718
      };
719
    }
720
    if (acsResponse.decision != Response_Decision.PERMIT) {
27✔
721
      return { operation_status: acsResponse.operation_status };
3✔
722
    }
723

724
    let result: Job[] = [];
24✔
725
    if (_.isEmpty(request) || _.isEmpty(request.filter)
24!
726
      && (!request.filter || !request.filter.job_ids
727
        || _.isEmpty(request.filter.job_ids))
728
      && (!request.filter || !request.filter.type ||
729
        _.isEmpty(request.filter.type))) {
730
      result = await this._getJobList();
12✔
731
      let custom_arguments;
732
      if (acsResponse?.custom_query_args?.length > 0) {
12!
733
        custom_arguments = acsResponse.custom_query_args[0].custom_arguments;
12✔
734
      }
735
      result = this.filterByOwnerShip({ custom_arguments }, result);
12✔
736
    } else {
737
      const that = this;
12✔
738
      let jobIDs = request.filter.job_ids || [];
12✔
739
      if (!_.isArray(jobIDs)) {
12!
740
        jobIDs = [jobIDs];
×
741
      }
742
      const typeFilterName = request.filter.type;
12✔
743

744
      // Search in all the queues and retrieve jobs after JobID
745
      // and add them to the jobIDsCopy list.
746
      // If the job is not found in any of the queues, continue looking
747
      // Finally compare the two lists and add an error to status for which
748
      // job could not be found in any of the queues.
749
      if (jobIDs.length > 0) {
12✔
750
        // jobIDsCopy should contain the jobIDs duplicate values
751
        // after the for loop ends
752
        let jobIDsCopy: string[] = [];
11✔
753
        for (let jobID of jobIDs) {
11✔
754
          const jobIdData = await this.getRedisValue(jobID as string);
11✔
755
          // future jobs scheduled with `when` will have same repeatId as external SCS jobID
756
          if (jobIdData?.repeatId && (jobIdData.repeatId != jobID)) {
11✔
757
            const repeatId = jobIdData.repeatId;
1✔
758
            if (jobIdData?.options?.repeat?.cron && jobIdData?.options?.repeat?.every) {
1!
759
              jobListResponse.items.push({
×
760
                status: {
761
                  id: jobID.toString(),
762
                  code: 400,
763
                  message: 'Both .cron and .every options are defined for this repeatable job'
764
                }
765
              });
766
              continue;
×
767
            }
768
            const nextMillis = this.getNextMillis(Date.now(), jobIdData.options.repeat);
1✔
769
            this.logger.debug('Repeatable job identifier', { id: jobID, repeatId: `repeat:${repeatId}:${nextMillis}` });
1✔
770
            // map the repeatKey with nextmilis for bull repeatable jobID
771
            jobID = `repeat:${repeatId}:${nextMillis}`;
1✔
772
          }
773
          for (let queue of this.queuesList) {
11✔
774
            await new Promise((resolve, reject) => {
33✔
775
              // getJob returns job or null
776
              queue.getJob(jobID).then(job => {
33✔
777
                if (job?.opts?.repeat?.pattern) {
33✔
778
                  (job.opts.repeat as any).cron = job.opts.repeat.pattern;
1✔
779
                  delete job.opts.repeat.pattern;
1✔
780
                }
781
                resolve(job);
33✔
782
                if (!_.isEmpty(job)) {
33✔
783
                  result.push(job);
10✔
784
                  if ((job as any)?.opts?.repeat?.jobId) {
10✔
785
                    jobIDsCopy.push((job as any).opts.repeat.jobId);
1✔
786
                  } else {
787
                    jobIDsCopy.push(jobID);
9✔
788
                  }
789
                }
790
              }).catch(err => {
791
                that.logger.error(`Error reading job ${jobID}`, err);
×
792
                if (err?.code && typeof err.code === 'string') {
×
793
                  err.code = 500;
×
794
                }
795
                jobListResponse.items.push({
×
796
                  status: {
797
                    id: jobID.toString(),
798
                    code: err.code,
799
                    message: err.message
800
                  }
801
                });
802
              });
803
            });
804
          }
805
        }
806
        if (!_.isEqual(jobIDs.sort(), jobIDsCopy.sort())) {
11✔
807
          const jobIDsDiff = _.difference(jobIDs, jobIDsCopy);
1✔
808
          for (let jobId of jobIDsDiff) {
1✔
809
            jobListResponse.items.push({
1✔
810
              status: {
811
                id: jobId.toString(),
812
                code: 404,
813
                message: `Job ID ${jobId} not found in any of the queues`
814
              }
815
            });
816
          }
817
        }
818
      } else {
819
        try {
1✔
820
          let jobsList: any[] = [];
1✔
821
          for (let queue of this.queuesList) {
1✔
822
            const getJobsResult = await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']);
3✔
823
            getJobsResult.forEach((job) => {
3✔
824
              if (job?.opts?.repeat?.pattern) {
5!
825
                (job.opts.repeat as any).cron = job.opts.repeat.pattern;
×
826
                delete job.opts.repeat.pattern;
×
827
              }
828
            });
829
            jobsList = jobsList.concat(getJobsResult);
3✔
830
          }
831
          result = jobsList;
1✔
832
        } catch (err) {
833
          that.logger.error('Error reading jobs', err);
×
834
          if (typeof err.code === 'string') {
×
835
            err.code = 500;
×
836
          }
837
          return {
×
838
            operation_status: {
839
              code: err.code,
840
              message: err.message
841
            }
842
          };
843
        }
844
      }
845

846
      if (typeFilterName) {
12✔
847
        result = result.filter(job => job.name === typeFilterName);
6✔
848
      }
849
      let custom_arguments;
850
      if (acsResponse?.custom_query_args?.length > 0) {
12!
851
        custom_arguments = acsResponse.custom_query_args[0].custom_arguments;
12✔
852
      }
853
      result = this.filterByOwnerShip({ custom_arguments }, result);
12✔
854
    }
855

856
    result = result.filter(valid => !!valid);
36✔
857

858
    if (!_.isEmpty(request) && !_.isEmpty(request.sort)
24✔
859
      && _.includes(['ASCENDING', 'DESCENDING'], request.sort)) {
860
      let sort;
861
      switch (request.sort) {
2!
862
        case JobReadRequest_SortOrder.DESCENDING:
863
          sort = 'desc';
1✔
864
          break;
1✔
865
        case JobReadRequest_SortOrder.ASCENDING:
866
          sort = 'asc';
1✔
867
          break;
1✔
868
        default:
869
          this.logger.error(`Unknown sort option ${sort}`);
×
870
      }
871
      result = _.orderBy(result, ['id'], [sort]);
2✔
872
    }
873

874
    for (let job of result) {
24✔
875
      let jobId = job.id as string;
36✔
876
      if (jobId.startsWith('repeat:')) {
36✔
877
        const repeatKey = jobId.split(':')[1];
11✔
878
        // it could be possible the redis repeat key is deleted on index 8 and old completed
879
        // jobs exist in data_store if delete on complete was not set to true for repeatable jobs
880
        const jobRedisId = await this.getRedisValue(repeatKey);
11✔
881
        if (jobRedisId) {
11!
882
          job.id = jobRedisId;
11✔
883
        }
884
      }
885
    }
886

887
    for (let job of result) {
24✔
888
      let when = job?.opts?.delay ? new Date(job?.opts?.delay).toString() : '';
36!
889
      jobListResponse.items.push({
36✔
890
        payload: {
891
          id: job.id as string,
892
          type: job.name,
893
          data: _filterJobData(job.data, true, this.logger),
894
          options: _filterJobOptions(job.opts) as any,
895
          when
896
        },
897
        status: {
898
          id: job.id.toString(),
899
          code: 200,
900
          message: 'success'
901
        }
902
      });
903
    }
904
    jobListResponse.total_count = jobListResponse?.items?.length;
24✔
905
    jobListResponse.operation_status = {
24✔
906
      code: 200,
907
      message: 'success'
908
    };
909
    return jobListResponse;
24✔
910
  }
911

912
  async _getJobList(): Promise<Job[]> {
913
    let jobsList: any[] = [];
14✔
914
    for (let queue of this.queuesList) {
14✔
915
      const getJobsResult = await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']);
42✔
916
      getJobsResult.forEach((job) => {
42✔
917
        if (job?.opts?.repeat?.pattern) {
26!
918
          (job.opts.repeat as any).cron = job.opts.repeat.pattern;
×
919
          delete job.opts.repeat.pattern;
×
920
        }
921
      });
922
      jobsList = jobsList.concat(getJobsResult);
42✔
923
    }
924
    return jobsList;
14✔
925
  }
926

927
  // delete a job by its job instance after processing 'jobDone' / 'jobFailed'
928
  async _deleteJobInstance(jobId: string, queue: Queue): Promise<void> {
929
    return this._removeBullJob(jobId, queue);
×
930
  }
931

932
  /**
933
   * Delete Job from queue.
934
   */
935
  async delete(request: DeleteRequest, ctx: any): Promise<DeepPartial<DeleteResponse>> {
936
    let deleteResponse: DeleteResponse = { status: [], operation_status: { code: 0, message: '' } };
6✔
937
    if (_.isEmpty(request)) {
6!
938
      return {
×
939
        operation_status: {
940
          code: 400,
941
          message: 'No arguments provided for delete operation'
942
        }
943
      };
944
    }
945
    const subject = request?.subject;
6✔
946
    const jobIDs = request?.ids;
6✔
947
    let resources = [];
6✔
948
    let action;
949
    if (jobIDs) {
6✔
950
      action = AuthZAction.DELETE;
3✔
951
      if (_.isArray(jobIDs)) {
3!
952
        for (let id of jobIDs) {
3✔
953
          resources.push({ id });
3✔
954
        }
955
      } else {
956
        resources = [{ id: jobIDs }];
×
957
      }
958
      await this.createMetadata(resources, action, subject);
3✔
959
    }
960
    if (request.collection) {
6✔
961
      action = AuthZAction.DROP;
3✔
962
      resources = [{ collection: request.collection }];
3✔
963
    }
964
    let acsResponse: DecisionResponse;
965
    try {
6✔
966
      if (!ctx) { ctx = {}; };
6!
967
      ctx.subject = subject;
6✔
968
      ctx.resources = resources;
6✔
969
      acsResponse = await checkAccessRequest(ctx, [{ resource: 'job', id: jobIDs as string[] }], action,
6✔
970
        Operation.isAllowed);
971
    } catch (err) {
972
      this.logger.error('Error occurred requesting access-control-srv:', err);
×
973
      return {
×
974
        operation_status: {
975
          code: err.code,
976
          message: err.message
977
        }
978
      };
979
    }
980
    if (acsResponse.decision != Response_Decision.PERMIT) {
6✔
981
      return { operation_status: acsResponse.operation_status };
1✔
982
    }
983
    const dispatch = [];
5✔
984
    this.logger.info('Received delete request');
5✔
985
    if ('collection' in request && request.collection) {
5✔
986
      this.logger.verbose('Deleting all jobs');
2✔
987

988
      await this._getJobList().then(async (jobs) => {
2✔
989
        for (let job of jobs) {
2✔
990
          await job.remove();
5✔
991
          if (this.resourceEventsEnabled) {
5!
992
            dispatch.push(this.jobEvents.emit('jobsDeleted', { id: job.id }));
5✔
993
          }
994
          deleteResponse.status.push({
5✔
995
            id: job.id.toString(),
996
            code: 200,
997
            message: 'success'
998
          });
999
        }
1000
      });
1001
      // FLUSH redis DB index 8 used for mapping of repeat jobIds (since req is for dropping job collection)
1002
      const delResp = await this.repeatJobIdRedisClient.flushDb();
2✔
1003
      if (delResp) {
2!
1004
        this.logger.debug('Mapped keys for repeatable jobs deleted successfully');
2✔
1005
      } else {
1006
        this.logger.debug('Could not delete repeatable job keys');
×
1007
      }
1008
    } else if ('ids' in request) {
3!
1009
      this.logger.verbose('Deleting jobs by their IDs', { id: request.ids });
3✔
1010

1011
      for (let queue of this.queuesList) {
3✔
1012
        for (let jobDataKey of request.ids) {
9✔
1013
          let callback: Promise<boolean>;
1014
          const jobIdData = await this.getRedisValue(jobDataKey as string);
9✔
1015
          // future jobs scheduled with `when` will have same repeatId as external SCS jobID
1016
          if (jobIdData && jobIdData.repeatId && (jobIdData.repeatId != jobDataKey)) {
9✔
1017
            const jobs = await queue.getRepeatableJobs();
2✔
1018
            for (let job of jobs) {
2✔
1019
              if (job.id === jobDataKey) {
1!
1020
                this.logger.debug('Removing Repeatable job by key for jobId', { id: job.id });
1✔
1021
                callback = queue.removeRepeatableByKey(job.key);
1✔
1022
                deleteResponse.status.push({
1✔
1023
                  id: job.id,
1024
                  code: 200,
1025
                  message: 'success'
1026
                });
1027
                await this.deleteRedisKey(jobDataKey as string);
1✔
1028
                await this.deleteRedisKey(jobIdData.repeatId);
1✔
1029
                break;
1✔
1030
              }
1031
            }
1032
          } else {
1033
            callback = queue.getJob(jobDataKey).then(async (jobData) => {
7✔
1034
              if (jobData) {
7✔
1035
                try {
2✔
1036
                  await this._removeBullJob(jobData.id, queue);
2✔
1037
                  await this.deleteRedisKey(jobData.id);
2✔
1038
                  deleteResponse.status.push({
2✔
1039
                    id: jobData.id.toString(),
1040
                    code: 200,
1041
                    message: 'success'
1042
                  });
1043
                } catch (err) {
1044
                  if (typeof err?.code === 'string') {
×
1045
                    err.code = 500;
×
1046
                  }
1047
                  deleteResponse.status.push({
×
1048
                    id: jobData.id.toString(),
1049
                    code: err.code,
1050
                    message: err.message
1051
                  });
1052
                  return false;
×
1053
                }
1054
                return true;
2✔
1055
              }
1056
              return false;
5✔
1057
            });
1058
          }
1059

1060
          // since no CB is returned for removeRepeatableByKey by bull
1061
          if (!callback) {
9✔
1062
            if (this.resourceEventsEnabled) {
1!
1063
              dispatch.push(this.jobEvents.emit(
1✔
1064
                'jobsDeleted', { id: jobDataKey })
1065
              );
1066
            }
1067
          } else {
1068
            callback.then(() => {
8✔
1069
              if (this.resourceEventsEnabled) {
8!
1070
                dispatch.push(this.jobEvents.emit(
8✔
1071
                  'jobsDeleted', { id: jobDataKey })
1072
                );
1073
              }
1074
            }).catch(err => {
1075
              this.logger.error('Error deleting job', { id: jobDataKey });
×
1076
              if (typeof err?.code === 'number') {
×
1077
                err.code = 500;
×
1078
              }
1079
              deleteResponse.status.push({
×
1080
                id: jobDataKey.toString(),
1081
                code: err.code,
1082
                message: err.message
1083
              });
1084
            });
1085
          }
1086
        }
1087
      }
1088
    }
1089

1090
    await Promise.all(dispatch);
5✔
1091
    deleteResponse.operation_status = { code: 200, message: 'success' };
5✔
1092
    return deleteResponse;
5✔
1093
  }
1094

1095
  /**
1096
   * Clean up queues - removes complted and failed jobs from queue
1097
   * @param {any} job clean up job
1098
   */
1099
  async cleanupJobs(ttlAfterFinished) {
1100
    for (let queue of this.queuesList) {
×
1101
      try {
×
1102
        await queue.clean(ttlAfterFinished, 0, COMPLETED_JOB_STATE);
×
1103
        await queue.clean(ttlAfterFinished, 0, FAILED_JOB_STATE);
×
1104
      } catch (err) {
1105
        this.logger.error('Error cleaning up jobs', err);
×
1106
      }
1107
    }
1108
    this.logger.info('Jobs cleaned up successfully');
×
1109
    let lastExecutedInterval = { lastExecutedInterval: (new Date()).toString() };
×
1110
    await this.repeatJobIdRedisClient.set(QUEUE_CLEANUP, JSON.stringify(lastExecutedInterval));
×
1111
  }
1112

1113
  async setupCleanInterval(cleanInterval: number, ttlAfterFinished: number) {
1114
    if (!ttlAfterFinished) {
×
1115
      ttlAfterFinished = DEFAULT_CLEANUP_COMPLETED_JOBS;
×
1116
    }
1117
    const intervalData = await this.getRedisValue(QUEUE_CLEANUP);
×
1118
    let timeInMs, delta;
1119
    const now = new Date().getTime();
×
1120
    if (intervalData?.lastExecutedInterval && typeof (intervalData.lastExecutedInterval) === 'string') {
×
1121
      timeInMs = new Date(intervalData.lastExecutedInterval).getTime();
×
1122
      this.logger.debug('Previous execution interval', intervalData);
×
1123
      delta = now - timeInMs;
×
1124
      this.logger.debug('Clean interval and previous difference', { cleanInterval, difference: delta });
×
1125
    }
1126

1127
    if (delta && (delta < cleanInterval)) {
×
1128
      // use setTimeout and then create interval on setTimeout
1129
      this.logger.info('Restoring previous execution interval with set timeout', { time: cleanInterval - delta });
×
1130
      setTimeout(async () => {
×
1131
        await this.cleanupJobs(ttlAfterFinished);
×
1132
        setInterval(this.cleanupJobs.bind(this), cleanInterval, ttlAfterFinished);
×
1133
      }, cleanInterval - delta);
1134
    } else {
1135
      setInterval(this.cleanupJobs.bind(this), cleanInterval, ttlAfterFinished);
×
1136
      this.logger.info('Clean up job interval set successfully');
×
1137
    }
1138
  }
1139

1140
  /**
1141
   * Reschedules a job - deletes it and recreates it with a new generated ID.
1142
   */
1143
  async update(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
1144
    let subject = request.subject;
3✔
1145
    // update meta data for owners information
1146
    await this.createMetadata(request.items, AuthZAction.MODIFY, subject);
3✔
1147
    let acsResponse: DecisionResponse;
1148
    try {
3✔
1149
      if (!ctx) { ctx = {}; };
3!
1150
      ctx.subject = subject;
3✔
1151
      ctx.resources = request?.items;
3✔
1152
      acsResponse = await checkAccessRequest(ctx,
3✔
1153
        [{ resource: 'job', id: request.items.map(item => item.id) }],
3✔
1154
        AuthZAction.MODIFY, Operation.isAllowed);
1155
    } catch (err) {
1156
      this.logger.error('Error occurred requesting access-control-srv:', err);
×
1157
      return {
×
1158
        operation_status: {
1159
          code: err.code,
1160
          message: err.message
1161
        }
1162
      };
1163
    }
1164
    if (acsResponse.decision != Response_Decision.PERMIT) {
3✔
1165
      return { operation_status: acsResponse.operation_status };
1✔
1166
    }
1167
    if (_.isNil(request) || _.isNil(request.items)) {
2!
1168
      return {
×
1169
        operation_status: {
1170
          code: 400,
1171
          message: 'Missing items in update request'
1172
        }
1173
      };
1174
    }
1175

1176
    const mappedJobs = request?.items?.reduce((obj, job) => {
2✔
1177
      obj[job.id] = job;
2✔
1178
      return obj;
2✔
1179
    }, {});
1180

1181
    const jobData = await this.read(JobReadRequest.fromPartial(
2✔
1182
      {
1183
        filter: {
1184
          job_ids: Object.keys(mappedJobs)
1185
        },
1186
        subject
1187
      }
1188
    ), ctx);
1189

1190
    await this.delete(DeleteRequest.fromPartial({
2✔
1191
      ids: Object.keys(mappedJobs),
1192
      subject
1193
    }), {});
1194

1195
    const result = [];
2✔
1196

1197
    jobData?.items?.forEach(async (job) => {
2✔
1198
      const mappedJob = mappedJobs[job?.payload?.id];
2✔
1199
      let endJob = {
2✔
1200
        id: mappedJob.id,
1201
        type: mappedJob.type,
1202
        options: {
1203
          ...job.payload.options,
1204
          ...(mappedJob.options ? mappedJob.options : {})
2!
1205
        },
1206
        data: mappedJob.data || job.payload.data,
2!
1207
        when: mappedJob.when,
1208
      };
1209

1210
      if (endJob.when && endJob.options) {
2!
1211
        delete endJob.options['delay'];
2✔
1212
      }
1213

1214
      result.push(endJob);
2✔
1215
    });
1216

1217
    return this.create(JobList.fromPartial({
2✔
1218
      items: result,
1219
      subject
1220
    }), ctx);
1221
  }
1222

1223
  /**
1224
   * Upserts a job - creates a new job if it does not exist or update the
1225
   * existing one if it already exists.
1226
   */
1227
  async upsert(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
1228
    let subject = request.subject;
2✔
1229
    await this.createMetadata(request.items, AuthZAction.MODIFY, subject);
2✔
1230
    let acsResponse: DecisionResponse;
1231
    try {
2✔
1232
      if (!ctx) { ctx = {}; };
2!
1233
      ctx.subject = subject;
2✔
1234
      ctx.resources = request.items;
2✔
1235
      acsResponse = await checkAccessRequest(ctx,
2✔
1236
        [{ resource: 'job', id: request.items.map(item => item.id) }],
2✔
1237
        AuthZAction.MODIFY, Operation.isAllowed);
1238
    } catch (err) {
1239
      this.logger.error('Error occurred requesting access-control-srv:', err);
×
1240
      return {
×
1241
        operation_status: {
1242
          code: err.code,
1243
          message: err.message
1244
        }
1245
      };
1246
    }
1247

1248
    if (acsResponse.decision != Response_Decision.PERMIT) {
2✔
1249
      return { operation_status: acsResponse.operation_status };
1✔
1250
    }
1251
    if (_.isNil(request) || _.isNil(request.items)) {
1!
1252
      return { operation_status: { code: 400, message: 'Missing items in upsert request' } };
×
1253
    }
1254

1255
    let result = [];
1✔
1256

1257
    for (let eachJob of request.items) {
1✔
1258
      let jobExists = false;
1✔
1259
      let origJobId = _.cloneDeep(eachJob.id);
1✔
1260
      for (let queue of this.queuesList) {
1✔
1261
        const jobIdData = await this.getRedisValue(eachJob.id as string);
2✔
1262
        // future jobs scheduled with `when` will have same repeatId as external SCS jobID
1263
        if (jobIdData?.repeatId && (jobIdData.repeatId != origJobId)) {
2!
1264
          const repeatId = jobIdData.repeatId;
×
1265
          if (jobIdData?.options?.repeat?.cron && jobIdData?.options?.repeat?.every) {
×
1266
            result.push({
×
1267
              status: {
1268
                id: origJobId,
1269
                code: 400,
1270
                message: 'Both .cron and .every options are defined for this repeatable job'
1271
              }
1272
            });
1273
            continue;
×
1274
          }
1275
          const nextMillis = this.getNextMillis(Date.now(), jobIdData.options.repeat);
×
1276
          this.logger.debug('Repeatable job identifier', { id: eachJob.id, repeatId: `repeat:${repeatId}:${nextMillis}` });
×
1277
          // map the repeatKey with nextmilis for bull repeatable jobID
1278
          eachJob.id = `repeat:${repeatId}:${nextMillis}`;
×
1279
        }
1280
        const jobInst = await queue.getJob(eachJob.id);
2✔
1281
        if (jobInst) {
2✔
1282
          // existing job update it with the given job identifier
1283
          if (eachJob.id.startsWith('repeat:')) {
1!
1284
            eachJob.id = origJobId;
×
1285
          }
1286
          result = [
1✔
1287
            ...result,
1288
            ...(await this.update(JobList.fromPartial({ items: [eachJob], subject }), ctx)).items
1289
          ];
1290
          jobExists = true;
1✔
1291
          break;
1✔
1292
        }
1293
      }
1294
      if (!jobExists) {
1!
1295
        // new job create it
1296
        result = [
×
1297
          ...result,
1298
          ...(await this.create(JobList.fromPartial({ items: [eachJob], subject }), ctx)).items
1299
        ];
1300
      }
1301
    }
1302

1303
    return {
1✔
1304
      items: result,
1305
      total_count: result.length,
1306
      operation_status: {
1307
        code: 200,
1308
        message: 'success'
1309
      }
1310
    };
1311
  }
1312

1313
  /**
1314
   * Clear all job data.
1315
   */
1316
  async clear(): Promise<any> {
1317
    let allJobs: any[] = [];
1✔
1318
    for (let queue of this.queuesList) {
1✔
1319
      allJobs = allJobs.concat(await queue.getJobs(this.bullOptions['allJobTypes']));
3✔
1320
    }
1321
    return Promise.all(allJobs.map(async (job) => job.remove())).catch(err => {
6✔
1322
      this.logger.error(`Error clearing jobs`, err);
×
1323
      throw err;
×
1324
    });
1325
  }
1326

1327
  async _removeBullJob(jobInstID: string, queue: Queue): Promise<void> {
1328
    return queue.getJob(jobInstID).then(job => {
2✔
1329
      if (job) {
2!
1330
        return job.remove();
2✔
1331
      }
1332
    }).then(() => {
1333
      this.logger.info(`Job#${jobInstID} removed`);
2✔
1334
    }).catch(err => {
1335
      this.logger.error(`Error removing job ${jobInstID}`, err);
×
1336
      throw err;
×
1337
    });
1338
  }
1339

1340
  /**
1341
   *  disable access control
1342
   */
1343
  disableAC() {
1344
    try {
1✔
1345
      this.cfg.set('authorization:enabled', false);
1✔
1346
      updateConfig(this.cfg);
1✔
1347
    } catch (err) {
1348
      this.logger.error('Error caught disabling authorization:', { err });
×
1349
      this.cfg.set('authorization:enabled', this.authZCheck);
×
1350
    }
1351
  }
1352

1353
  /**
1354
   *  enables access control
1355
   */
1356
  enableAC() {
1357
    try {
1✔
1358
      this.cfg.set('authorization:enabled', true);
1✔
1359
      updateConfig(this.cfg);
1✔
1360
    } catch (err) {
1361
      this.logger.error('Error caught enabling authorization:', { err });
×
1362
      this.cfg.set('authorization:enabled', this.authZCheck);
×
1363
    }
1364
  }
1365

1366
  /**
1367
   *  restore AC state to previous vale either before enabling or disabling AC
1368
   */
1369
  restoreAC() {
1370
    try {
1✔
1371
      this.cfg.set('authorization:enabled', this.authZCheck);
1✔
1372
      updateConfig(this.cfg);
1✔
1373
    } catch (err) {
1374
      this.logger.error('Error caught enabling authorization:', { err });
×
1375
      this.cfg.set('authorization:enabled', this.authZCheck);
×
1376
    }
1377
  }
1378

1379
  /**
1380
   * reads meta data from DB and updates owners information in resource if action is UPDATE / DELETE
1381
   * @param resources list of resources
1382
   * @param action resource action
1383
   * @param subject subject name
1384
   */
1385
  async createMetadata(resources: any, action: string, subject): Promise<any> {
1386
    let orgOwnerAttributes: Attribute[] = [];
17✔
1387
    if (resources && !_.isArray(resources)) {
17!
1388
      resources = [resources];
×
1389
    }
1390
    const urns = this.cfg.get('authorization:urns');
17✔
1391
    if (subject?.scope && (action === AuthZAction.CREATE || action === AuthZAction.MODIFY)) {
17✔
1392
      // add subject scope as default owners
1393
      orgOwnerAttributes.push(
14✔
1394
        {
1395
          id: urns?.ownerIndicatoryEntity,
1396
          value: urns?.organization,
1397
          attributes: [{
1398
            id: urns?.ownerInstance,
1399
            value: subject?.scope,
1400
            attributes: []
1401
          }]
1402
        });
1403
    }
1404

1405
    if (resources?.length > 0) {
17!
1406
      for (let resource of resources) {
17✔
1407
        if (!resource.data) {
20✔
1408
          resource.data = { meta: {} };
3✔
1409
        } else if (!resource.data.meta) {
17✔
1410
          resource.data.meta = {};
12✔
1411
        }
1412
        if (resource?.id && (action === AuthZAction.MODIFY || action === AuthZAction.DELETE)) {
20✔
1413
          let result;
1414
          try {
8✔
1415
            result = await this.read(JobReadRequest.fromPartial({
8✔
1416
              filter: {
1417
                job_ids: [resource.id]
1418
              },
1419
              subject
1420
            }), {});
1421
          } catch (err) {
1422
            if (err.message.startsWith('Error! Jobs not found in any of the queues') && action != AuthZAction.DELETE) {
×
1423
              this.logger.debug('New job should be created', { jobId: resource.id });
×
1424
              result = { items: [] };
×
1425
            } else {
1426
              this.logger.error(`Error reading job with resource ID ${resource.id}`, { code: err.code, message: err.message, stack: err.stack });
×
1427
            }
1428
          }
1429
          // update owners info
1430
          if (result?.items?.length === 1 && result?.items[0]?.payload) {
8✔
1431
            let item = result.items[0].payload;
6✔
1432
            resource.data.meta.owners = item.data.meta.owners;
6✔
1433
            // adding meta to resource root (needed by access-contorl-srv for owners information check)
1434
            // meta is inside data of resource since the data is persisted in redis using bull
1435
            resource.meta = { owners: item.data.meta.owners };
6✔
1436
          } else if ((!result || !result.items || !result.items[0] || !result.items[0].payload) && action === AuthZAction.MODIFY) {
2!
1437
            // job does not exist - create new job (ex: Upsert with action modify)
1438
            let ownerAttributes = _.cloneDeep(orgOwnerAttributes);
2✔
1439
            // add user as default owners
1440
            ownerAttributes.push(
2✔
1441
              {
1442
                id: urns?.ownerIndicatoryEntity,
1443
                value: urns?.user,
1444
                attributes: [{
1445
                  id: urns?.ownerInstance,
1446
                  value: subject?.id,
1447
                  attributes: []
1448
                }]
1449
              });
1450
            resource.data.meta.owners = ownerAttributes;
2✔
1451
            resource.meta = { owners: ownerAttributes };
2✔
1452
          }
1453
        } else if ((action === AuthZAction.CREATE || !resource.id) && !resource.data.meta.owners) {
12!
1454
          let ownerAttributes = _.cloneDeep(orgOwnerAttributes);
10✔
1455
          // add user as default owners
1456
          if (resource.id) {
10✔
1457
            ownerAttributes.push(
2✔
1458
              {
1459
                id: urns.ownerIndicatoryEntity,
1460
                value: urns.user,
1461
                attributes: [{
1462
                  id: urns.ownerInstance,
1463
                  value: subject.id,
1464
                  attributes: []
1465
                }]
1466
              });
1467
          }
1468
          resource.data.meta.owners = ownerAttributes;
10✔
1469
          resource.meta = { owners: ownerAttributes };
10✔
1470
        } else if (action === AuthZAction.CREATE && resource?.data?.meta?.owners) {
2!
1471
          resource.meta = { owners: resource.data.meta.owners };
2✔
1472
        }
1473
      }
1474
    }
1475
    return resources;
17✔
1476
  }
1477
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc