• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

restorecommerce / scheduling-srv / 6966862478

23 Nov 2023 07:45AM UTC coverage: 67.763% (+0.1%) from 67.617%
6966862478

push

github

Arun-KumarH
chore: Release v1.2.3 - See CHANGELOG

272 of 449 branches covered (0.0%)

Branch coverage included in aggregate %.

655 of 919 relevant lines covered (71.27%)

6.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.1
/src/schedulingService.ts
1
import * as _ from 'lodash';
1✔
2
import { errors } from '@restorecommerce/chassis-srv';
1✔
3
import * as kafkaClient from '@restorecommerce/kafka-client';
4
import { AuthZAction, ACSAuthZ, updateConfig, DecisionResponse, Operation, PolicySetRQResponse } from '@restorecommerce/acs-client';
1✔
5
import {
1✔
6
  JobServiceImplementation as SchedulingServiceServiceImplementation,
7
  JobFailed, JobDone, DeepPartial, JobList, JobListResponse, Data,
8
  Backoff_Type, JobOptions_Priority, JobReadRequest, JobReadRequest_SortOrder
9
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/job';
10
import { createClient, RedisClientType } from 'redis';
1✔
11
import { NewJob, Priority } from './types';
1✔
12
import { parseExpression } from 'cron-parser';
1✔
13
import * as crypto from 'crypto';
1✔
14
import { _filterJobData, _filterJobOptions, _filterQueuedJob, checkAccessRequest, marshallProtobufAny } from './utilts';
1✔
15
import * as uuid from 'uuid';
1✔
16
import { Logger } from 'winston';
17
import { Response_Decision } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/access_control';
1✔
18
import { Attribute } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/attribute';
19
import { DeleteRequest, DeleteResponse } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/resource_base';
1✔
20
import { Queue, QueueOptions, Job } from 'bullmq';
1✔
21
import { parseInt } from 'lodash';
1✔
22

23
const JOB_DONE_EVENT = 'jobDone';
1✔
24
const JOB_FAILED_EVENT = 'jobFailed';
1✔
25
const DEFAULT_CLEANUP_COMPLETED_JOBS = 604800000; // 7 days in miliseconds
1✔
26
const COMPLETED_JOB_STATE = 'completed';
1✔
27
const FAILED_JOB_STATE = 'failed';
1✔
28
const QUEUE_CLEANUP = 'queueCleanup';
1✔
29

30
/**
31
 * A job scheduling service.
32
 */
33
export class SchedulingService implements SchedulingServiceServiceImplementation {
1✔
34

35
  jobEvents: kafkaClient.Topic;
36
  logger: Logger;
37

38
  queuesConfigList: any;
39
  queuesList: Queue[];
40
  defaultQueueName: string;
41

42
  redisClient: RedisClientType<any, any>;
43
  resourceEventsEnabled: boolean;
44
  canceledJobs: Set<string>;
45
  bullOptions: any;
46
  cfg: any;
47
  authZ: ACSAuthZ;
48
  authZCheck: boolean;
49
  repeatJobIdRedisClient: RedisClientType<any, any>;
50

51

52
  constructor(jobEvents: kafkaClient.Topic,
53
    private redisConfig: any, logger: any, redisClient: RedisClientType<any, any>,
1✔
54
    bullOptions: any, cfg: any, authZ: ACSAuthZ) {
55
    this.jobEvents = jobEvents;
1✔
56
    this.resourceEventsEnabled = true;
1✔
57
    this.bullOptions = bullOptions;
1✔
58
    this.logger = logger;
1✔
59
    this.queuesList = [];
1✔
60
    this.queuesConfigList = [];
1✔
61
    this.redisClient = redisClient;
1✔
62

63
    const repeatJobIdCfg = cfg.get('redis');
1✔
64
    repeatJobIdCfg.database = cfg.get('redis:db-indexes:db-repeatJobId');
1✔
65
    this.repeatJobIdRedisClient = createClient(repeatJobIdCfg);
1✔
66
    this.repeatJobIdRedisClient.on('error', (err) => logger.error('Redis client error in repeatable job store', err));
1✔
67
    this.repeatJobIdRedisClient.connect().then((data) => {
1✔
68
      logger.info('Redis client connection for repeatable job store successful');
1✔
69
    }).catch(err => logger.error('Redis client error for repeatable job store', err));
×
70

71
    this.canceledJobs = new Set<string>();
1✔
72
    this.cfg = cfg;
1✔
73
    this.authZ = authZ;
1✔
74
    this.authZCheck = this.cfg.get('authorization:enabled');
1✔
75

76
    // Read Queue Configuration file and find first queue which has "default": true,
77
    // then save it to defaultQueueName
78
    const queuesCfg = this.cfg.get('queue');
1✔
79
    if (_.isEmpty(queuesCfg)) {
1!
80
      this.logger.error('Queue configuration not found!');
×
81
      throw new Error('Queue configuration not found!');
×
82
    }
83
    let defaultTrueExists = false;
1✔
84
    for (let queueCfg of queuesCfg) {
1✔
85
      // Find configuration which has default=true
86
      if (queueCfg.default == true) {
1!
87
        defaultTrueExists = true;
1✔
88
        this.defaultQueueName = queueCfg.name;
1✔
89
        break;
1✔
90
      }
91
    }
92
    if (!defaultTrueExists) {
1!
93
      this.logger.error('Queue default configuration not found!');
×
94
      throw new Error('Queue default configuration not found!');
×
95
    }
96

97
    // Create Queues
98
    for (let queueCfg of queuesCfg) {
1✔
99
      let queueOptions: QueueOptions;
100
      const prefix = queueCfg.name;
3✔
101
      const rateLimiting = queueCfg.rateLimiting;
3✔
102
      const advancedSettings = queueCfg.advancedSettings;
3✔
103

104
      queueOptions = {
3✔
105
        connection: {
106
          ...redisConfig,
107
        }
108
      };
109

110
      // Create Queue Configuration - Add Rate Limiting if enabled
111
      if (!_.isEmpty(rateLimiting) && rateLimiting.enabled == true) {
3!
112
        this.logger.info(`Queue: ${queueCfg.name} - Rate limiting is ENABLED.`);
×
113
      }
114

115
      if (!_.isEmpty(advancedSettings)) {
3!
116
        queueOptions.settings = {
3✔
117
          ...advancedSettings,
118
        };
119
      }
120

121
      const redisURL = new URL((queueOptions.connection as any).url);
3✔
122

123
      if ('keyPrefix' in queueOptions.connection) {
3!
124
        delete queueOptions.connection.keyPrefix;
×
125
      }
126

127
      let queue = new Queue(prefix, {
3✔
128
        ...queueOptions,
129
        connection: {
130
          ...queueOptions.connection as any,
131
          host: redisURL.hostname,
132
          port: parseInt(redisURL.port)
133
        }
134
      });
135
      this.queuesList.push(queue);
3✔
136

137
      // Add Queue Configurations
138
      let queueCfgObj = {
3✔
139
        name: queueCfg.name,
140
        concurrency: queueCfg.concurrency,
141
        default: queueCfg.default,
142
        runMissedScheduled: queueCfg.runMissedScheduled
143
      };
144
      this.queuesConfigList.push(queueCfgObj);
3✔
145
    }
146
  }
147

148
  /**
149
   * Start handling the job queue, job scheduling and
150
   * managing job events.
151
   */
152
  async start(): Promise<any> {
153
    const logger = this.logger;
1✔
154
    const that = this;
1✔
155
    const events = [JOB_DONE_EVENT, JOB_FAILED_EVENT];
1✔
156
    for (let eventName of events) {
1✔
157
      // A Scheduling Service Event Listener
158
      await this.jobEvents.on(eventName, async (msg: any, ctx: any,
2✔
159
        config: any, eventName: string): Promise<any> => {
160
        let job = msg;
6✔
161
        // Match Job Type to Queue Name, else use Default Queue
162
        let queue = _.find(this.queuesList, { name: job.type });
6✔
163
        let defaultQueue = _.find(this.queuesList, { name: this.defaultQueueName });
6✔
164
        if (_.isEmpty(queue)) {
6!
165
          queue = defaultQueue;
×
166
        }
167

168
        if (eventName === JOB_FAILED_EVENT) {
6!
169
          logger.error(`job@${job.type}#${job.id} failed with error #${job.error}`,
×
170
            _filterQueuedJob<JobFailed>(job, this.logger));
171
        } else if (eventName === JOB_DONE_EVENT) {
6!
172
          logger.verbose(`job#${job.id} done`, _filterQueuedJob<JobDone>(job, this.logger));
6✔
173
        }
174

175
        logger.info('Received Job event', { event: eventName });
6✔
176
        logger.info('Job details', job);
6✔
177
        const jobData: any = await queue.getJob(job.id).catch(error => {
6✔
178
          that.logger.error('Error retrieving job ${job.id} from queue', error);
×
179
        });
180

181
        if (job?.delete_scheduled) {
6✔
182
          await queue.removeRepeatable(jobData.name, jobData.opts.repeat);
1✔
183
        }
184
      });
185
    }
186

187
    // Initialize Event Listeners for each Queue
188
    for (let queue of this.queuesList) {
1✔
189
      queue.on('error', (error) => {
3✔
190
        logger.error('queue error', error);
×
191
      });
192
      queue.on('waiting', (job) => {
3✔
193
        logger.verbose(`job#${job.id} scheduled`, job);
9✔
194
      });
195
      queue.on('removed', (job) => {
3✔
196
        logger.verbose(`job#${job.id} removed`, job);
13✔
197
      });
198
      queue.on('progress', (job) => {
3✔
199
        logger.verbose(`job#${job.id} progress`, job);
×
200
      });
201
    }
202

203
    // If the scheduling service goes down and if there were
204
    // recurring jobs which have missed schedules then
205
    // we will need to reschedule it for those missing intervals.
206
    await this._rescheduleMissedJobs();
1✔
207
  }
208

209
  /**
210
   * To reschedule the missed recurring jobs upon service restart
211
   */
212
  async _rescheduleMissedJobs(): Promise<void> {
213
    // for jobs created via Kafka currently there are no acs checks
214
    this.disableAC();
1✔
215
    const createDispatch = [];
1✔
216
    let result: Job[] = [];
1✔
217
    let thiz = this;
1✔
218

219
    // Get the jobs
220
    for (let queueCfg of this.queuesConfigList) {
1✔
221
      // If enabled in the config, or the config is missing,b
222
      // Reschedule the missed jobs, else skip.
223
      let queue = _.find(this.queuesList, { name: queueCfg.name });
3✔
224
      let runMissedScheduled = queueCfg.runMissedScheduled;
3✔
225
      if (_.isNil(runMissedScheduled) ||
3!
226
        (!_.isNil(runMissedScheduled) && runMissedScheduled == true)) {
227
        await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']).then(jobs => {
3✔
228
          result = result.concat(jobs);
3✔
229
        }).catch(error => {
230
          thiz.logger.error('Error reading jobs to reschedule the missed recurring jobs', error);
×
231
        });
232
      }
233
    }
234
    let lastRunTime;
235
    for (let job of result) {
1✔
236
      // get the last run time for the job, we store the last run time only
237
      // for recurring jobs
238
      if (job?.name) {
×
239
        try {
×
240
          lastRunTime = await this.redisClient.get(job.name);
×
241
        } catch (err) {
242
          this.logger.error('Error reading the last run time for job type:', { name: job.name, code: err.code, message: err.message, stack: err.stack });
×
243
        }
244
      }
245
      // we store lastRunTime only for recurring jobs and if it exists check
246
      // cron interval and schedule immediate jobs for missed intervals
247
      this.logger.info(`Last run time of ${job.name} Job was:`, lastRunTime);
×
248
      if (lastRunTime) {
×
249
        // convert redis string value to object and get actual time value
250
        try {
×
251
          lastRunTime = JSON.parse(lastRunTime);
×
252
        } catch (error) {
253
          this.logger.error('Error parsing lastRunTime', {
×
254
            code: error.code,
255
            message: error.message, stack: error.stack
256
          });
257
        }
258

259
        if ((job?.opts?.repeat as any)?.pattern && lastRunTime?.time) {
×
260
          let options = {
×
261
            currentDate: new Date(lastRunTime.time),
262
            endDate: new Date(),
263
            iterator: true
264
          };
265
          let intervalTime;
266
          try {
×
267
            intervalTime =
×
268
              parseExpression((job.opts.repeat as any).pattern, options);
269
          } catch (error) {
270
            this.logger.error('Error parsing cron expression running missed schedules', { code: error.code, message: error.message, stack: error.stack });
×
271
          }
272
          while (intervalTime?.hasNext()) {
×
273
            let nextInterval: any = intervalTime.next();
×
274
            const nextIntervalTime = nextInterval.value.toString();
×
275
            // schedule it as one time job for now or immediately
276
            const data = {
×
277
              payload: marshallProtobufAny({
278
                value: { time: nextIntervalTime }
279
              })
280
            };
281
            const currentTime = new Date();
×
282
            const when = new Date(currentTime.setSeconds(currentTime.getSeconds() + 2)).toISOString();
×
283
            const immediateJob: any = {
×
284
              type: job.name,
285
              data,
286
              // give a delay of 2 sec between each job
287
              // to avoid time out of queued jobs
288
              when,
289
              options: {}
290
            };
291
            createDispatch.push(thiz.create({
×
292
              items: [immediateJob],
293
              total_count: 0,
294
            }, {}));
295
          }
296
        }
297
      }
298
    }
299
    this.restoreAC();
1✔
300
    await createDispatch;
1✔
301
  }
302

303
  /**
304
   * Disabling of CRUD events.
305
   */
306
  disableEvents(): void {
307
    this.resourceEventsEnabled = false;
×
308
  }
309

310
  /**
311
   * Enabling of CRUD events.
312
   */
313
  enableEvents(): any {
314
    this.resourceEventsEnabled = true;
×
315
  }
316

317
  _validateJob(job: NewJob): NewJob {
318
    if (_.isNil(job.type)) {
11!
319
      throw new errors.InvalidArgument('Job type not specified.');
×
320
    }
321

322
    if (!job.options) {
11!
323
      job.options = {};
×
324
    }
325

326
    if (job.when) {
11✔
327
      // If the jobSchedule time has already lapsed then do not schedule the job
328
      const jobScheduleTime = new Date(job.when).getTime();
7✔
329
      const currentTime = new Date().getTime();
7✔
330
      if (jobScheduleTime < currentTime) {
7!
331
        throw new errors.InvalidArgument('Cannot schedule a job for an elapsed time');
×
332
      }
333

334
      job.options.delay = jobScheduleTime - currentTime;
7✔
335
    }
336

337
    if (job.options.backoff && typeof job.options.backoff !== 'number') {
11!
338
      if (typeof job.options.backoff.type === 'number') {
11!
339
        job.options.backoff.type = Object.keys(Backoff_Type)[job.options.backoff.type];
×
340
      }
341
      job.options.backoff.type = job.options.backoff.type.toLowerCase();
11✔
342
    }
343

344
    if (job.options.priority && typeof job.options.priority === 'string') {
11!
345
      job.options.priority = JobOptions_Priority[job.options.priority] as any;
11✔
346
    }
347

348
    if (_.isEmpty(job.data)) {
11!
349
      throw new errors.InvalidArgument('No job data specified.');
×
350
    }
351

352
    job.data = _filterJobData(job.data, false, this.logger);
11✔
353

354
    return job;
11✔
355
  }
356

357
  /**
358
   * get next job execution time in mili seconds
359
   * @param millis
360
   * @param opts
361
   */
362
  getNextMillis(millis, opts) {
363
    if (opts?.every) {
1!
364
      return Math.floor(millis / opts.every) * opts.every + opts.every;
×
365
    }
366

367
    const currentDate =
368
      opts?.startDate && new Date(opts.startDate) > new Date(millis)
1!
369
        ? new Date(opts.startDate)
370
        : new Date(millis);
371
    const interval = parseExpression(
1✔
372
      opts.cron,
373
      _.defaults(
374
        {
375
          currentDate
376
        },
377
        opts
378
      )
379
    );
380

381
    try {
1✔
382
      return interval.next().getTime();
1✔
383
    } catch (e) {
384
      this.logger.error('Error getting next job execution time');
×
385
    }
386
  }
387

388
  private md5(str) {
389
    return crypto
×
390
      .createHash('md5')
391
      .update(str)
392
      .digest('hex');
393
  }
394

395
  /**
396
   * store the mapping from repeateKey to external interface SCS job Id, and
397
   * also the mapping other way around i.e. from SCS job Id to repeatKey (needed for read operations)
398
   * @param name - job name
399
   * @param repeat - job repeate options
400
   * @param jobId - job id
401
   */
402
  async storeRepeatKey(repeatId, scsJobId, options) {
403
    try {
11✔
404
      if (repeatId && scsJobId) {
11!
405
        this.logger.info('Repeat key mapped to external SCS JobId', { repeatId, scsJobId });
11✔
406
        await this.repeatJobIdRedisClient.set(repeatId, scsJobId);
11✔
407
        const jobIdData = { repeatId, options };
11✔
408
        await this.repeatJobIdRedisClient.set(scsJobId, JSON.stringify(jobIdData));
11✔
409
      }
410
    } catch (error) {
411
      this.logger.error('Error storing repeatKey to redis', {
×
412
        code: error.code,
413
        message: error.message, stack: error.stack
414
      });
415
    }
416
  }
417

418
  private idGen(): string {
419
    return uuid.v4().replace(/-/g, '');
8✔
420
  }
421

422
  /**
423
   * Create and queue jobs.
424
   * @param {any} call RPC call argument
425
   * @param {any} ctx RPC context
426
   */
427
  async create(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
428
    let jobListResponse: JobListResponse = { items: [], operation_status: { code: 0, message: '' }, total_count: 0 };
9✔
429
    let subject = request.subject;
9✔
430
    if (_.isNil(request) || _.isNil(request.items)) {
9!
431
      return {
×
432
        items: [],
433
        total_count: 0,
434
        operation_status: {
435
          code: 400,
436
          message: 'Missing items in create request'
437
        }
438
      };
439
    }
440

441
    await this.createMetadata(request.items, AuthZAction.CREATE, subject);
9✔
442
    let acsResponse: DecisionResponse;
443
    try {
9✔
444
      if (!ctx) { ctx = {}; };
9!
445
      ctx.subject = subject;
9✔
446
      ctx.resources = request?.items;
9✔
447
      acsResponse = await checkAccessRequest(ctx, [{
9✔
448
        resource: 'job',
449
        id: request.items.map(item => item.id)
12✔
450
      }], AuthZAction.CREATE, Operation.isAllowed);
451
    } catch (err) {
452
      this.logger.error('Error requesting access-control-srv for create meta data', { code: err.code, message: err.message, stack: err.stack });
×
453
      return {
×
454
        items: [],
455
        total_count: 0,
456
        operation_status: {
457
          code: err.code,
458
          message: err.message
459
        }
460
      };
461
    }
462
    if (acsResponse.decision != Response_Decision.PERMIT) {
9✔
463
      return { items: [], total_count: 0, operation_status: acsResponse.operation_status };
1✔
464
    }
465

466
    let jobs: NewJob[] = [];
8✔
467
    for (let job of request?.items || []) {
8!
468
      try {
11✔
469
        jobs.push(this._validateJob(job as any));
11✔
470
      } catch (err) {
471
        this.logger.error('Error validating job', job);
×
472
        jobListResponse.items.push({
×
473
          status: {
474
            id: job.id,
475
            code: 400,
476
            message: err.message
477
          }
478
        });
479
      }
480
    }
481

482
    let result: Job[] = [];
8✔
483
    // Scheduling jobs
484
    for (let i = 0; i < jobs.length; i += 1) {
8✔
485
      let job = jobs[i];
11✔
486
      // if not jobID is specified generate a UUID
487
      if (!job.id) {
11✔
488
        job.id = this.idGen();
8✔
489
      }
490

491
      // map the id to jobId as needed in JobOpts for bull
492
      if (job?.id) {
11!
493
        // check if jobID already exists then map it as already exists error
494
        const existingJobId = await this.getRedisValue(job.id);
11✔
495
        if (existingJobId) {
11!
496
          // read job to check if data exists
497
          const jobData = await this.read(JobReadRequest.fromPartial({
×
498
            filter: {
499
              job_ids: [job.id]
500
            },
501
            subject
502
          }), ctx);
503
          if ((jobData?.items as any)[0]?.payload) {
×
504
            jobListResponse.items.push({
×
505
              status: {
506
                id: job.id,
507
                code: 403,
508
                message: `Job with ID ${job.id} already exists`
509
              }
510
            });
511
            continue;
×
512
          }
513
        }
514
        if (!job?.options) {
11!
515
          job.options = { jobId: job.id };
×
516
        } else {
517
          job.options.jobId = job.id;
11✔
518
        }
519
        if (job?.options?.repeat) {
11✔
520
          (job as any).options.repeat.jobId = job.id;
2✔
521
        }
522
      }
523

524
      if (!job.data.meta) {
11!
525
        const now = new Date();
×
526
        const metaObj = {
×
527
          created: now,
528
          modified: now,
529
          modified_by: '',
530
          owners: []
531
        };
532
        Object.assign(job.data, { meta: metaObj });
×
533
      }
534
      // if only owners are specified in meta
535
      if (job.data.meta && (!job.data.meta.created || !job.data.meta.modified)) {
11✔
536
        job.data.meta.created = new Date();
9✔
537
        job.data.meta.modified = new Date();
9✔
538
      }
539

540
      if (job?.data?.payload?.value) {
11!
541
        job.data.payload.value = job.data.payload.value.toString() as any;
11✔
542
      }
543

544
      // convert enum priority back to number as it's expected by bull
545
      if (job?.options?.priority) {
11!
546
        job.options.priority = typeof job.options.priority === 'number' ? job.options.priority : Priority[job.options.priority] as unknown as number;
11!
547
      }
548

549
      // if its a repeat job and tz is empty delete the key (else cron parser throws an error)
550
      if (job?.options?.repeat?.tz === '') {
11!
551
        delete job.options.repeat.tz;
×
552
      }
553

554
      const bullOptions = {
11✔
555
        ...job.options
556
      };
557

558
      if ((bullOptions as any).timeout === 1) {
11✔
559
        delete bullOptions['timeout'];
2✔
560
      }
561

562
      // Match the Job Type with the Queue Name and add the Job to this Queue.
563
      // If there is no match, add the Job to the Default Queue
564
      let queue = _.find(this.queuesList, { name: job?.queue_name });
11✔
565
      if (_.isEmpty(queue)) {
11!
566
        queue = _.find(this.queuesList, { name: this.defaultQueueName });
×
567
      }
568
      const submittedJob = await queue.add(job.type, job.data, bullOptions);
11✔
569
      if (submittedJob?.id?.startsWith('repeat:')) {
11✔
570
        const repeatJobId = submittedJob?.id?.split(':')[1];
2✔
571
        await this.storeRepeatKey(repeatJobId, job.id, job.options);
2✔
572
      } else if (submittedJob?.id) {
9!
573
        // future job with when
574
        await this.storeRepeatKey(submittedJob.id, job.id, job.options);
9✔
575
      }
576
      this.logger.verbose(`job@${job.type} created`, job);
11✔
577
      result.push(submittedJob);
11✔
578
    }
579

580
    for (let job of result) {
8✔
581
      let jobId = job.id as string;
11✔
582
      if (jobId.startsWith('repeat:')) {
11✔
583
        const repeatKey = jobId.split(':')[1];
2✔
584
        job.id = await this.getRedisValue(repeatKey);
2✔
585
      }
586
    }
587

588
    for (let job of result) {
8✔
589
      let when = job?.opts?.delay ? new Date(job?.opts?.delay).toString() : '';
11✔
590
      jobListResponse.items.push({
11✔
591
        payload: {
592
          id: job.id as string,
593
          type: job.name,
594
          queue_name: job?.queueName,
595
          data: _filterJobData(job.data, true, this.logger),
596
          options: _filterJobOptions(job.opts) as any,
597
          when
598
        },
599
        status: {
600
          id: (job?.id)?.toString(),
601
          code: 200,
602
          message: 'success'
603
        }
604
      });
605
    }
606
    const jobList = {
8✔
607
      items: result.map(job => ({
11✔
608
        id: job.id,
609
        type: job.name,
610
        data: _filterJobData(job.data, true, this.logger),
611
        options: _filterJobOptions(job.opts)
612
      })),
613
      total_count: result.length
614
    };
615

616
    if (this.resourceEventsEnabled) {
8!
617
      await this.jobEvents.emit('jobsCreated', jobList);
8✔
618
    }
619

620
    jobListResponse.operation_status = { code: 200, message: 'success' };
8✔
621
    return jobListResponse;
8✔
622
  }
623

624
  private filterByOwnerShip(customArgsObj, result) {
625
    // applying filter based on custom arguments (filterByOwnerShip)
626
    let customArgs = (customArgsObj)?.custom_arguments;
24✔
627
    if (customArgs?.value) {
24!
628
      let customArgsFilter;
629
      try {
24✔
630
        customArgsFilter = JSON.parse(customArgs.value.toString());
24✔
631
      } catch (error) {
632
        this.logger.error('Error parsing custom query arguments', {
×
633
          code: error.code,
634
          message: error.message, stack: error.stack
635
        });
636
      }
637
      if (!customArgsFilter) {
24!
638
        return [];
×
639
      }
640
      const ownerIndicatorEntity = customArgsFilter.entity;
24✔
641
      const ownerValues = customArgsFilter.instance;
24✔
642
      const ownerIndictaorEntURN = this.cfg.get('authorization:urns:ownerIndicatoryEntity');
24✔
643
      const ownerInstanceURN = this.cfg.get('authorization:urns:ownerInstance');
24✔
644
      result = result.filter(job => {
24✔
645
        if (job?.data?.meta?.owners?.length > 0) {
36!
646
          for (let owner of job.data.meta.owners) {
36✔
647
            if (owner?.id === ownerIndictaorEntURN && owner?.value === ownerIndicatorEntity && owner?.attributes?.length > 0) {
36!
648
              for (let ownerInstObj of owner.attributes) {
36✔
649
                if (ownerInstObj?.id === ownerInstanceURN && ownerInstObj?.value && ownerValues.includes(ownerInstObj.value)) {
36!
650
                  return job;
36✔
651
                }
652
              }
653
            }
654
          }
655
        }
656
      });
657
    }
658
    return result;
24✔
659
  }
660

661
  async deleteRedisKey(key: string): Promise<any> {
662
    try {
4✔
663
      await this.repeatJobIdRedisClient.del(key);
4✔
664
      this.logger.debug('Redis Key deleted successfully used for mapping repeatable jobID', { key });
4✔
665
    } catch (err) {
666
      this.logger.error('Error deleting redis key', { key, msg: err.message, stack: err.stack });
×
667
    }
668
  }
669

670
  async getRedisValue(key: string): Promise<any> {
671
    let redisValue;
672
    try {
46✔
673
      if (key) {
46!
674
        redisValue = await this.repeatJobIdRedisClient.get(key);
46✔
675
      }
676
      if (redisValue) {
46✔
677
        return JSON.parse(redisValue);
33✔
678
      } else {
679
        return;
13✔
680
      }
681
    } catch (err) {
682
      if (err?.message?.startsWith('Unexpected token') || err.message.startsWith('Unexpected number')) {
13✔
683
        return redisValue;
2✔
684
      } else {
685
        this.logger.error('Error reading redis key', { key, msg: err.message, stack: err.stack });
11✔
686
      }
687
    }
688
  }
689

690

691
  /**
692
   * Retrieve jobs.
693
   * @param {any} call RPC call argument
694
   * @param {any} ctx RPC context
695
   */
696
  async read(request: JobReadRequest, ctx: any): Promise<DeepPartial<JobListResponse>> {
697
    let jobListResponse: JobListResponse = { items: [], operation_status: { code: 0, message: '' }, total_count: 0 };
27✔
698
    let subject = request.subject;
27✔
699
    let acsResponse: PolicySetRQResponse;
700
    try {
27✔
701
      if (!ctx) { ctx = {}; };
27!
702
      ctx.subject = subject;
27✔
703
      ctx.resources = [];
27✔
704
      acsResponse = await checkAccessRequest(ctx, [{ resource: 'job' }], AuthZAction.READ,
27✔
705
        Operation.whatIsAllowed) as PolicySetRQResponse;
706
    } catch (err) {
707
      this.logger.error('Error requesting access-control-srv for read operation', { code: err.code, message: err.message, stack: err.stack });
×
708
      return {
×
709
        operation_status: {
710
          code: err.code,
711
          message: err.message
712
        }
713
      };
714
    }
715
    if (acsResponse.decision != Response_Decision.PERMIT) {
27✔
716
      return { operation_status: acsResponse.operation_status };
3✔
717
    }
718

719
    let result: Job[] = [];
24✔
720
    if (_.isEmpty(request) || _.isEmpty(request.filter)
24!
721
      && (!request.filter || !request.filter.job_ids
722
        || _.isEmpty(request.filter.job_ids))
723
      && (!request.filter || !request.filter.type ||
724
        _.isEmpty(request.filter.type))) {
725
      result = await this._getJobList();
12✔
726
      let custom_arguments;
727
      if (acsResponse?.custom_query_args?.length > 0) {
12!
728
        custom_arguments = acsResponse.custom_query_args[0].custom_arguments;
12✔
729
      }
730
      result = this.filterByOwnerShip({ custom_arguments }, result);
12✔
731
    } else {
732
      const that = this;
12✔
733
      let jobIDs = request.filter.job_ids || [];
12✔
734
      if (!_.isArray(jobIDs)) {
12!
735
        jobIDs = [jobIDs];
×
736
      }
737
      const typeFilterName = request.filter.type;
12✔
738

739
      // Search in all the queues and retrieve jobs after JobID
740
      // and add them to the jobIDsCopy list.
741
      // If the job is not found in any of the queues, continue looking
742
      // Finally compare the two lists and add an error to status for which
743
      // job could not be found in any of the queues.
744
      if (jobIDs.length > 0) {
12✔
745
        // jobIDsCopy should contain the jobIDs duplicate values
746
        // after the for loop ends
747
        let jobIDsCopy: string[] = [];
11✔
748
        for (let jobID of jobIDs) {
11✔
749
          const jobIdData = await this.getRedisValue(jobID as string);
11✔
750
          // future jobs scheduled with `when` will have same repeatId as external SCS jobID
751
          if (jobIdData?.repeatId && (jobIdData.repeatId != jobID)) {
11✔
752
            const repeatId = jobIdData.repeatId;
1✔
753
            if (jobIdData?.options?.repeat?.cron && jobIdData?.options?.repeat?.every) {
1!
754
              jobListResponse.items.push({
×
755
                status: {
756
                  id: jobID.toString(),
757
                  code: 400,
758
                  message: 'Both .cron and .every options are defined for this repeatable job'
759
                }
760
              });
761
              continue;
×
762
            }
763
            const nextMillis = this.getNextMillis(Date.now(), jobIdData.options.repeat);
1✔
764
            this.logger.debug('Repeatable job identifier', { id: jobID, repeatId: `repeat:${repeatId}:${nextMillis}` });
1✔
765
            // map the repeatKey with nextmilis for bull repeatable jobID
766
            jobID = `repeat:${repeatId}:${nextMillis}`;
1✔
767
          }
768
          for (let queue of this.queuesList) {
11✔
769
            await new Promise((resolve, reject) => {
33✔
770
              // getJob returns job or null
771
              queue.getJob(jobID).then(job => {
33✔
772
                if (job?.opts?.repeat?.pattern) {
33✔
773
                  (job.opts.repeat as any).cron = job.opts.repeat.pattern;
1✔
774
                  delete job.opts.repeat.pattern;
1✔
775
                }
776
                resolve(job);
33✔
777
                if (!_.isEmpty(job)) {
33✔
778
                  result.push(job);
10✔
779
                  if ((job as any)?.opts?.repeat?.jobId) {
10✔
780
                    jobIDsCopy.push((job as any).opts.repeat.jobId);
1✔
781
                  } else {
782
                    jobIDsCopy.push(jobID);
9✔
783
                  }
784
                }
785
              }).catch(err => {
786
                that.logger.error(`Error reading job ${jobID}`, err);
×
787
                if (err?.code && typeof err.code === 'string') {
×
788
                  err.code = 500;
×
789
                }
790
                jobListResponse.items.push({
×
791
                  status: {
792
                    id: jobID.toString(),
793
                    code: err.code,
794
                    message: err.message
795
                  }
796
                });
797
              });
798
            });
799
          }
800
        }
801
        if (!_.isEqual(jobIDs.sort(), jobIDsCopy.sort())) {
11✔
802
          const jobIDsDiff = _.difference(jobIDs, jobIDsCopy);
1✔
803
          for (let jobId of jobIDsDiff) {
1✔
804
            jobListResponse.items.push({
1✔
805
              status: {
806
                id: jobId.toString(),
807
                code: 404,
808
                message: `Job ID ${jobId} not found in any of the queues`
809
              }
810
            });
811
          }
812
        }
813
      } else {
814
        try {
1✔
815
          let jobsList: any[] = [];
1✔
816
          for (let queue of this.queuesList) {
1✔
817
            const getJobsResult = await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']);
3✔
818
            getJobsResult.forEach((job) => {
3✔
819
              if (job?.opts?.repeat?.pattern) {
5!
820
                (job.opts.repeat as any).cron = job.opts.repeat.pattern;
×
821
                delete job.opts.repeat.pattern;
×
822
              }
823
            });
824
            jobsList = jobsList.concat(getJobsResult);
3✔
825
          }
826
          result = jobsList;
1✔
827
        } catch (err) {
828
          that.logger.error('Error reading jobs', err);
×
829
          if (typeof err.code === 'string') {
×
830
            err.code = 500;
×
831
          }
832
          return {
×
833
            operation_status: {
834
              code: err.code,
835
              message: err.message
836
            }
837
          };
838
        }
839
      }
840

841
      if (typeFilterName) {
12✔
842
        result = result.filter(job => job.name === typeFilterName);
6✔
843
      }
844
      let custom_arguments;
845
      if (acsResponse?.custom_query_args?.length > 0) {
12!
846
        custom_arguments = acsResponse.custom_query_args[0].custom_arguments;
12✔
847
      }
848
      result = this.filterByOwnerShip({ custom_arguments }, result);
12✔
849
    }
850

851
    result = result.filter(valid => !!valid);
36✔
852

853
    if (!_.isEmpty(request) && !_.isEmpty(request.sort)
24✔
854
      && _.includes(['ASCENDING', 'DESCENDING'], request.sort)) {
855
      let sort;
856
      switch (request.sort) {
2!
857
        case JobReadRequest_SortOrder.DESCENDING:
858
          sort = 'desc';
1✔
859
          break;
1✔
860
        case JobReadRequest_SortOrder.ASCENDING:
861
          sort = 'asc';
1✔
862
          break;
1✔
863
        default:
864
          this.logger.error(`Unknown sort option ${sort}`);
×
865
      }
866
      result = _.orderBy(result, ['id'], [sort]);
2✔
867
    }
868

869
    for (let job of result) {
24✔
870
      let jobId = job.id as string;
36✔
871
      if (jobId.startsWith('repeat:')) {
36✔
872
        const repeatKey = jobId.split(':')[1];
11✔
873
        // it could be possible the redis repeat key is deleted on index 8 and old completed
874
        // jobs exist in data_store if delete on complete was not set to true for repeatable jobs
875
        const jobRedisId = await this.getRedisValue(repeatKey);
11✔
876
        if (jobRedisId) {
11✔
877
          job.id = jobRedisId;
1✔
878
        }
879
      }
880
    }
881

882
    for (let job of result) {
24✔
883
      let when = job?.opts?.delay ? new Date(job?.opts?.delay).toString() : '';
36!
884
      jobListResponse.items.push({
36✔
885
        payload: {
886
          id: job.id as string,
887
          type: job.name,
888
          queue_name: job.queueName,
889
          data: _filterJobData(job.data, true, this.logger),
890
          options: _filterJobOptions(job.opts) as any,
891
          when
892
        },
893
        status: {
894
          id: job.id.toString(),
895
          code: 200,
896
          message: 'success'
897
        }
898
      });
899
    }
900
    jobListResponse.total_count = jobListResponse?.items?.length;
24✔
901
    jobListResponse.operation_status = {
24✔
902
      code: 200,
903
      message: 'success'
904
    };
905
    return jobListResponse;
24✔
906
  }
907

908
  async _getJobList(): Promise<Job[]> {
909
    let jobsList: any[] = [];
14✔
910
    for (let queue of this.queuesList) {
14✔
911
      const getJobsResult = await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']);
42✔
912
      getJobsResult.forEach((job) => {
42✔
913
        if (job?.opts?.repeat?.pattern) {
26!
914
          (job.opts.repeat as any).cron = job.opts.repeat.pattern;
×
915
          delete job.opts.repeat.pattern;
×
916
        }
917
      });
918
      jobsList = jobsList.concat(getJobsResult);
42✔
919
    }
920
    return jobsList;
14✔
921
  }
922

923
  // delete a job by its job instance after processing 'jobDone' / 'jobFailed'
924
  async _deleteJobInstance(jobId: string, queue: Queue): Promise<void> {
925
    return this._removeBullJob(jobId, queue);
×
926
  }
927

928
  /**
929
   * Delete Job from queue.
930
   */
931
  async delete(request: DeleteRequest, ctx: any): Promise<DeepPartial<DeleteResponse>> {
932
    let deleteResponse: DeleteResponse = { status: [], operation_status: { code: 0, message: '' } };
6✔
933
    if (_.isEmpty(request)) {
6!
934
      return {
×
935
        operation_status: {
936
          code: 400,
937
          message: 'No arguments provided for delete operation'
938
        }
939
      };
940
    }
941
    const subject = request?.subject;
6✔
942
    const jobIDs = request?.ids;
6✔
943
    let resources = [];
6✔
944
    let action;
945
    if (jobIDs) {
6✔
946
      action = AuthZAction.DELETE;
3✔
947
      if (_.isArray(jobIDs)) {
3!
948
        for (let id of jobIDs) {
3✔
949
          resources.push({ id });
3✔
950
        }
951
      } else {
952
        resources = [{ id: jobIDs }];
×
953
      }
954
      await this.createMetadata(resources, action, subject);
3✔
955
    }
956
    if (request.collection) {
6✔
957
      action = AuthZAction.DROP;
3✔
958
      resources = [{ collection: request.collection }];
3✔
959
    }
960
    let acsResponse: DecisionResponse;
961
    try {
6✔
962
      if (!ctx) { ctx = {}; };
6!
963
      ctx.subject = subject;
6✔
964
      ctx.resources = resources;
6✔
965
      acsResponse = await checkAccessRequest(ctx, [{ resource: 'job', id: jobIDs as string[] }], action,
6✔
966
        Operation.isAllowed);
967
    } catch (err) {
968
      this.logger.error('Error requesting access-control-srv for delete operation', { code: err.code, message: err.message, stack: err.stack });
×
969
      return {
×
970
        operation_status: {
971
          code: err.code,
972
          message: err.message
973
        }
974
      };
975
    }
976
    if (acsResponse.decision != Response_Decision.PERMIT) {
6✔
977
      return { operation_status: acsResponse.operation_status };
1✔
978
    }
979
    const dispatch = [];
5✔
980
    this.logger.info('Received delete request');
5✔
981
    if ('collection' in request && request.collection) {
5✔
982
      this.logger.verbose('Deleting all jobs');
2✔
983

984
      await this._getJobList().then(async (jobs) => {
2✔
985
        for (let job of jobs) {
2✔
986
          await job.remove();
5✔
987
          if (this.resourceEventsEnabled) {
5!
988
            dispatch.push(this.jobEvents.emit('jobsDeleted', { id: job.id }));
5✔
989
          }
990
          deleteResponse.status.push({
5✔
991
            id: job.id.toString(),
992
            code: 200,
993
            message: 'success'
994
          });
995
        }
996
      });
997
      // FLUSH redis DB index 8 used for mapping of repeat jobIds (since req is for dropping job collection)
998
      const delResp = await this.repeatJobIdRedisClient.flushDb();
2✔
999
      if (delResp) {
2!
1000
        this.logger.debug('Mapped keys for repeatable jobs deleted successfully');
2✔
1001
      } else {
1002
        this.logger.debug('Could not delete repeatable job keys');
×
1003
      }
1004
    } else if ('ids' in request) {
3!
1005
      this.logger.verbose('Deleting jobs by their IDs', { id: request.ids });
3✔
1006

1007
      for (let queue of this.queuesList) {
3✔
1008
        for (let jobDataKey of request.ids) {
9✔
1009
          let callback: Promise<boolean>;
1010
          const jobIdData = await this.getRedisValue(jobDataKey as string);
9✔
1011
          // future jobs scheduled with `when` will have same repeatId as external SCS jobID
1012
          if (jobIdData && jobIdData.repeatId && (jobIdData.repeatId != jobDataKey)) {
9✔
1013
            const jobs = await queue.getRepeatableJobs();
2✔
1014
            for (let job of jobs) {
2✔
1015
              if (job.id === jobDataKey) {
1!
1016
                this.logger.debug('Removing Repeatable job by key for jobId', { id: job.id });
1✔
1017
                callback = queue.removeRepeatableByKey(job.key);
1✔
1018
                deleteResponse.status.push({
1✔
1019
                  id: job.id,
1020
                  code: 200,
1021
                  message: 'success'
1022
                });
1023
                await this.deleteRedisKey(jobDataKey as string);
1✔
1024
                await this.deleteRedisKey(jobIdData.repeatId);
1✔
1025
                break;
1✔
1026
              }
1027
            }
1028
          } else {
1029
            callback = queue.getJob(jobDataKey).then(async (jobData) => {
7✔
1030
              if (jobData) {
7✔
1031
                try {
2✔
1032
                  await this._removeBullJob(jobData.id, queue);
2✔
1033
                  await this.deleteRedisKey(jobData.id);
2✔
1034
                  deleteResponse.status.push({
2✔
1035
                    id: jobData.id.toString(),
1036
                    code: 200,
1037
                    message: 'success'
1038
                  });
1039
                } catch (err) {
1040
                  if (typeof err?.code === 'string') {
×
1041
                    err.code = 500;
×
1042
                  }
1043
                  deleteResponse.status.push({
×
1044
                    id: jobData.id.toString(),
1045
                    code: err.code,
1046
                    message: err.message
1047
                  });
1048
                  return false;
×
1049
                }
1050
                return true;
2✔
1051
              }
1052
              return false;
5✔
1053
            });
1054
          }
1055

1056
          // since no CB is returned for removeRepeatableByKey by bull
1057
          if (!callback) {
9✔
1058
            if (this.resourceEventsEnabled) {
1!
1059
              dispatch.push(this.jobEvents.emit(
1✔
1060
                'jobsDeleted', { id: jobDataKey })
1061
              );
1062
            }
1063
          } else {
1064
            callback.then(() => {
8✔
1065
              if (this.resourceEventsEnabled) {
8!
1066
                dispatch.push(this.jobEvents.emit(
8✔
1067
                  'jobsDeleted', { id: jobDataKey })
1068
                );
1069
              }
1070
            }).catch(err => {
1071
              this.logger.error('Error deleting job', { id: jobDataKey });
×
1072
              if (typeof err?.code === 'number') {
×
1073
                err.code = 500;
×
1074
              }
1075
              deleteResponse.status.push({
×
1076
                id: jobDataKey.toString(),
1077
                code: err.code,
1078
                message: err.message
1079
              });
1080
            });
1081
          }
1082
        }
1083
      }
1084
    }
1085

1086
    await Promise.all(dispatch);
5✔
1087
    deleteResponse.operation_status = { code: 200, message: 'success' };
5✔
1088
    return deleteResponse;
5✔
1089
  }
1090

1091
  /**
1092
   * Clean up queues - removes complted and failed jobs from queue
1093
   * @param {any} job clean up job
1094
   */
1095
  async cleanupJobs(ttlAfterFinished) {
1096
    for (let queue of this.queuesList) {
×
1097
      try {
×
1098
        await queue.clean(ttlAfterFinished, 0, COMPLETED_JOB_STATE);
×
1099
        await queue.clean(ttlAfterFinished, 0, FAILED_JOB_STATE);
×
1100
      } catch (err) {
1101
        this.logger.error('Error cleaning up jobs', err);
×
1102
      }
1103
    }
1104
    this.logger.info('Jobs cleaned up successfully');
×
1105
    let lastExecutedInterval = { lastExecutedInterval: (new Date()).toString() };
×
1106
    await this.repeatJobIdRedisClient.set(QUEUE_CLEANUP, JSON.stringify(lastExecutedInterval));
×
1107
  }
1108

1109
  async setupCleanInterval(cleanInterval: number, ttlAfterFinished: number) {
1110
    if (!ttlAfterFinished) {
×
1111
      ttlAfterFinished = DEFAULT_CLEANUP_COMPLETED_JOBS;
×
1112
    }
1113
    const intervalData = await this.getRedisValue(QUEUE_CLEANUP);
×
1114
    let timeInMs, delta;
1115
    const now = new Date().getTime();
×
1116
    if (intervalData?.lastExecutedInterval && typeof (intervalData.lastExecutedInterval) === 'string') {
×
1117
      timeInMs = new Date(intervalData.lastExecutedInterval).getTime();
×
1118
      this.logger.debug('Previous execution interval', intervalData);
×
1119
      delta = now - timeInMs;
×
1120
      this.logger.debug('Clean interval and previous difference', { cleanInterval, difference: delta });
×
1121
    }
1122

1123
    if (delta && (delta < cleanInterval)) {
×
1124
      // use setTimeout and then create interval on setTimeout
1125
      this.logger.info('Restoring previous execution interval with set timeout', { time: cleanInterval - delta });
×
1126
      setTimeout(async () => {
×
1127
        await this.cleanupJobs(ttlAfterFinished);
×
1128
        setInterval(this.cleanupJobs.bind(this), cleanInterval, ttlAfterFinished);
×
1129
      }, cleanInterval - delta);
1130
    } else {
1131
      setInterval(this.cleanupJobs.bind(this), cleanInterval, ttlAfterFinished);
×
1132
      this.logger.info('Clean up job interval set successfully');
×
1133
    }
1134
  }
1135

1136
  /**
1137
   * Reschedules a job - deletes it and recreates it with a new generated ID.
1138
   */
1139
  async update(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
1140
    let subject = request.subject;
3✔
1141
    // update meta data for owners information
1142
    await this.createMetadata(request.items, AuthZAction.MODIFY, subject);
3✔
1143
    let acsResponse: DecisionResponse;
1144
    try {
3✔
1145
      if (!ctx) { ctx = {}; };
3!
1146
      ctx.subject = subject;
3✔
1147
      ctx.resources = request?.items;
3✔
1148
      acsResponse = await checkAccessRequest(ctx,
3✔
1149
        [{ resource: 'job', id: request.items.map(item => item.id) }],
3✔
1150
        AuthZAction.MODIFY, Operation.isAllowed);
1151
    } catch (err) {
1152
      this.logger.error('Error requesting access-control-srv for update operation', { code: err.code, message: err.message, stack: err.stack });
×
1153
      return {
×
1154
        operation_status: {
1155
          code: err.code,
1156
          message: err.message
1157
        }
1158
      };
1159
    }
1160
    if (acsResponse.decision != Response_Decision.PERMIT) {
3✔
1161
      return { operation_status: acsResponse.operation_status };
1✔
1162
    }
1163
    if (_.isNil(request) || _.isNil(request.items)) {
2!
1164
      return {
×
1165
        operation_status: {
1166
          code: 400,
1167
          message: 'Missing items in update request'
1168
        }
1169
      };
1170
    }
1171

1172
    const mappedJobs = request?.items?.reduce((obj, job) => {
2✔
1173
      obj[job.id] = job;
2✔
1174
      return obj;
2✔
1175
    }, {});
1176

1177
    const jobData = await this.read(JobReadRequest.fromPartial(
2✔
1178
      {
1179
        filter: {
1180
          job_ids: Object.keys(mappedJobs)
1181
        },
1182
        subject
1183
      }
1184
    ), ctx);
1185

1186
    await this.delete(DeleteRequest.fromPartial({
2✔
1187
      ids: Object.keys(mappedJobs),
1188
      subject
1189
    }), {});
1190

1191
    const result = [];
2✔
1192

1193
    jobData?.items?.forEach(async (job) => {
2✔
1194
      const mappedJob = mappedJobs[job?.payload?.id];
2✔
1195
      let endJob = {
2✔
1196
        id: mappedJob.id,
1197
        type: mappedJob.type,
1198
        queue_name: job?.payload?.queue_name,
1199
        options: {
1200
          ...job.payload.options,
1201
          ...(mappedJob.options ? mappedJob.options : {})
2!
1202
        },
1203
        data: mappedJob.data || job.payload.data,
2!
1204
        when: mappedJob.when,
1205
      };
1206

1207
      if (endJob.when && endJob.options) {
2!
1208
        delete endJob.options['delay'];
2✔
1209
      }
1210

1211
      result.push(endJob);
2✔
1212
    });
1213

1214
    return this.create(JobList.fromPartial({
2✔
1215
      items: result,
1216
      subject
1217
    }), ctx);
1218
  }
1219

1220
  /**
1221
   * Upserts a job - creates a new job if it does not exist or update the
1222
   * existing one if it already exists.
1223
   */
1224
  async upsert(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
1225
    let subject = request.subject;
2✔
1226
    await this.createMetadata(request.items, AuthZAction.MODIFY, subject);
2✔
1227
    let acsResponse: DecisionResponse;
1228
    try {
2✔
1229
      if (!ctx) { ctx = {}; };
2!
1230
      ctx.subject = subject;
2✔
1231
      ctx.resources = request.items;
2✔
1232
      acsResponse = await checkAccessRequest(ctx,
2✔
1233
        [{ resource: 'job', id: request.items.map(item => item.id) }],
2✔
1234
        AuthZAction.MODIFY, Operation.isAllowed);
1235
    } catch (err) {
1236
      this.logger.error('Error requesting access-control-srv for upsert operation', { code: err.code, message: err.message, stack: err.stack });
×
1237
      return {
×
1238
        operation_status: {
1239
          code: err.code,
1240
          message: err.message
1241
        }
1242
      };
1243
    }
1244

1245
    if (acsResponse.decision != Response_Decision.PERMIT) {
2✔
1246
      return { operation_status: acsResponse.operation_status };
1✔
1247
    }
1248
    if (_.isNil(request) || _.isNil(request.items)) {
1!
1249
      return { operation_status: { code: 400, message: 'Missing items in upsert request' } };
×
1250
    }
1251

1252
    let result = [];
1✔
1253

1254
    for (let eachJob of request.items) {
1✔
1255
      let jobExists = false;
1✔
1256
      let origJobId = _.cloneDeep(eachJob.id);
1✔
1257
      for (let queue of this.queuesList) {
1✔
1258
        const jobIdData = await this.getRedisValue(eachJob.id as string);
2✔
1259
        // future jobs scheduled with `when` will have same repeatId as external SCS jobID
1260
        if (jobIdData?.repeatId && (jobIdData.repeatId != origJobId)) {
2!
1261
          const repeatId = jobIdData.repeatId;
×
1262
          if (jobIdData?.options?.repeat?.cron && jobIdData?.options?.repeat?.every) {
×
1263
            result.push({
×
1264
              status: {
1265
                id: origJobId,
1266
                code: 400,
1267
                message: 'Both .cron and .every options are defined for this repeatable job'
1268
              }
1269
            });
1270
            continue;
×
1271
          }
1272
          const nextMillis = this.getNextMillis(Date.now(), jobIdData.options.repeat);
×
1273
          this.logger.debug('Repeatable job identifier', { id: eachJob.id, repeatId: `repeat:${repeatId}:${nextMillis}` });
×
1274
          // map the repeatKey with nextmilis for bull repeatable jobID
1275
          eachJob.id = `repeat:${repeatId}:${nextMillis}`;
×
1276
        }
1277
        const jobInst = await queue.getJob(eachJob.id);
2✔
1278
        if (jobInst) {
2✔
1279
          // existing job update it with the given job identifier
1280
          if (eachJob.id.startsWith('repeat:')) {
1!
1281
            eachJob.id = origJobId;
×
1282
          }
1283
          result = [
1✔
1284
            ...result,
1285
            ...(await this.update(JobList.fromPartial({ items: [eachJob], subject }), ctx)).items
1286
          ];
1287
          jobExists = true;
1✔
1288
          break;
1✔
1289
        }
1290
      }
1291
      if (!jobExists) {
1!
1292
        // new job create it
1293
        result = [
×
1294
          ...result,
1295
          ...(await this.create(JobList.fromPartial({ items: [eachJob], subject }), ctx)).items
1296
        ];
1297
      }
1298
    }
1299

1300
    return {
1✔
1301
      items: result,
1302
      total_count: result.length,
1303
      operation_status: {
1304
        code: 200,
1305
        message: 'success'
1306
      }
1307
    };
1308
  }
1309

1310
  /**
1311
   * Clear all job data.
1312
   */
1313
  async clear(): Promise<any> {
1314
    let allJobs: any[] = [];
1✔
1315
    for (let queue of this.queuesList) {
1✔
1316
      allJobs = allJobs.concat(await queue.getJobs(this.bullOptions['allJobTypes']));
3✔
1317
    }
1318
    return Promise.all(allJobs.map(async (job) => job.remove())).catch(err => {
6✔
1319
      this.logger.error(`Error clearing jobs`, err);
×
1320
      throw err;
×
1321
    });
1322
  }
1323

1324
  async _removeBullJob(jobInstID: string, queue: Queue): Promise<void> {
1325
    return queue.getJob(jobInstID).then(job => {
2✔
1326
      if (job) {
2!
1327
        return job.remove();
2✔
1328
      }
1329
    }).then(() => {
1330
      this.logger.info(`Job#${jobInstID} removed`);
2✔
1331
    }).catch(err => {
1332
      this.logger.error(`Error removing job ${jobInstID}`, err);
×
1333
      throw err;
×
1334
    });
1335
  }
1336

1337
  /**
1338
   *  disable access control
1339
   */
1340
  disableAC() {
1341
    try {
1✔
1342
      this.cfg.set('authorization:enabled', false);
1✔
1343
      updateConfig(this.cfg);
1✔
1344
    } catch (err) {
1345
      this.logger.error('Error caught disabling authorization:', { err });
×
1346
      this.cfg.set('authorization:enabled', this.authZCheck);
×
1347
    }
1348
  }
1349

1350
  /**
1351
   *  enables access control
1352
   */
1353
  enableAC() {
1354
    try {
1✔
1355
      this.cfg.set('authorization:enabled', true);
1✔
1356
      updateConfig(this.cfg);
1✔
1357
    } catch (err) {
1358
      this.logger.error('Error caught enabling authorization:', { err });
×
1359
      this.cfg.set('authorization:enabled', this.authZCheck);
×
1360
    }
1361
  }
1362

1363
  /**
1364
   *  restore AC state to previous vale either before enabling or disabling AC
1365
   */
1366
  restoreAC() {
1367
    try {
1✔
1368
      this.cfg.set('authorization:enabled', this.authZCheck);
1✔
1369
      updateConfig(this.cfg);
1✔
1370
    } catch (err) {
1371
      this.logger.error('Error caught enabling authorization:', { err });
×
1372
      this.cfg.set('authorization:enabled', this.authZCheck);
×
1373
    }
1374
  }
1375

1376
  /**
1377
   * reads meta data from DB and updates owners information in resource if action is UPDATE / DELETE
1378
   * @param resources list of resources
1379
   * @param action resource action
1380
   * @param subject subject name
1381
   */
1382
  async createMetadata(resources: any, action: string, subject): Promise<any> {
1383
    let orgOwnerAttributes: Attribute[] = [];
17✔
1384
    if (resources && !_.isArray(resources)) {
17!
1385
      resources = [resources];
×
1386
    }
1387
    const urns = this.cfg.get('authorization:urns');
17✔
1388
    if (subject?.scope && (action === AuthZAction.CREATE || action === AuthZAction.MODIFY)) {
17✔
1389
      // add subject scope as default owners
1390
      orgOwnerAttributes.push(
14✔
1391
        {
1392
          id: urns?.ownerIndicatoryEntity,
1393
          value: urns?.organization,
1394
          attributes: [{
1395
            id: urns?.ownerInstance,
1396
            value: subject?.scope,
1397
            attributes: []
1398
          }]
1399
        });
1400
    }
1401

1402
    if (resources?.length > 0) {
17!
1403
      for (let resource of resources) {
17✔
1404
        if (!resource.data) {
20✔
1405
          resource.data = { meta: {} };
3✔
1406
        } else if (!resource.data.meta) {
17✔
1407
          resource.data.meta = {};
12✔
1408
        }
1409
        if (resource?.id && (action === AuthZAction.MODIFY || action === AuthZAction.DELETE)) {
20✔
1410
          let result;
1411
          try {
8✔
1412
            result = await this.read(JobReadRequest.fromPartial({
8✔
1413
              filter: {
1414
                job_ids: [resource.id]
1415
              },
1416
              subject
1417
            }), {});
1418
          } catch (err) {
1419
            if (err.message.startsWith('Error! Jobs not found in any of the queues') && action != AuthZAction.DELETE) {
×
1420
              this.logger.debug('New job should be created', { jobId: resource.id });
×
1421
              result = { items: [] };
×
1422
            } else {
1423
              this.logger.error(`Error reading job with resource ID ${resource.id}`, { code: err.code, message: err.message, stack: err.stack });
×
1424
            }
1425
          }
1426
          // update owners info
1427
          if (result?.items?.length === 1 && result?.items[0]?.payload) {
8✔
1428
            let item = result.items[0].payload;
6✔
1429
            resource.data.meta.owners = item.data.meta.owners;
6✔
1430
            // adding meta to resource root (needed by access-contorl-srv for owners information check)
1431
            // meta is inside data of resource since the data is persisted in redis using bull
1432
            resource.meta = { owners: item.data.meta.owners };
6✔
1433
          } else if ((!result || !result.items || !result.items[0] || !result.items[0].payload) && action === AuthZAction.MODIFY) {
2!
1434
            // job does not exist - create new job (ex: Upsert with action modify)
1435
            let ownerAttributes = _.cloneDeep(orgOwnerAttributes);
2✔
1436
            // add user as default owners
1437
            ownerAttributes.push(
2✔
1438
              {
1439
                id: urns?.ownerIndicatoryEntity,
1440
                value: urns?.user,
1441
                attributes: [{
1442
                  id: urns?.ownerInstance,
1443
                  value: subject?.id,
1444
                  attributes: []
1445
                }]
1446
              });
1447
            resource.data.meta.owners = ownerAttributes;
2✔
1448
            resource.meta = { owners: ownerAttributes };
2✔
1449
          }
1450
        } else if ((action === AuthZAction.CREATE || !resource.id) && !resource.data.meta.owners) {
12!
1451
          let ownerAttributes = _.cloneDeep(orgOwnerAttributes);
10✔
1452
          // add user as default owners
1453
          if (resource.id) {
10✔
1454
            ownerAttributes.push(
2✔
1455
              {
1456
                id: urns.ownerIndicatoryEntity,
1457
                value: urns.user,
1458
                attributes: [{
1459
                  id: urns.ownerInstance,
1460
                  value: subject.id,
1461
                  attributes: []
1462
                }]
1463
              });
1464
          }
1465
          resource.data.meta.owners = ownerAttributes;
10✔
1466
          resource.meta = { owners: ownerAttributes };
10✔
1467
        } else if (action === AuthZAction.CREATE && resource?.data?.meta?.owners) {
2!
1468
          resource.meta = { owners: resource.data.meta.owners };
2✔
1469
        }
1470
      }
1471
    }
1472
    return resources;
17✔
1473
  }
1474
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc