• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

restorecommerce / scheduling-srv / 6429601929

06 Oct 2023 08:59AM UTC coverage: 67.881% (+0.5%) from 67.386%
6429601929

push

github

Arun-KumarH
fix: SCS CRUD to retain unique jobId and not to duplicate jobs on update / upsert

266 of 441 branches covered (0.0%)

Branch coverage included in aggregate %.

28 of 28 new or added lines in 1 file covered. (100.0%)

647 of 904 relevant lines covered (71.57%)

6.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.37
/src/schedulingService.ts
1
import * as _ from 'lodash';
1✔
2
import { errors } from '@restorecommerce/chassis-srv';
1✔
3
import * as kafkaClient from '@restorecommerce/kafka-client';
4
import { AuthZAction, ACSAuthZ, updateConfig, DecisionResponse, Operation, PolicySetRQResponse } from '@restorecommerce/acs-client';
1✔
5
import {
1✔
6
  JobServiceImplementation as SchedulingServiceServiceImplementation,
7
  JobFailed, JobDone, DeepPartial, JobList, JobListResponse, Data,
8
  Backoff_Type, JobOptions_Priority, JobReadRequest, JobReadRequest_SortOrder
9
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/job';
10
import { createClient, RedisClientType } from 'redis';
1✔
11
import { NewJob, Priority } from './types';
1✔
12
import { parseExpression } from 'cron-parser';
1✔
13
import * as crypto from 'crypto';
1✔
14
import { _filterJobData, _filterJobOptions, _filterQueuedJob, checkAccessRequest, marshallProtobufAny } from './utilts';
1✔
15
import * as uuid from 'uuid';
1✔
16
import { Logger } from 'winston';
17
import { Response_Decision } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/access_control';
1✔
18
import { Attribute } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/attribute';
19
import { DeleteRequest, DeleteResponse } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/resource_base';
1✔
20
import { Queue, QueueOptions, Job } from 'bullmq';
1✔
21
import { parseInt } from 'lodash';
1✔
22

23
const JOB_DONE_EVENT = 'jobDone';
1✔
24
const JOB_FAILED_EVENT = 'jobFailed';
1✔
25
const DEFAULT_CLEANUP_COMPLETED_JOBS = 604800000; // 7 days in miliseconds
1✔
26
const COMPLETED_JOB_STATE = 'completed';
1✔
27
const FAILED_JOB_STATE = 'failed';
1✔
28
const QUEUE_CLEANUP = 'queueCleanup';
1✔
29

30
/**
31
 * A job scheduling service.
32
 */
33
export class SchedulingService implements SchedulingServiceServiceImplementation {
1✔
34

35
  jobEvents: kafkaClient.Topic;
36
  logger: Logger;
37

38
  queuesConfigList: any;
39
  queuesList: Queue[];
40
  defaultQueueName: string;
41

42
  redisClient: RedisClientType<any, any>;
43
  resourceEventsEnabled: boolean;
44
  canceledJobs: Set<string>;
45
  bullOptions: any;
46
  cfg: any;
47
  authZ: ACSAuthZ;
48
  authZCheck: boolean;
49
  repeatJobIdRedisClient: RedisClientType<any, any>;
50

51

52
  constructor(jobEvents: kafkaClient.Topic,
53
    private redisConfig: any, logger: any, redisClient: RedisClientType<any, any>,
1✔
54
    bullOptions: any, cfg: any, authZ: ACSAuthZ) {
55
    this.jobEvents = jobEvents;
1✔
56
    this.resourceEventsEnabled = true;
1✔
57
    this.bullOptions = bullOptions;
1✔
58
    this.logger = logger;
1✔
59
    this.queuesList = [];
1✔
60
    this.queuesConfigList = [];
1✔
61
    this.redisClient = redisClient;
1✔
62

63
    const repeatJobIdCfg = cfg.get('redis');
1✔
64
    repeatJobIdCfg.database = cfg.get('redis:db-indexes:db-repeatJobId');
1✔
65
    this.repeatJobIdRedisClient = createClient(repeatJobIdCfg);
1✔
66
    this.repeatJobIdRedisClient.on('error', (err) => logger.error('Redis client error in repeatable job store', err));
1✔
67
    this.repeatJobIdRedisClient.connect().then((data) => {
1✔
68
      logger.info('Redis client connection for repeatable job store successful');
1✔
69
    }).catch(err => logger.error('Redis client error for repeatable job store', err));
×
70

71
    this.canceledJobs = new Set<string>();
1✔
72
    this.cfg = cfg;
1✔
73
    this.authZ = authZ;
1✔
74
    this.authZCheck = this.cfg.get('authorization:enabled');
1✔
75

76
    // Read Queue Configuration file and find first queue which has "default": true,
77
    // then save it to defaultQueueName
78
    const queuesCfg = this.cfg.get('queue');
1✔
79
    if (_.isEmpty(queuesCfg)) {
1!
80
      this.logger.error('Queue configuration not found!');
×
81
      throw new Error('Queue configuration not found!');
×
82
    }
83
    let defaultTrueExists = false;
1✔
84
    for (let queueCfg of queuesCfg) {
1✔
85
      // Find configuration which has default=true
86
      if (queueCfg.default == true) {
1!
87
        defaultTrueExists = true;
1✔
88
        this.defaultQueueName = queueCfg.name;
1✔
89
        break;
1✔
90
      }
91
    }
92
    if (!defaultTrueExists) {
1!
93
      this.logger.error('Queue default configuration not found!');
×
94
      throw new Error('Queue default configuration not found!');
×
95
    }
96

97
    // Create Queues
98
    for (let queueCfg of queuesCfg) {
1✔
99
      let queueOptions: QueueOptions;
100
      const prefix = queueCfg.name;
3✔
101
      const rateLimiting = queueCfg.rateLimiting;
3✔
102
      const advancedSettings = queueCfg.advancedSettings;
3✔
103

104
      queueOptions = {
3✔
105
        connection: {
106
          ...redisConfig,
107
        }
108
      };
109

110
      // Create Queue Configuration - Add Rate Limiting if enabled
111
      if (!_.isEmpty(rateLimiting) && rateLimiting.enabled == true) {
3!
112
        this.logger.info(`Queue: ${queueCfg.name} - Rate limiting is ENABLED.`);
×
113
      }
114

115
      if (!_.isEmpty(advancedSettings)) {
3!
116
        queueOptions.settings = {
3✔
117
          ...advancedSettings,
118
        };
119
      }
120

121
      const redisURL = new URL((queueOptions.connection as any).url);
3✔
122

123
      if ('keyPrefix' in queueOptions.connection) {
3!
124
        delete queueOptions.connection.keyPrefix;
×
125
      }
126

127
      let queue = new Queue(prefix, {
3✔
128
        ...queueOptions,
129
        connection: {
130
          ...queueOptions.connection as any,
131
          host: redisURL.hostname,
132
          port: parseInt(redisURL.port)
133
        }
134
      });
135
      this.queuesList.push(queue);
3✔
136

137
      // Add Queue Configurations
138
      let queueCfgObj = {
3✔
139
        name: queueCfg.name,
140
        concurrency: queueCfg.concurrency,
141
        default: queueCfg.default,
142
        runMissedScheduled: queueCfg.runMissedScheduled
143
      };
144
      this.queuesConfigList.push(queueCfgObj);
3✔
145
    }
146
  }
147

148
  /**
149
   * Start handling the job queue, job scheduling and
150
   * managing job events.
151
   */
152
  async start(): Promise<any> {
153
    const logger = this.logger;
1✔
154
    const that = this;
1✔
155
    const events = [JOB_DONE_EVENT, JOB_FAILED_EVENT];
1✔
156
    for (let eventName of events) {
1✔
157
      // A Scheduling Service Event Listener
158
      await this.jobEvents.on(eventName, async (msg: any, ctx: any,
2✔
159
        config: any, eventName: string): Promise<any> => {
160
        let job = msg;
6✔
161
        // Match Job Type to Queue Name, else use Default Queue
162
        let queue = _.find(this.queuesList, { name: job.type });
6✔
163
        let defaultQueue = _.find(this.queuesList, { name: this.defaultQueueName });
6✔
164
        if (_.isEmpty(queue)) {
6!
165
          queue = defaultQueue;
×
166
        }
167

168
        if (eventName === JOB_FAILED_EVENT) {
6!
169
          logger.error(`job@${job.type}#${job.id} failed with error #${job.error}`,
×
170
            _filterQueuedJob<JobFailed>(job, this.logger));
171
        } else if (eventName === JOB_DONE_EVENT) {
6!
172
          logger.verbose(`job#${job.id} done`, _filterQueuedJob<JobDone>(job, this.logger));
6✔
173
        }
174

175
        logger.info('Received Job event', { event: eventName });
6✔
176
        logger.info('Job details', job);
6✔
177
        const jobData: any = await queue.getJob(job.id).catch(error => {
6✔
178
          that.logger.error('Error retrieving job ${job.id} from queue', error);
×
179
        });
180

181
        if (job?.delete_scheduled) {
6✔
182
          await queue.removeRepeatable(jobData.name, jobData.opts.repeat);
1✔
183
        }
184
      });
185
    }
186

187
    // Initialize Event Listeners for each Queue
188
    for (let queue of this.queuesList) {
1✔
189
      queue.on('error', (error) => {
3✔
190
        logger.error('queue error', error);
×
191
      });
192
      queue.on('waiting', (job) => {
3✔
193
        logger.verbose(`job#${job.id} scheduled`, job);
9✔
194
      });
195
      queue.on('removed', (job) => {
3✔
196
        logger.verbose(`job#${job.id} removed`, job);
13✔
197
      });
198
      queue.on('progress', (job) => {
3✔
199
        logger.verbose(`job#${job.id} progress`, job);
×
200
      });
201
    }
202

203
    // If the scheduling service goes down and if there were
204
    // recurring jobs which have missed schedules then
205
    // we will need to reschedule it for those missing intervals.
206
    await this._rescheduleMissedJobs();
1✔
207
  }
208

209
  /**
210
   * To reschedule the missed recurring jobs upon service restart
211
   */
212
  async _rescheduleMissedJobs(): Promise<void> {
213
    // for jobs created via Kafka currently there are no acs checks
214
    this.disableAC();
1✔
215
    const createDispatch = [];
1✔
216
    let result: Job[] = [];
1✔
217
    let thiz = this;
1✔
218

219
    // Get the jobs
220
    for (let queueCfg of this.queuesConfigList) {
1✔
221
      // If enabled in the config, or the config is missing,b
222
      // Reschedule the missed jobs, else skip.
223
      let queue = _.find(this.queuesList, { name: queueCfg.name });
3✔
224
      let runMissedScheduled = queueCfg.runMissedScheduled;
3✔
225
      if (_.isNil(runMissedScheduled) ||
3!
226
        (!_.isNil(runMissedScheduled) && runMissedScheduled == true)) {
227
        await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']).then(jobs => {
3✔
228
          result = result.concat(jobs);
3✔
229
        }).catch(error => {
230
          thiz.logger.error('Error reading jobs to reschedule the missed recurring jobs', error);
×
231
        });
232
      }
233
    }
234
    let lastRunTime;
235
    for (let job of result) {
1✔
236
      // get the last run time for the job, we store the last run time only
237
      // for recurring jobs
238
      try {
×
239
        lastRunTime = await this.redisClient.get(job.name);
×
240
      } catch (err) {
241
        this.logger.error(
×
242
          'Error occurred reading the last run time for job type:',
243
          { name: job.name, err });
244
      }
245
      // we store lastRunTime only for recurring jobs and if it exists check
246
      // cron interval and schedule immediate jobs for missed intervals
247
      this.logger.info(`Last run time of ${job.name} Job was:`, lastRunTime);
×
248
      if (lastRunTime) {
×
249
        // convert redis string value to object and get actual time value
250
        try {
×
251
          lastRunTime = JSON.parse(lastRunTime);
×
252
        } catch (error) {
253
          this.logger.error('Error parsing lastRunTime', {
×
254
            code: error.code,
255
            message: error.message, stack: error.stack
256
          });
257
        }
258

259
        if ((job?.opts?.repeat as any)?.cron && lastRunTime) {
×
260
          let options = {
×
261
            currentDate: new Date(lastRunTime.time),
262
            endDate: new Date(),
263
            iterator: true
264
          };
265
          const intervalTime =
266
            parseExpression((job.opts.repeat as any).cron, options);
×
267
          while (intervalTime.hasNext()) {
×
268
            let nextInterval: any = intervalTime.next();
×
269
            const nextIntervalTime = nextInterval.value.toString();
×
270
            // schedule it as one time job for now or immediately
271
            const data = {
×
272
              payload: marshallProtobufAny({
273
                value: { time: nextIntervalTime }
274
              })
275
            };
276
            const currentTime = new Date();
×
277
            const immediateJob: any = {
×
278
              type: job.name,
279
              data,
280
              // give a delay of 2 sec between each job
281
              // to avoid time out of queued jobs
282
              when: currentTime.setSeconds(currentTime.getSeconds() + 2).toString(),
283
              options: {}
284
            };
285
            createDispatch.push(thiz.create({
×
286
              items: [immediateJob],
287
              total_count: 0,
288
            }, {}));
289
          }
290
        }
291
      }
292
    }
293
    this.restoreAC();
1✔
294
    await createDispatch;
1✔
295
  }
296

297
  /**
298
   * Disabling of CRUD events.
299
   */
300
  disableEvents(): void {
301
    this.resourceEventsEnabled = false;
×
302
  }
303

304
  /**
305
   * Enabling of CRUD events.
306
   */
307
  enableEvents(): any {
308
    this.resourceEventsEnabled = true;
×
309
  }
310

311
  _validateJob(job: NewJob): NewJob {
312
    if (_.isNil(job.type)) {
11!
313
      throw new errors.InvalidArgument('Job type not specified.');
×
314
    }
315

316
    if (!job.options) {
11!
317
      job.options = {};
×
318
    }
319

320
    if (job.when) {
11✔
321
      // If the jobSchedule time has already lapsed then do not schedule the job
322
      const jobScheduleTime = new Date(job.when).getTime();
7✔
323
      const currentTime = new Date().getTime();
7✔
324
      if (jobScheduleTime < currentTime) {
7!
325
        throw new errors.InvalidArgument('Cannot schedule a job for an elapsed time');
×
326
      }
327

328
      job.options.delay = jobScheduleTime - currentTime;
7✔
329
    }
330

331
    if (job.options.backoff && typeof job.options.backoff !== 'number') {
11!
332
      if (typeof job.options.backoff.type === 'number') {
11!
333
        job.options.backoff.type = Object.keys(Backoff_Type)[job.options.backoff.type];
×
334
      }
335
      job.options.backoff.type = job.options.backoff.type.toLowerCase();
11✔
336
    }
337

338
    if (job.options.priority && typeof job.options.priority === 'string') {
11!
339
      job.options.priority = JobOptions_Priority[job.options.priority] as any;
11✔
340
    }
341

342
    if (_.isEmpty(job.data)) {
11!
343
      throw new errors.InvalidArgument('No job data specified.');
×
344
    }
345

346
    job.data = _filterJobData(job.data, false, this.logger);
11✔
347

348
    return job;
11✔
349
  }
350

351
  /**
352
   * get next job execution time in mili seconds
353
   * @param millis
354
   * @param opts
355
   */
356
  getNextMillis(millis, opts) {
357
    if (opts?.every) {
1!
358
      return Math.floor(millis / opts.every) * opts.every + opts.every;
×
359
    }
360

361
    const currentDate =
362
      opts?.startDate && new Date(opts.startDate) > new Date(millis)
1!
363
        ? new Date(opts.startDate)
364
        : new Date(millis);
365
    const interval = parseExpression(
1✔
366
      opts.cron,
367
      _.defaults(
368
        {
369
          currentDate
370
        },
371
        opts
372
      )
373
    );
374

375
    try {
1✔
376
      return interval.next().getTime();
1✔
377
    } catch (e) {
378
      this.logger.error('Error getting next job execution time');
×
379
    }
380
  }
381

382
  private md5(str) {
383
    return crypto
×
384
      .createHash('md5')
385
      .update(str)
386
      .digest('hex');
387
  }
388

389
  /**
390
   * store the mapping from repeateKey to external interface SCS job Id, and
391
   * also the mapping other way around i.e. from SCS job Id to repeatKey (needed for read operations)
392
   * @param name - job name
393
   * @param repeat - job repeate options
394
   * @param jobId - job id
395
   */
396
  async storeRepeatKey(repeatId, scsJobId, options) {
397
    try {
11✔
398
      if (repeatId && scsJobId) {
11!
399
        this.logger.info('Repeat key mapped to external SCS JobId', { repeatId, scsJobId });
11✔
400
        await this.repeatJobIdRedisClient.set(repeatId, scsJobId);
11✔
401
        const jobIdData = { repeatId, options };
11✔
402
        await this.repeatJobIdRedisClient.set(scsJobId, JSON.stringify(jobIdData));
11✔
403
      }
404
    } catch (error) {
405
      this.logger.error('Error storing repeatKey to redis', {
×
406
        code: error.code,
407
        message: error.message, stack: error.stack
408
      });
409
    }
410
  }
411

412
  private idGen(): string {
413
    return uuid.v4().replace(/-/g, '');
8✔
414
  }
415

416
  /**
417
   * Create and queue jobs.
418
   * @param {any} call RPC call argument
419
   * @param {any} ctx RPC context
420
   */
421
  async create(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
422
    let jobListResponse: JobListResponse = { items: [], operation_status: { code: 0, message: '' }, total_count: 0 };
9✔
423
    let subject = request.subject;
9✔
424
    if (_.isNil(request) || _.isNil(request.items)) {
9!
425
      return {
×
426
        items: [],
427
        total_count: 0,
428
        operation_status: {
429
          code: 400,
430
          message: 'Missing items in create request'
431
        }
432
      };
433
    }
434

435
    await this.createMetadata(request.items, AuthZAction.CREATE, subject);
9✔
436
    let acsResponse: DecisionResponse;
437
    try {
9✔
438
      if (!ctx) { ctx = {}; };
9!
439
      ctx.subject = subject;
9✔
440
      ctx.resources = request?.items;
9✔
441
      acsResponse = await checkAccessRequest(ctx, [{
9✔
442
        resource: 'job',
443
        id: request.items.map(item => item.id)
12✔
444
      }], AuthZAction.CREATE, Operation.isAllowed);
445
    } catch (err) {
446
      this.logger.error('Error occurred requesting access-control-srv:', err);
×
447
      return {
×
448
        items: [],
449
        total_count: 0,
450
        operation_status: {
451
          code: err.code,
452
          message: err.message
453
        }
454
      };
455
    }
456
    if (acsResponse.decision != Response_Decision.PERMIT) {
9✔
457
      return { items: [], total_count: 0, operation_status: acsResponse.operation_status };
1✔
458
    }
459

460
    let jobs: NewJob[] = [];
8✔
461
    for (let job of request?.items || []) {
8!
462
      try {
11✔
463
        jobs.push(this._validateJob(job as any));
11✔
464
      } catch (err) {
465
        this.logger.error('Error validating job', job);
×
466
        jobListResponse.items.push({
×
467
          status: {
468
            id: job.id,
469
            code: 400,
470
            message: err.message
471
          }
472
        });
473
      }
474
    }
475

476
    let result: Job[] = [];
8✔
477
    // Scheduling jobs
478
    for (let i = 0; i < jobs.length; i += 1) {
8✔
479
      let job = jobs[i];
11✔
480
      // if not jobID is specified generate a UUID
481
      if (!job.id) {
11✔
482
        job.id = this.idGen();
8✔
483
      }
484

485
      // map the id to jobId as needed in JobOpts for bull
486
      if (job?.id) {
11!
487
        // check if jobID already exists then map it as already exists error
488
        const existingJobId = await this.getRedisValue(job.id);
11✔
489
        if (existingJobId) {
11!
490
          // read job to check if data exists
491
          const jobData = await this.read(JobReadRequest.fromPartial({
×
492
            filter: {
493
              job_ids: [job.id]
494
            },
495
            subject
496
          }), ctx);
497
          if ((jobData?.items as any)[0]?.payload) {
×
498
            jobListResponse.items.push({
×
499
              status: {
500
                id: job.id,
501
                code: 403,
502
                message: `Job with ID ${job.id} already exists`
503
              }
504
            });
505
            continue;
×
506
          }
507
        }
508
        if (!job?.options) {
11!
509
          job.options = { jobId: job.id };
×
510
        } else {
511
          job.options.jobId = job.id;
11✔
512
        }
513
        if (job?.options?.repeat) {
11✔
514
          (job as any).options.repeat.jobId = job.id;
2✔
515
        }
516
      }
517

518
      if (!job.data.meta) {
11!
519
        const now = new Date();
×
520
        const metaObj = {
×
521
          created: now,
522
          modified: now,
523
          modified_by: '',
524
          owners: []
525
        };
526
        Object.assign(job.data, { meta: metaObj });
×
527
      }
528
      // if only owners are specified in meta
529
      if (job.data.meta && (!job.data.meta.created || !job.data.meta.modified)) {
11✔
530
        job.data.meta.created = new Date();
9✔
531
        job.data.meta.modified = new Date();
9✔
532
      }
533

534
      if (job?.data?.payload?.value) {
11!
535
        job.data.payload.value = job.data.payload.value.toString() as any;
11✔
536
      }
537

538
      // convert enum priority back to number as it's expected by bull
539
      if (job?.options?.priority) {
11!
540
        job.options.priority = typeof job.options.priority === 'number' ? job.options.priority : Priority[job.options.priority] as unknown as number;
11!
541
      }
542

543
      // if its a repeat job and tz is empty delete the key (else cron parser throws an error)
544
      if (job?.options?.repeat?.tz === '') {
11!
545
        delete job.options.repeat.tz;
×
546
      }
547

548
      const bullOptions = {
11✔
549
        ...job.options
550
      };
551

552
      if ((bullOptions as any).timeout === 1) {
11✔
553
        delete bullOptions['timeout'];
2✔
554
      }
555

556
      // Match the Job Type with the Queue Name and add the Job to this Queue.
557
      // If there is no match, add the Job to the Default Queue
558
      let queue = _.find(this.queuesList, { name: job.type });
11✔
559
      if (_.isEmpty(queue)) {
11!
560
        queue = _.find(this.queuesList, { name: this.defaultQueueName });
×
561
      }
562
      const submittedJob = await queue.add(job.type, job.data, bullOptions);
11✔
563
      if (submittedJob?.id?.startsWith('repeat:')) {
11✔
564
        const repeatJobId = submittedJob?.id?.split(':')[1];
2✔
565
        await this.storeRepeatKey(repeatJobId, job.id, job.options);
2✔
566
      } else if (submittedJob?.id) {
9!
567
        // future job with when
568
        await this.storeRepeatKey(submittedJob.id, job.id, job.options);
9✔
569
      }
570
      this.logger.verbose(`job@${job.type} created`, job);
11✔
571
      result.push(submittedJob);
11✔
572
    }
573

574
    for (let job of result) {
8✔
575
      let jobId = job.id as string;
11✔
576
      if (jobId.startsWith('repeat:')) {
11✔
577
        const repeatKey = jobId.split(':')[1];
2✔
578
        job.id = await this.getRedisValue(repeatKey);
2✔
579
      }
580
    }
581

582
    for (let job of result) {
8✔
583
      let when = job?.opts?.delay ? new Date(job?.opts?.delay).toString() : '';
11✔
584
      jobListResponse.items.push({
11✔
585
        payload: {
586
          id: job.id as string,
587
          type: job.name,
588
          data: _filterJobData(job.data, true, this.logger),
589
          options: _filterJobOptions(job.opts) as any,
590
          when
591
        },
592
        status: {
593
          id: (job?.id)?.toString(),
594
          code: 200,
595
          message: 'success'
596
        }
597
      });
598
    }
599
    const jobList = {
8✔
600
      items: result.map(job => ({
11✔
601
        id: job.id,
602
        type: job.name,
603
        data: _filterJobData(job.data, true, this.logger),
604
        options: _filterJobOptions(job.opts)
605
      })),
606
      total_count: result.length
607
    };
608

609
    if (this.resourceEventsEnabled) {
8!
610
      await this.jobEvents.emit('jobsCreated', jobList);
8✔
611
    }
612

613
    jobListResponse.operation_status = { code: 200, message: 'success' };
8✔
614
    return jobListResponse;
8✔
615
  }
616

617
  private filterByOwnerShip(customArgsObj, result) {
618
    // applying filter based on custom arguments (filterByOwnerShip)
619
    let customArgs = (customArgsObj)?.custom_arguments;
24✔
620
    if (customArgs?.value) {
24!
621
      let customArgsFilter;
622
      try {
24✔
623
        customArgsFilter = JSON.parse(customArgs.value.toString());
24✔
624
      } catch (error) {
625
        this.logger.error('Error parsing custom query arguments', {
×
626
          code: error.code,
627
          message: error.message, stack: error.stack
628
        });
629
      }
630
      if (!customArgsFilter) {
24!
631
        return [];
×
632
      }
633
      const ownerIndicatorEntity = customArgsFilter.entity;
24✔
634
      const ownerValues = customArgsFilter.instance;
24✔
635
      const ownerIndictaorEntURN = this.cfg.get('authorization:urns:ownerIndicatoryEntity');
24✔
636
      const ownerInstanceURN = this.cfg.get('authorization:urns:ownerInstance');
24✔
637
      result = result.filter(job => {
24✔
638
        if (job?.data?.meta?.owners?.length > 0) {
36!
639
          for (let owner of job.data.meta.owners) {
36✔
640
            if (owner?.id === ownerIndictaorEntURN && owner?.value === ownerIndicatorEntity && owner?.attributes?.length > 0) {
36!
641
              for (let ownerInstObj of owner.attributes) {
36✔
642
                if (ownerInstObj?.id === ownerInstanceURN && ownerInstObj?.value && ownerValues.includes(ownerInstObj.value)) {
36!
643
                  return job;
36✔
644
                }
645
              }
646
            }
647
          }
648
        }
649
      });
650
    }
651
    return result;
24✔
652
  }
653

654
  async deleteRedisKey(key: string): Promise<any> {
655
    try {
4✔
656
      await this.repeatJobIdRedisClient.del(key);
4✔
657
      this.logger.debug('Redis Key deleted successfully used for mapping repeatable jobID', { key });
4✔
658
    } catch (err) {
659
      this.logger.error(
×
660
        'Error occurred deleting redis key',
661
        { key, msg: err.message });
662
    }
663
  }
664

665
  async getRedisValue(key: string): Promise<any> {
666
    let redisValue;
667
    try {
46✔
668
      if (key) {
46!
669
        redisValue = await this.repeatJobIdRedisClient.get(key);
46✔
670
      }
671
      if (redisValue) {
46✔
672
        return JSON.parse(redisValue);
33✔
673
      } else {
674
        return;
13✔
675
      }
676
    } catch (err) {
677
      if (err?.message?.startsWith('Unexpected token') || err.message.startsWith('Unexpected number')) {
13!
678
        return redisValue;
13✔
679
      } else {
680
        this.logger.error(
×
681
          'Error occurred reading redis key',
682
          { key, msg: err.message });
683
      }
684
    }
685
  }
686

687

688
  /**
689
   * Retrieve jobs.
690
   * @param {any} call RPC call argument
691
   * @param {any} ctx RPC context
692
   */
693
  async read(request: JobReadRequest, ctx: any): Promise<DeepPartial<JobListResponse>> {
694
    let jobListResponse: JobListResponse = { items: [], operation_status: { code: 0, message: '' }, total_count: 0 };
27✔
695
    let subject = request.subject;
27✔
696
    let acsResponse: PolicySetRQResponse;
697
    try {
27✔
698
      if (!ctx) { ctx = {}; };
27!
699
      ctx.subject = subject;
27✔
700
      ctx.resources = [];
27✔
701
      acsResponse = await checkAccessRequest(ctx, [{ resource: 'job' }], AuthZAction.READ,
27✔
702
        Operation.whatIsAllowed) as PolicySetRQResponse;
703
    } catch (err) {
704
      this.logger.error('Error occurred requesting access-control-srv:', err);
×
705
      return {
×
706
        operation_status: {
707
          code: err.code,
708
          message: err.message
709
        }
710
      };
711
    }
712
    if (acsResponse.decision != Response_Decision.PERMIT) {
27✔
713
      return { operation_status: acsResponse.operation_status };
3✔
714
    }
715

716
    let result: Job[] = [];
24✔
717
    if (_.isEmpty(request) || _.isEmpty(request.filter)
24!
718
      && (!request.filter || !request.filter.job_ids
719
        || _.isEmpty(request.filter.job_ids))
720
      && (!request.filter || !request.filter.type ||
721
        _.isEmpty(request.filter.type))) {
722
      result = await this._getJobList();
12✔
723
      let custom_arguments;
724
      if (acsResponse?.custom_query_args?.length > 0) {
12!
725
        custom_arguments = acsResponse.custom_query_args[0].custom_arguments;
12✔
726
      }
727
      result = this.filterByOwnerShip({ custom_arguments }, result);
12✔
728
    } else {
729
      const that = this;
12✔
730
      let jobIDs = request.filter.job_ids || [];
12✔
731
      if (!_.isArray(jobIDs)) {
12!
732
        jobIDs = [jobIDs];
×
733
      }
734
      const typeFilterName = request.filter.type;
12✔
735

736
      // Search in all the queues and retrieve jobs after JobID
737
      // and add them to the jobIDsCopy list.
738
      // If the job is not found in any of the queues, continue looking
739
      // Finally compare the two lists and add an error to status for which
740
      // job could not be found in any of the queues.
741
      if (jobIDs.length > 0) {
12✔
742
        // jobIDsCopy should contain the jobIDs duplicate values
743
        // after the for loop ends
744
        let jobIDsCopy: string[] = [];
11✔
745
        for (let jobID of jobIDs) {
11✔
746
          const jobIdData = await this.getRedisValue(jobID as string);
11✔
747
          // future jobs scheduled with `when` will have same repeatId as external SCS jobID
748
          if (jobIdData?.repeatId && (jobIdData.repeatId != jobID)) {
11✔
749
            const repeatId = jobIdData.repeatId;
1✔
750
            if (jobIdData?.options?.repeat?.cron && jobIdData?.options?.repeat?.every) {
1!
751
              jobListResponse.items.push({
×
752
                status: {
753
                  id: jobID.toString(),
754
                  code: 400,
755
                  message: 'Both .cron and .every options are defined for this repeatable job'
756
                }
757
              });
758
              continue;
×
759
            }
760
            const nextMillis = this.getNextMillis(Date.now(), jobIdData.options.repeat);
1✔
761
            this.logger.debug('Repeatable job identifier', { id: jobID, repeatId: `repeat:${repeatId}:${nextMillis}` });
1✔
762
            // map the repeatKey with nextmilis for bull repeatable jobID
763
            jobID = `repeat:${repeatId}:${nextMillis}`;
1✔
764
          }
765
          for (let queue of this.queuesList) {
11✔
766
            await new Promise((resolve, reject) => {
33✔
767
              // getJob returns job or null
768
              queue.getJob(jobID).then(job => {
33✔
769
                resolve(job);
33✔
770
                if (!_.isEmpty(job)) {
33✔
771
                  result.push(job);
10✔
772
                  if ((job as any)?.opts?.repeat?.jobId) {
10✔
773
                    jobIDsCopy.push((job as any).opts.repeat.jobId);
1✔
774
                  } else {
775
                    jobIDsCopy.push(jobID);
9✔
776
                  }
777
                }
778
              }).catch(err => {
779
                that.logger.error(`Error reading job ${jobID}`, err);
×
780
                if (err?.code && typeof err.code === 'string') {
×
781
                  err.code = 500;
×
782
                }
783
                jobListResponse.items.push({
×
784
                  status: {
785
                    id: jobID.toString(),
786
                    code: err.code,
787
                    message: err.message
788
                  }
789
                });
790
              });
791
            });
792
          }
793
        }
794
        if (!_.isEqual(jobIDs.sort(), jobIDsCopy.sort())) {
11✔
795
          const jobIDsDiff = _.difference(jobIDs, jobIDsCopy);
1✔
796
          for (let jobId of jobIDsDiff) {
1✔
797
            jobListResponse.items.push({
1✔
798
              status: {
799
                id: jobId.toString(),
800
                code: 404,
801
                message: `Job ID ${jobId} not found in any of the queues`
802
              }
803
            });
804
          }
805
        }
806
      } else {
807
        try {
1✔
808
          let jobsList: any[] = [];
1✔
809
          for (let queue of this.queuesList) {
1✔
810
            const getJobsResult = await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']);
3✔
811
            jobsList = jobsList.concat(getJobsResult);
3✔
812
          }
813
          result = jobsList;
1✔
814
        } catch (err) {
815
          that.logger.error('Error reading jobs', err);
×
816
          if (typeof err.code === 'string') {
×
817
            err.code = 500;
×
818
          }
819
          return {
×
820
            operation_status: {
821
              code: err.code,
822
              message: err.message
823
            }
824
          };
825
        }
826
      }
827

828
      if (typeFilterName) {
12✔
829
        result = result.filter(job => job.name === typeFilterName);
6✔
830
      }
831
      let custom_arguments;
832
      if (acsResponse?.custom_query_args?.length > 0) {
12!
833
        custom_arguments = acsResponse.custom_query_args[0].custom_arguments;
12✔
834
      }
835
      result = this.filterByOwnerShip({ custom_arguments }, result);
12✔
836
    }
837

838
    result = result.filter(valid => !!valid);
36✔
839

840
    if (!_.isEmpty(request) && !_.isEmpty(request.sort)
24✔
841
      && _.includes(['ASCENDING', 'DESCENDING'], request.sort)) {
842
      let sort;
843
      switch (request.sort) {
2!
844
        case JobReadRequest_SortOrder.DESCENDING:
845
          sort = 'desc';
1✔
846
          break;
1✔
847
        case JobReadRequest_SortOrder.ASCENDING:
848
          sort = 'asc';
1✔
849
          break;
1✔
850
        default:
851
          this.logger.error(`Unknown sort option ${sort}`);
×
852
      }
853
      result = _.orderBy(result, ['id'], [sort]);
2✔
854
    }
855

856
    for (let job of result) {
24✔
857
      let jobId = job.id as string;
36✔
858
      if (jobId.startsWith('repeat:')) {
36✔
859
        const repeatKey = jobId.split(':')[1];
11✔
860
        // it could be possible the redis repeat key is deleted on index 8 and old completed
861
        // jobs exist in data_store if delete on complete was not set to true for repeatable jobs
862
        const jobRedisId = await this.getRedisValue(repeatKey);
11✔
863
        if (jobRedisId) {
11!
864
          job.id = jobRedisId;
11✔
865
        }
866
      }
867
    }
868

869
    for (let job of result) {
24✔
870
      let when = job?.opts?.delay ? new Date(job?.opts?.delay).toString() : '';
36!
871
      jobListResponse.items.push({
36✔
872
        payload: {
873
          id: job.id as string,
874
          type: job.name,
875
          data: _filterJobData(job.data, true, this.logger),
876
          options: _filterJobOptions(job.opts) as any,
877
          when
878
        },
879
        status: {
880
          id: job.id.toString(),
881
          code: 200,
882
          message: 'success'
883
        }
884
      });
885
    }
886
    jobListResponse.total_count = jobListResponse?.items?.length;
24✔
887
    jobListResponse.operation_status = {
24✔
888
      code: 200,
889
      message: 'success'
890
    };
891
    return jobListResponse;
24✔
892
  }
893

894
  async _getJobList(): Promise<Job[]> {
895
    let jobsList: any[] = [];
14✔
896
    for (let queue of this.queuesList) {
14✔
897
      const getJobsResult = await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']);
42✔
898
      jobsList = jobsList.concat(getJobsResult);
42✔
899
    }
900
    return jobsList;
14✔
901
  }
902

903
  // delete a job by its job instance after processing 'jobDone' / 'jobFailed'
904
  async _deleteJobInstance(jobId: string, queue: Queue): Promise<void> {
905
    return this._removeBullJob(jobId, queue);
×
906
  }
907

908
  /**
909
   * Delete Job from queue.
910
   */
911
  async delete(request: DeleteRequest, ctx: any): Promise<DeepPartial<DeleteResponse>> {
912
    let deleteResponse: DeleteResponse = { status: [], operation_status: { code: 0, message: '' } };
6✔
913
    if (_.isEmpty(request)) {
6!
914
      return {
×
915
        operation_status: {
916
          code: 400,
917
          message: 'No arguments provided for delete operation'
918
        }
919
      };
920
    }
921
    const subject = request?.subject;
6✔
922
    const jobIDs = request?.ids;
6✔
923
    let resources = [];
6✔
924
    let action;
925
    if (jobIDs) {
6✔
926
      action = AuthZAction.DELETE;
3✔
927
      if (_.isArray(jobIDs)) {
3!
928
        for (let id of jobIDs) {
3✔
929
          resources.push({ id });
3✔
930
        }
931
      } else {
932
        resources = [{ id: jobIDs }];
×
933
      }
934
      await this.createMetadata(resources, action, subject);
3✔
935
    }
936
    if (request.collection) {
6✔
937
      action = AuthZAction.DROP;
3✔
938
      resources = [{ collection: request.collection }];
3✔
939
    }
940
    let acsResponse: DecisionResponse;
941
    try {
6✔
942
      if (!ctx) { ctx = {}; };
6!
943
      ctx.subject = subject;
6✔
944
      ctx.resources = resources;
6✔
945
      acsResponse = await checkAccessRequest(ctx, [{ resource: 'job', id: jobIDs as string[] }], action,
6✔
946
        Operation.isAllowed);
947
    } catch (err) {
948
      this.logger.error('Error occurred requesting access-control-srv:', err);
×
949
      return {
×
950
        operation_status: {
951
          code: err.code,
952
          message: err.message
953
        }
954
      };
955
    }
956
    if (acsResponse.decision != Response_Decision.PERMIT) {
6✔
957
      return { operation_status: acsResponse.operation_status };
1✔
958
    }
959
    const dispatch = [];
5✔
960
    this.logger.info('Received delete request');
5✔
961
    if ('collection' in request && request.collection) {
5✔
962
      this.logger.verbose('Deleting all jobs');
2✔
963

964
      await this._getJobList().then(async (jobs) => {
2✔
965
        for (let job of jobs) {
2✔
966
          await job.remove();
5✔
967
          if (this.resourceEventsEnabled) {
5!
968
            dispatch.push(this.jobEvents.emit('jobsDeleted', { id: job.id }));
5✔
969
          }
970
          deleteResponse.status.push({
5✔
971
            id: job.id.toString(),
972
            code: 200,
973
            message: 'success'
974
          });
975
        }
976
      });
977
      // FLUSH redis DB index 8 used for mapping of repeat jobIds (since req is for dropping job collection)
978
      const delResp = await this.repeatJobIdRedisClient.flushDb();
2✔
979
      if (delResp) {
2!
980
        this.logger.debug('Mapped keys for repeatable jobs deleted successfully');
2✔
981
      } else {
982
        this.logger.debug('Could not delete repeatable job keys');
×
983
      }
984
    } else if ('ids' in request) {
3!
985
      this.logger.verbose('Deleting jobs by their IDs', { id: request.ids });
3✔
986

987
      for (let queue of this.queuesList) {
3✔
988
        for (let jobDataKey of request.ids) {
9✔
989
          let callback: Promise<boolean>;
990
          const jobIdData = await this.getRedisValue(jobDataKey as string);
9✔
991
          // future jobs scheduled with `when` will have same repeatId as external SCS jobID
992
          if (jobIdData && jobIdData.repeatId && (jobIdData.repeatId != jobDataKey)) {
9✔
993
            const jobs = await queue.getRepeatableJobs();
2✔
994
            for (let job of jobs) {
2✔
995
              if (job.id === jobDataKey) {
1!
996
                this.logger.debug('Removing Repeatable job by key for jobId', { id: job.id });
1✔
997
                callback = queue.removeRepeatableByKey(job.key);
1✔
998
                deleteResponse.status.push({
1✔
999
                  id: job.id,
1000
                  code: 200,
1001
                  message: 'success'
1002
                });
1003
                await this.deleteRedisKey(jobDataKey as string);
1✔
1004
                await this.deleteRedisKey(jobIdData.repeatId);
1✔
1005
                break;
1✔
1006
              }
1007
            }
1008
          } else {
1009
            callback = queue.getJob(jobDataKey).then(async (jobData) => {
7✔
1010
              if (jobData) {
7✔
1011
                try {
2✔
1012
                  await this._removeBullJob(jobData.id, queue);
2✔
1013
                  await this.deleteRedisKey(jobData.id);
2✔
1014
                  deleteResponse.status.push({
2✔
1015
                    id: jobData.id.toString(),
1016
                    code: 200,
1017
                    message: 'success'
1018
                  });
1019
                } catch (err) {
1020
                  if (typeof err?.code === 'string') {
×
1021
                    err.code = 500;
×
1022
                  }
1023
                  deleteResponse.status.push({
×
1024
                    id: jobData.id.toString(),
1025
                    code: err.code,
1026
                    message: err.message
1027
                  });
1028
                  return false;
×
1029
                }
1030
                return true;
2✔
1031
              }
1032
              return false;
5✔
1033
            });
1034
          }
1035

1036
          // since no CB is returned for removeRepeatableByKey by bull
1037
          if (!callback) {
9✔
1038
            if (this.resourceEventsEnabled) {
1!
1039
              dispatch.push(this.jobEvents.emit(
1✔
1040
                'jobsDeleted', { id: jobDataKey })
1041
              );
1042
            }
1043
          } else {
1044
            callback.then(() => {
8✔
1045
              if (this.resourceEventsEnabled) {
8!
1046
                dispatch.push(this.jobEvents.emit(
8✔
1047
                  'jobsDeleted', { id: jobDataKey })
1048
                );
1049
              }
1050
            }).catch(err => {
1051
              this.logger.error('Error deleting job', { id: jobDataKey });
×
1052
              if (typeof err?.code === 'number') {
×
1053
                err.code = 500;
×
1054
              }
1055
              deleteResponse.status.push({
×
1056
                id: jobDataKey.toString(),
1057
                code: err.code,
1058
                message: err.message
1059
              });
1060
            });
1061
          }
1062
        }
1063
      }
1064
    }
1065

1066
    await Promise.all(dispatch);
5✔
1067
    deleteResponse.operation_status = { code: 200, message: 'success' };
5✔
1068
    return deleteResponse;
5✔
1069
  }
1070

1071
  /**
1072
   * Clean up queues - removes complted and failed jobs from queue
1073
   * @param {any} job clean up job
1074
   */
1075
  async cleanupJobs(ttlAfterFinished) {
1076
    for (let queue of this.queuesList) {
×
1077
      try {
×
1078
        await queue.clean(ttlAfterFinished, 0, COMPLETED_JOB_STATE);
×
1079
        await queue.clean(ttlAfterFinished, 0, FAILED_JOB_STATE);
×
1080
      } catch (err) {
1081
        this.logger.error('Error cleaning up jobs', err);
×
1082
      }
1083
    }
1084
    this.logger.info('Jobs cleaned up successfully');
×
1085
    let lastExecutedInterval = { lastExecutedInterval: (new Date()).toString() };
×
1086
    await this.repeatJobIdRedisClient.set(QUEUE_CLEANUP, JSON.stringify(lastExecutedInterval));
×
1087
  }
1088

1089
  async setupCleanInterval(cleanInterval: number, ttlAfterFinished: number) {
1090
    if (!ttlAfterFinished) {
×
1091
      ttlAfterFinished = DEFAULT_CLEANUP_COMPLETED_JOBS;
×
1092
    }
1093
    const intervalData = await this.getRedisValue(QUEUE_CLEANUP);
×
1094
    let timeInMs, delta;
1095
    const now = new Date().getTime();
×
1096
    if (intervalData?.lastExecutedInterval && typeof (intervalData.lastExecutedInterval) === 'string') {
×
1097
      timeInMs = new Date(intervalData.lastExecutedInterval).getTime();
×
1098
      this.logger.debug('Previous execution interval', intervalData);
×
1099
      delta = now - timeInMs;
×
1100
      this.logger.debug('Clean interval and previous difference', { cleanInterval, difference: delta });
×
1101
    }
1102

1103
    if (delta && (delta < cleanInterval)) {
×
1104
      // use setTimeout and then create interval on setTimeout
1105
      this.logger.info('Restoring previous execution interval with set timeout', { time: cleanInterval - delta });
×
1106
      setTimeout(async () => {
×
1107
        await this.cleanupJobs(ttlAfterFinished);
×
1108
        setInterval(this.cleanupJobs.bind(this), cleanInterval, ttlAfterFinished);
×
1109
      }, cleanInterval - delta);
1110
    } else {
1111
      setInterval(this.cleanupJobs.bind(this), cleanInterval, ttlAfterFinished);
×
1112
      this.logger.info('Clean up job interval set successfully');
×
1113
    }
1114
  }
1115

1116
  /**
1117
   * Reschedules a job - deletes it and recreates it with a new generated ID.
1118
   */
1119
  async update(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
1120
    let subject = request.subject;
3✔
1121
    // update meta data for owners information
1122
    await this.createMetadata(request.items, AuthZAction.MODIFY, subject);
3✔
1123
    let acsResponse: DecisionResponse;
1124
    try {
3✔
1125
      if (!ctx) { ctx = {}; };
3!
1126
      ctx.subject = subject;
3✔
1127
      ctx.resources = request?.items;
3✔
1128
      acsResponse = await checkAccessRequest(ctx,
3✔
1129
        [{ resource: 'job', id: request.items.map(item => item.id) }],
3✔
1130
        AuthZAction.MODIFY, Operation.isAllowed);
1131
    } catch (err) {
1132
      this.logger.error('Error occurred requesting access-control-srv:', err);
×
1133
      return {
×
1134
        operation_status: {
1135
          code: err.code,
1136
          message: err.message
1137
        }
1138
      };
1139
    }
1140
    if (acsResponse.decision != Response_Decision.PERMIT) {
3✔
1141
      return { operation_status: acsResponse.operation_status };
1✔
1142
    }
1143
    if (_.isNil(request) || _.isNil(request.items)) {
2!
1144
      return {
×
1145
        operation_status: {
1146
          code: 400,
1147
          message: 'Missing items in update request'
1148
        }
1149
      };
1150
    }
1151

1152
    const mappedJobs = request?.items?.reduce((obj, job) => {
2✔
1153
      obj[job.id] = job;
2✔
1154
      return obj;
2✔
1155
    }, {});
1156

1157
    const jobData = await this.read(JobReadRequest.fromPartial(
2✔
1158
      {
1159
        filter: {
1160
          job_ids: Object.keys(mappedJobs)
1161
        },
1162
        subject
1163
      }
1164
    ), ctx);
1165

1166
    await this.delete(DeleteRequest.fromPartial({
2✔
1167
      ids: Object.keys(mappedJobs),
1168
      subject
1169
    }), {});
1170

1171
    const result = [];
2✔
1172

1173
    jobData?.items?.forEach(async (job) => {
2✔
1174
      const mappedJob = mappedJobs[job?.payload?.id];
2✔
1175
      let endJob = {
2✔
1176
        id: mappedJob.id,
1177
        type: mappedJob.type,
1178
        options: {
1179
          ...job.payload.options,
1180
          ...(mappedJob.options ? mappedJob.options : {})
2!
1181
        },
1182
        data: mappedJob.data || job.payload.data,
2!
1183
        when: mappedJob.when,
1184
      };
1185

1186
      if (endJob.when && endJob.options) {
2!
1187
        delete endJob.options['delay'];
2✔
1188
      }
1189

1190
      result.push(endJob);
2✔
1191
    });
1192

1193
    return this.create(JobList.fromPartial({
2✔
1194
      items: result,
1195
      subject
1196
    }), ctx);
1197
  }
1198

1199
  /**
1200
   * Upserts a job - creates a new job if it does not exist or update the
1201
   * existing one if it already exists.
1202
   */
1203
  async upsert(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
1204
    let subject = request.subject;
2✔
1205
    await this.createMetadata(request.items, AuthZAction.MODIFY, subject);
2✔
1206
    let acsResponse: DecisionResponse;
1207
    try {
2✔
1208
      if (!ctx) { ctx = {}; };
2!
1209
      ctx.subject = subject;
2✔
1210
      ctx.resources = request.items;
2✔
1211
      acsResponse = await checkAccessRequest(ctx,
2✔
1212
        [{ resource: 'job', id: request.items.map(item => item.id) }],
2✔
1213
        AuthZAction.MODIFY, Operation.isAllowed);
1214
    } catch (err) {
1215
      this.logger.error('Error occurred requesting access-control-srv:', err);
×
1216
      return {
×
1217
        operation_status: {
1218
          code: err.code,
1219
          message: err.message
1220
        }
1221
      };
1222
    }
1223

1224
    if (acsResponse.decision != Response_Decision.PERMIT) {
2✔
1225
      return { operation_status: acsResponse.operation_status };
1✔
1226
    }
1227
    if (_.isNil(request) || _.isNil(request.items)) {
1!
1228
      return { operation_status: { code: 400, message: 'Missing items in upsert request' } };
×
1229
    }
1230

1231
    let result = [];
1✔
1232

1233
    for (let eachJob of request.items) {
1✔
1234
      let jobExists = false;
1✔
1235
      let origJobId = _.cloneDeep(eachJob.id);
1✔
1236
      for (let queue of this.queuesList) {
1✔
1237
        const jobIdData = await this.getRedisValue(eachJob.id as string);
2✔
1238
        // future jobs scheduled with `when` will have same repeatId as external SCS jobID
1239
        if (jobIdData?.repeatId && (jobIdData.repeatId != origJobId)) {
2!
1240
          const repeatId = jobIdData.repeatId;
×
1241
          if (jobIdData?.options?.repeat?.cron && jobIdData?.options?.repeat?.every) {
×
1242
            result.push({
×
1243
              status: {
1244
                id: origJobId,
1245
                code: 400,
1246
                message: 'Both .cron and .every options are defined for this repeatable job'
1247
              }
1248
            });
1249
            continue;
×
1250
          }
1251
          const nextMillis = this.getNextMillis(Date.now(), jobIdData.options.repeat);
×
1252
          this.logger.debug('Repeatable job identifier', { id: eachJob.id, repeatId: `repeat:${repeatId}:${nextMillis}` });
×
1253
          // map the repeatKey with nextmilis for bull repeatable jobID
1254
          eachJob.id = `repeat:${repeatId}:${nextMillis}`;
×
1255
        }
1256
        const jobInst = await queue.getJob(eachJob.id);
2✔
1257
        if (jobInst) {
2✔
1258
          // existing job update it with the given job identifier
1259
          if (eachJob.id.startsWith('repeat:')) {
1!
1260
            eachJob.id = origJobId;
×
1261
          }
1262
          result = [
1✔
1263
            ...result,
1264
            ...(await this.update(JobList.fromPartial({ items: [eachJob], subject }), ctx)).items
1265
          ];
1266
          jobExists = true;
1✔
1267
          break;
1✔
1268
        }
1269
      }
1270
      if (!jobExists) {
1!
1271
        // new job create it
1272
        result = [
×
1273
          ...result,
1274
          ...(await this.create(JobList.fromPartial({ items: [eachJob], subject }), ctx)).items
1275
        ];
1276
      }
1277
    }
1278

1279
    return {
1✔
1280
      items: result,
1281
      total_count: result.length,
1282
      operation_status: {
1283
        code: 200,
1284
        message: 'success'
1285
      }
1286
    };
1287
  }
1288

1289
  /**
1290
   * Clear all job data.
1291
   */
1292
  async clear(): Promise<any> {
1293
    let allJobs: any[] = [];
1✔
1294
    for (let queue of this.queuesList) {
1✔
1295
      allJobs = allJobs.concat(await queue.getJobs(this.bullOptions['allJobTypes']));
3✔
1296
    }
1297
    return Promise.all(allJobs.map(async (job) => job.remove())).catch(err => {
6✔
1298
      this.logger.error(`Error clearing jobs`, err);
×
1299
      throw err;
×
1300
    });
1301
  }
1302

1303
  async _removeBullJob(jobInstID: string, queue: Queue): Promise<void> {
1304
    return queue.getJob(jobInstID).then(job => {
2✔
1305
      if (job) {
2!
1306
        return job.remove();
2✔
1307
      }
1308
    }).then(() => {
1309
      this.logger.info(`Job#${jobInstID} removed`);
2✔
1310
    }).catch(err => {
1311
      this.logger.error(`Error removing job ${jobInstID}`, err);
×
1312
      throw err;
×
1313
    });
1314
  }
1315

1316
  /**
1317
   *  disable access control
1318
   */
1319
  disableAC() {
1320
    try {
1✔
1321
      this.cfg.set('authorization:enabled', false);
1✔
1322
      updateConfig(this.cfg);
1✔
1323
    } catch (err) {
1324
      this.logger.error('Error caught disabling authorization:', { err });
×
1325
      this.cfg.set('authorization:enabled', this.authZCheck);
×
1326
    }
1327
  }
1328

1329
  /**
1330
   *  enables access control
1331
   */
1332
  enableAC() {
1333
    try {
1✔
1334
      this.cfg.set('authorization:enabled', true);
1✔
1335
      updateConfig(this.cfg);
1✔
1336
    } catch (err) {
1337
      this.logger.error('Error caught enabling authorization:', { err });
×
1338
      this.cfg.set('authorization:enabled', this.authZCheck);
×
1339
    }
1340
  }
1341

1342
  /**
1343
   *  restore AC state to previous vale either before enabling or disabling AC
1344
   */
1345
  restoreAC() {
1346
    try {
1✔
1347
      this.cfg.set('authorization:enabled', this.authZCheck);
1✔
1348
      updateConfig(this.cfg);
1✔
1349
    } catch (err) {
1350
      this.logger.error('Error caught enabling authorization:', { err });
×
1351
      this.cfg.set('authorization:enabled', this.authZCheck);
×
1352
    }
1353
  }
1354

1355
  /**
1356
   * reads meta data from DB and updates owners information in resource if action is UPDATE / DELETE
1357
   * @param resources list of resources
1358
   * @param action resource action
1359
   * @param subject subject name
1360
   */
1361
  async createMetadata(resources: any, action: string, subject): Promise<any> {
1362
    let orgOwnerAttributes: Attribute[] = [];
17✔
1363
    if (resources && !_.isArray(resources)) {
17!
1364
      resources = [resources];
×
1365
    }
1366
    const urns = this.cfg.get('authorization:urns');
17✔
1367
    if (subject?.scope && (action === AuthZAction.CREATE || action === AuthZAction.MODIFY)) {
17✔
1368
      // add subject scope as default owners
1369
      orgOwnerAttributes.push(
14✔
1370
        {
1371
          id: urns?.ownerIndicatoryEntity,
1372
          value: urns?.organization,
1373
          attributes: [{
1374
            id: urns?.ownerInstance,
1375
            value: subject?.scope,
1376
            attributes: []
1377
          }]
1378
        });
1379
    }
1380

1381
    if (resources?.length > 0) {
17!
1382
      for (let resource of resources) {
17✔
1383
        if (!resource.data) {
20✔
1384
          resource.data = { meta: {} };
3✔
1385
        } else if (!resource.data.meta) {
17✔
1386
          resource.data.meta = {};
12✔
1387
        }
1388
        if (resource?.id && (action === AuthZAction.MODIFY || action === AuthZAction.DELETE)) {
20✔
1389
          let result;
1390
          try {
8✔
1391
            result = await this.read(JobReadRequest.fromPartial({
8✔
1392
              filter: {
1393
                job_ids: [resource.id]
1394
              },
1395
              subject
1396
            }), {});
1397
          } catch (err) {
1398
            if (err.message.startsWith('Error! Jobs not found in any of the queues') && action != AuthZAction.DELETE) {
×
1399
              this.logger.debug('New job should be created', { jobId: resource.id });
×
1400
              result = { items: [] };
×
1401
            } else {
1402
              this.logger.error(`Error reading job with resource ID ${resource.id}`, { code: err.code, message: err.message, stack: err.stack });
×
1403
            }
1404
          }
1405
          // update owners info
1406
          if (result?.items?.length === 1 && result?.items[0]?.payload) {
8✔
1407
            let item = result.items[0].payload;
6✔
1408
            resource.data.meta.owners = item.data.meta.owners;
6✔
1409
            // adding meta to resource root (needed by access-contorl-srv for owners information check)
1410
            // meta is inside data of resource since the data is persisted in redis using bull
1411
            resource.meta = { owners: item.data.meta.owners };
6✔
1412
          } else if ((!result || !result.items || !result.items[0] || !result.items[0].payload) && action === AuthZAction.MODIFY) {
2!
1413
            // job does not exist - create new job (ex: Upsert with action modify)
1414
            let ownerAttributes = _.cloneDeep(orgOwnerAttributes);
2✔
1415
            // add user as default owners
1416
            ownerAttributes.push(
2✔
1417
              {
1418
                id: urns?.ownerIndicatoryEntity,
1419
                value: urns?.user,
1420
                attributes: [{
1421
                  id: urns?.ownerInstance,
1422
                  value: subject?.id,
1423
                  attributes: []
1424
                }]
1425
              });
1426
            resource.data.meta.owners = ownerAttributes;
2✔
1427
            resource.meta = { owners: ownerAttributes };
2✔
1428
          }
1429
        } else if ((action === AuthZAction.CREATE || !resource.id) && !resource.data.meta.owners) {
12!
1430
          let ownerAttributes = _.cloneDeep(orgOwnerAttributes);
10✔
1431
          // add user as default owners
1432
          if (resource.id) {
10✔
1433
            ownerAttributes.push(
2✔
1434
              {
1435
                id: urns.ownerIndicatoryEntity,
1436
                value: urns.user,
1437
                attributes: [{
1438
                  id: urns.ownerInstance,
1439
                  value: subject.id,
1440
                  attributes: []
1441
                }]
1442
              });
1443
          }
1444
          resource.data.meta.owners = ownerAttributes;
10✔
1445
          resource.meta = { owners: ownerAttributes };
10✔
1446
        } else if (action === AuthZAction.CREATE && resource?.data?.meta?.owners) {
2!
1447
          resource.meta = { owners: resource.data.meta.owners };
2✔
1448
        }
1449
      }
1450
    }
1451
    return resources;
17✔
1452
  }
1453
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc