• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

restorecommerce / scheduling-srv / 6261363370

21 Sep 2023 12:04PM UTC coverage: 68.577% (-0.09%) from 68.663%
6261363370

push

github

Arun-KumarH
chore: Release v1.1.0 - See CHANGELOG

271 of 451 branches covered (0.0%)

Branch coverage included in aggregate %.

30 of 30 new or added lines in 3 files covered. (100.0%)

698 of 962 relevant lines covered (72.56%)

6.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.67
/src/schedulingService.ts
1
import * as _ from 'lodash';
1✔
2
import { errors } from '@restorecommerce/chassis-srv';
1✔
3
import * as kafkaClient from '@restorecommerce/kafka-client';
4
import { AuthZAction, ACSAuthZ, updateConfig, DecisionResponse, Operation, PolicySetRQResponse } from '@restorecommerce/acs-client';
1✔
5
import {
1✔
6
  JobServiceImplementation as SchedulingServiceServiceImplementation,
7
  JobFailed, JobDone, DeepPartial, JobList, JobListResponse, Data,
8
  Backoff_Type, JobOptions_Priority, JobReadRequest, JobReadRequest_SortOrder
9
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/job';
10
import { createClient, RedisClientType } from 'redis';
1✔
11
import { NewJob, Priority } from './types';
1✔
12
import { parseExpression } from 'cron-parser';
1✔
13
import * as crypto from 'crypto';
1✔
14
import { _filterJobData, _filterJobOptions, _filterQueuedJob, checkAccessRequest, marshallProtobufAny } from './utilts';
1✔
15
import * as uuid from 'uuid';
1✔
16
import { Logger } from 'winston';
17
import { Response_Decision } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/access_control';
1✔
18
import { Attribute } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/attribute';
19
import { DeleteRequest, DeleteResponse } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/resource_base';
1✔
20
import { Queue, QueueOptions, Job } from 'bullmq';
1✔
21
import { parseInt } from 'lodash';
1✔
22

23
const JOB_DONE_EVENT = 'jobDone';
1✔
24
const JOB_FAILED_EVENT = 'jobFailed';
1✔
25
const DEFAULT_CLEANUP_COMPLETED_JOBS = 604800000; // 7 days in miliseconds
1✔
26
const COMPLETED_JOB_STATE = 'completed';
1✔
27
const FAILED_JOB_STATE = 'failed';
1✔
28
const QUEUE_CLEANUP = 'queueCleanup';
1✔
29

30
/**
31
 * A job scheduling service.
32
 */
33
export class SchedulingService implements SchedulingServiceServiceImplementation {
1✔
34

35
  jobEvents: kafkaClient.Topic;
36
  logger: Logger;
37

38
  queuesConfigList: any;
39
  queuesList: Queue[];
40
  defaultQueueName: string;
41

42
  redisClient: RedisClientType<any, any>;
43
  resourceEventsEnabled: boolean;
44
  canceledJobs: Set<string>;
45
  bullOptions: any;
46
  cfg: any;
47
  authZ: ACSAuthZ;
48
  authZCheck: boolean;
49
  repeatJobIdRedisClient: RedisClientType<any, any>;
50

51

52
  constructor(jobEvents: kafkaClient.Topic,
53
    private redisConfig: any, logger: any, redisClient: RedisClientType<any, any>,
1✔
54
    bullOptions: any, cfg: any, authZ: ACSAuthZ) {
55
    this.jobEvents = jobEvents;
1✔
56
    this.resourceEventsEnabled = true;
1✔
57
    this.bullOptions = bullOptions;
1✔
58
    this.logger = logger;
1✔
59
    this.queuesList = [];
1✔
60
    this.queuesConfigList = [];
1✔
61
    this.redisClient = redisClient;
1✔
62

63
    const repeatJobIdCfg = cfg.get('redis');
1✔
64
    repeatJobIdCfg.database = cfg.get('redis:db-indexes:db-repeatJobId');
1✔
65
    this.repeatJobIdRedisClient = createClient(repeatJobIdCfg);
1✔
66
    this.repeatJobIdRedisClient.on('error', (err) => logger.error('Redis client error in repeatable job store', err));
1✔
67
    this.repeatJobIdRedisClient.connect().then((data) => {
1✔
68
      logger.info('Redis client connection for repeatable job store successful');
1✔
69
    }).catch(err => logger.error('Redis client error for repeatable job store', err));
×
70

71
    this.canceledJobs = new Set<string>();
1✔
72
    this.cfg = cfg;
1✔
73
    this.authZ = authZ;
1✔
74
    this.authZCheck = this.cfg.get('authorization:enabled');
1✔
75

76
    // Read Queue Configuration file and find first queue which has "default": true,
77
    // then save it to defaultQueueName
78
    const queuesCfg = this.cfg.get('queue');
1✔
79
    if (_.isEmpty(queuesCfg)) {
1!
80
      this.logger.error('Queue configuration not found!');
×
81
      throw new Error('Queue configuration not found!');
×
82
    }
83
    let defaultTrueExists = false;
1✔
84
    for (let queueCfg of queuesCfg) {
1✔
85
      // Find configuration which has default=true
86
      if (queueCfg.default == true) {
1!
87
        defaultTrueExists = true;
1✔
88
        this.defaultQueueName = queueCfg.name;
1✔
89
        break;
1✔
90
      }
91
    }
92
    if (!defaultTrueExists) {
1!
93
      this.logger.error('Queue default configuration not found!');
×
94
      throw new Error('Queue default configuration not found!');
×
95
    }
96

97
    // Create Queues
98
    for (let queueCfg of queuesCfg) {
1✔
99
      let queueOptions: QueueOptions;
100
      const prefix = queueCfg.name;
3✔
101
      const rateLimiting = queueCfg.rateLimiting;
3✔
102
      const advancedSettings = queueCfg.advancedSettings;
3✔
103

104
      queueOptions = {
3✔
105
        connection: {
106
          ...redisConfig,
107
        }
108
      };
109

110
      // Create Queue Configuration - Add Rate Limiting if enabled
111
      if (!_.isEmpty(rateLimiting) && rateLimiting.enabled == true) {
3!
112
        this.logger.info(`Queue: ${queueCfg.name} - Rate limiting is ENABLED.`);
×
113
      }
114

115
      if (!_.isEmpty(advancedSettings)) {
3!
116
        queueOptions.settings = {
3✔
117
          ...advancedSettings,
118
        };
119
      }
120

121
      const redisURL = new URL((queueOptions.connection as any).url);
3✔
122

123
      if ('keyPrefix' in queueOptions.connection) {
3!
124
        delete queueOptions.connection.keyPrefix;
×
125
      }
126

127
      let queue = new Queue(prefix, {
3✔
128
        ...queueOptions,
129
        connection: {
130
          ...queueOptions.connection as any,
131
          host: redisURL.hostname,
132
          port: parseInt(redisURL.port)
133
        }
134
      });
135
      this.queuesList.push(queue);
3✔
136

137
      // Add Queue Configurations
138
      let queueCfgObj = {
3✔
139
        name: queueCfg.name,
140
        concurrency: queueCfg.concurrency,
141
        default: queueCfg.default,
142
        runMissedScheduled: queueCfg.runMissedScheduled
143
      };
144
      this.queuesConfigList.push(queueCfgObj);
3✔
145
    }
146
  }
147

148
  /**
149
   * Start handling the job queue, job scheduling and
150
   * managing job events.
151
   */
152
  async start(): Promise<any> {
153
    const logger = this.logger;
1✔
154
    const that = this;
1✔
155
    const events = [JOB_DONE_EVENT, JOB_FAILED_EVENT];
1✔
156
    for (let eventName of events) {
1✔
157
      // A Scheduling Service Event Listener
158
      await this.jobEvents.on(eventName, async (msg: any, ctx: any,
2✔
159
        config: any, eventName: string): Promise<any> => {
160
        let job = msg;
6✔
161
        // Match Job Type to Queue Name, else use Default Queue
162
        let queue = _.find(this.queuesList, { name: job.type });
6✔
163
        let defaultQueue = _.find(this.queuesList, { name: this.defaultQueueName });
6✔
164
        if (_.isEmpty(queue)) {
6!
165
          queue = defaultQueue;
×
166
        }
167

168
        if (eventName === JOB_FAILED_EVENT) {
6!
169
          logger.error(`job@${job.type}#${job.id} failed with error #${job.error}`,
×
170
            _filterQueuedJob<JobFailed>(job, this.logger));
171
        } else if (eventName === JOB_DONE_EVENT) {
6!
172
          logger.verbose(`job#${job.id} done`, _filterQueuedJob<JobDone>(job, this.logger));
6✔
173
        }
174

175
        logger.info('Received Job event', { event: eventName });
6✔
176
        logger.info('Job details', job);
6✔
177
        const jobData: any = await queue.getJob(job.id).catch(error => {
6✔
178
          that.logger.error('Error retrieving job ${job.id} from queue', error);
×
179
        });
180

181
        if (job?.delete_scheduled) {
6✔
182
          await queue.removeRepeatable(jobData.name, jobData.opts.repeat);
1✔
183
        }
184
      });
185
    }
186

187
    // Initialize Event Listeners for each Queue
188
    for (let queue of this.queuesList) {
1✔
189
      queue.on('error', (error) => {
3✔
190
        logger.error('queue error', error);
×
191
      });
192
      queue.on('waiting', (job) => {
3✔
193
        logger.verbose(`job#${job.id} scheduled`, job);
9✔
194
      });
195
      queue.on('removed', (job) => {
3✔
196
        logger.verbose(`job#${job.id} removed`, job);
13✔
197
      });
198
      queue.on('progress', (job) => {
3✔
199
        logger.verbose(`job#${job.id} progress`, job);
×
200
      });
201
    }
202

203
    // If the scheduling service goes down and if there were
204
    // recurring jobs which have missed schedules then
205
    // we will need to reschedule it for those missing intervals.
206
    await this._rescheduleMissedJobs();
1✔
207
  }
208

209
  /**
210
   * To reschedule the missed recurring jobs upon service restart
211
   */
212
  async _rescheduleMissedJobs(): Promise<void> {
213
    // for jobs created via Kafka currently there are no acs checks
214
    this.disableAC();
1✔
215
    const createDispatch = [];
1✔
216
    let result: Job[] = [];
1✔
217
    let thiz = this;
1✔
218

219
    // Get the jobs
220
    for (let queueCfg of this.queuesConfigList) {
1✔
221
      // If enabled in the config, or the config is missing,b
222
      // Reschedule the missed jobs, else skip.
223
      let queue = _.find(this.queuesList, { name: queueCfg.name });
3✔
224
      let runMissedScheduled = queueCfg.runMissedScheduled;
3✔
225
      if (_.isNil(runMissedScheduled) ||
3!
226
        (!_.isNil(runMissedScheduled) && runMissedScheduled == true)) {
227
        await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']).then(jobs => {
3✔
228
          result = result.concat(jobs);
3✔
229
        }).catch(error => {
230
          thiz.logger.error('Error reading jobs to reschedule the missed recurring jobs', error);
×
231
        });
232
      }
233
    }
234
    let lastRunTime;
235
    for (let job of result) {
1✔
236
      // get the last run time for the job, we store the last run time only
237
      // for recurring jobs
238
      try {
×
239
        lastRunTime = await this.redisClient.get(job.name);
×
240
      } catch (err) {
241
        this.logger.error(
×
242
          'Error occurred reading the last run time for job type:',
243
          { name: job.name, err });
244
      }
245
      // we store lastRunTime only for recurring jobs and if it exists check
246
      // cron interval and schedule immediate jobs for missed intervals
247
      this.logger.info(`Last run time of ${job.name} Job was:`, lastRunTime);
×
248
      if (lastRunTime) {
×
249
        // convert redis string value to object and get actual time value
250
        try {
×
251
          lastRunTime = JSON.parse(lastRunTime);
×
252
        } catch (error) {
253
          this.logger.error('Error parsing lastRunTime', {
×
254
            code: error.code,
255
            message: error.message, stack: error.stack
256
          });
257
        }
258

259
        if ((job?.opts?.repeat as any)?.cron && lastRunTime) {
×
260
          let options = {
×
261
            currentDate: new Date(lastRunTime.time),
262
            endDate: new Date(),
263
            iterator: true
264
          };
265
          const intervalTime =
266
            parseExpression((job.opts.repeat as any).cron, options);
×
267
          while (intervalTime.hasNext()) {
×
268
            let nextInterval: any = intervalTime.next();
×
269
            const nextIntervalTime = nextInterval.value.toString();
×
270
            // schedule it as one time job for now or immediately
271
            const data = {
×
272
              payload: marshallProtobufAny({
273
                value: { time: nextIntervalTime }
274
              })
275
            };
276
            const currentTime = new Date();
×
277
            const immediateJob: any = {
×
278
              type: job.name,
279
              data,
280
              // give a delay of 2 sec between each job
281
              // to avoid time out of queued jobs
282
              when: currentTime.setSeconds(currentTime.getSeconds() + 2).toString(),
283
              options: {}
284
            };
285
            createDispatch.push(thiz.create({
×
286
              items: [immediateJob],
287
              total_count: 0,
288
            }, {}));
289
          }
290
        }
291
      }
292
    }
293
    this.restoreAC();
1✔
294
    await createDispatch;
1✔
295
  }
296

297
  /**
298
   * Disabling of CRUD events.
299
   */
300
  disableEvents(): void {
301
    this.resourceEventsEnabled = false;
×
302
  }
303

304
  /**
305
   * Enabling of CRUD events.
306
   */
307
  enableEvents(): any {
308
    this.resourceEventsEnabled = true;
×
309
  }
310

311
  _validateJob(job: NewJob): NewJob {
312
    if (_.isNil(job.type)) {
11!
313
      throw new errors.InvalidArgument('Job type not specified.');
×
314
    }
315

316
    if (!job.options) {
11!
317
      job.options = {};
×
318
    }
319

320
    if (job.when) {
11✔
321
      // If the jobSchedule time has already lapsed then do not schedule the job
322
      const jobScheduleTime = new Date(job.when).getTime();
7✔
323
      const currentTime = new Date().getTime();
7✔
324
      if (jobScheduleTime < currentTime) {
7!
325
        throw new errors.InvalidArgument('Cannot schedule a job for an elapsed time');
×
326
      }
327

328
      job.options.delay = jobScheduleTime - currentTime;
7✔
329
    }
330

331
    if (job.options.backoff && typeof job.options.backoff !== 'number') {
11!
332
      if (typeof job.options.backoff.type === 'number') {
11!
333
        job.options.backoff.type = Object.keys(Backoff_Type)[job.options.backoff.type];
×
334
      }
335
      job.options.backoff.type = job.options.backoff.type.toLowerCase();
11✔
336
    }
337

338
    if (job.options.priority && typeof job.options.priority === 'string') {
11!
339
      job.options.priority = JobOptions_Priority[job.options.priority] as any;
11✔
340
    }
341

342
    if (_.isEmpty(job.data)) {
11!
343
      throw new errors.InvalidArgument('No job data specified.');
×
344
    }
345

346
    job.data = _filterJobData(job.data, false, this.logger);
11✔
347

348
    return job;
11✔
349
  }
350

351
  /**
352
   * get next job execution time in mili seconds
353
   * @param millis
354
   * @param opts
355
   */
356
  getNextMillis(millis, opts) {
357
    if (opts.every) {
1!
358
      return Math.floor(millis / opts.every) * opts.every + opts.every;
×
359
    }
360

361
    const currentDate =
362
      opts.startDate && new Date(opts.startDate) > new Date(millis)
1!
363
        ? new Date(opts.startDate)
364
        : new Date(millis);
365
    const interval = parseExpression(
1✔
366
      opts.cron,
367
      _.defaults(
368
        {
369
          currentDate
370
        },
371
        opts
372
      )
373
    );
374

375
    try {
1✔
376
      return interval.next().getTime();
1✔
377
    } catch (e) {
378
      this.logger.error('Error getting next job execution time');
×
379
    }
380
  }
381

382
  private md5(str) {
383
    return crypto
4✔
384
      .createHash('md5')
385
      .update(str)
386
      .digest('hex');
387
  }
388

389
  /**
390
   * Bull generates the repeatKey for repeatable jobs based on jobID, name and
391
   * cron settings - so below api is to generate the same repeat key and store in redis
392
   * DB index and map it before making request to bull
393
   * @param name - job name
394
   * @param repeat - job repeate options
395
   * @param jobId - job id
396
   */
397
  async storeRepeatKey(name, options, jobId) {
398
    try {
2✔
399
      const repeat = options.repeat;
2✔
400
      const endDate = repeat.endDate
2!
401
        ? new Date(repeat.endDate).getTime() + ':'
402
        : ':';
403
      const tz = repeat.tz ? repeat.tz + ':' : ':';
2!
404
      const suffix = repeat.cron ? tz + repeat.cron : String(repeat.every);
2✔
405
      const overrRiddentJobId = repeat.jobId ? repeat.jobId + ':' : ':';
2!
406
      const md5Key = this.md5(name + ':' + overrRiddentJobId + endDate + suffix);
2✔
407
      const repeatKey = this.md5(name + jobId + ':' + md5Key);
2✔
408
      this.logger.info('Repeat key generated for JobId is', { repeatKey, jobId });
2✔
409
      const jobIdData = { repeatKey, options };
2✔
410
      // map jobID with jobIdData - containing repeatKey and options
411
      await this.repeatJobIdRedisClient.set(jobId, JSON.stringify(jobIdData));
2✔
412
      // to resolve the jobId based on repeatkey
413
      await this.repeatJobIdRedisClient.set(repeatKey, jobId);
2✔
414
      return repeatKey;
2✔
415
    } catch (error) {
416
      this.logger.error('Error storing repeatKey to redis', {
×
417
        code: error.code,
418
        message: error.message, stack: error.stack
419
      });
420
    }
421
  }
422

423
  private idGen(): string {
424
    return uuid.v4().replace(/-/g, '');
8✔
425
  }
426

427
  /**
428
   * Create and queue jobs.
429
   * @param {any} call RPC call argument
430
   * @param {any} ctx RPC context
431
   */
432
  async create(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
433
    let jobListResponse: JobListResponse = { items: [], operation_status: { code: 0, message: '' }, total_count: 0 };
9✔
434
    let subject = request.subject;
9✔
435
    if (_.isNil(request) || _.isNil(request.items)) {
9!
436
      return {
×
437
        items: [],
438
        total_count: 0,
439
        operation_status: {
440
          code: 400,
441
          message: 'Missing items in create request'
442
        }
443
      };
444
    }
445

446
    await this.createMetadata(request.items, AuthZAction.CREATE, subject);
9✔
447
    let acsResponse: DecisionResponse;
448
    try {
9✔
449
      if (!ctx) { ctx = {}; };
9!
450
      ctx.subject = subject;
9✔
451
      ctx.resources = request?.items;
9✔
452
      acsResponse = await checkAccessRequest(ctx, [{
9✔
453
        resource: 'job',
454
        id: request.items.map(item => item.id)
12✔
455
      }], AuthZAction.CREATE, Operation.isAllowed);
456
    } catch (err) {
457
      this.logger.error('Error occurred requesting access-control-srv:', err);
×
458
      return {
×
459
        items: [],
460
        total_count: 0,
461
        operation_status: {
462
          code: err.code,
463
          message: err.message
464
        }
465
      };
466
    }
467
    if (acsResponse.decision != Response_Decision.PERMIT) {
9✔
468
      return { items: [], total_count: 0, operation_status: acsResponse.operation_status };
1✔
469
    }
470

471
    let jobs: NewJob[] = [];
8✔
472
    for (let job of request?.items || []) {
8!
473
      try {
11✔
474
        jobs.push(this._validateJob(job as any));
11✔
475
      } catch (err) {
476
        this.logger.error('Error validating job', job);
×
477
        jobListResponse.items.push({
×
478
          status: {
479
            id: job.id,
480
            code: 400,
481
            message: err.message
482
          }
483
        });
484
      }
485
    }
486

487
    let result: Job[] = [];
8✔
488
    // Scheduling jobs
489
    for (let i = 0; i < jobs.length; i += 1) {
8✔
490
      let job = jobs[i];
11✔
491
      // if not jobID is specified generate a UUID
492
      if (!job.id) {
11✔
493
        job.id = this.idGen();
8✔
494
      }
495

496
      // map the id to jobId as needed in JobOpts for bull
497
      if (job?.id) {
11!
498
        // check if jobID already exists then map it as already exists error
499
        const existingJobId = await this.getRedisValue(job.id);
11✔
500
        if (existingJobId) {
11!
501
          // read job to check if data exists
502
          const jobData = await this.read(JobReadRequest.fromPartial({
×
503
            filter: {
504
              job_ids: [job.id]
505
            },
506
            subject
507
          }), ctx);
508
          if ((jobData?.items as any)[0]?.payload) {
×
509
            jobListResponse.items.push({
×
510
              status: {
511
                id: job.id,
512
                code: 403,
513
                message: `Job with ID ${job.id} already exists`
514
              }
515
            });
516
            continue;
×
517
          }
518
        }
519
        if (!job?.options) {
11!
520
          job.options = { jobId: job.id };
×
521
        } else {
522
          job.options.jobId = job.id;
11✔
523
        }
524
        if (job?.options?.repeat) {
11✔
525
          (job as any).options.repeat.jobId = job.id;
2✔
526
          await this.storeRepeatKey(job.type, job.options, job.id);
2✔
527
        }
528
      }
529

530
      if (!job.data.meta) {
11!
531
        const now = new Date();
×
532
        const metaObj = {
×
533
          created: now,
534
          modified: now,
535
          modified_by: '',
536
          owners: []
537
        };
538
        Object.assign(job.data, { meta: metaObj });
×
539
      }
540
      // if only owners are specified in meta
541
      if (job.data.meta && (!job.data.meta.created || !job.data.meta.modified)) {
11✔
542
        job.data.meta.created = new Date();
9✔
543
        job.data.meta.modified = new Date();
9✔
544
      }
545

546
      if (job?.data?.payload?.value) {
11!
547
        job.data.payload.value = job.data.payload.value.toString() as any;
11✔
548
      }
549

550
      // convert enum priority back to number as it's expected by bull
551
      if (job?.options?.priority) {
11!
552
        job.options.priority = typeof job.options.priority === 'number' ? job.options.priority : Priority[job.options.priority] as unknown as number;
11!
553
      }
554

555
      // if its a repeat job and tz is empty delete the key (else cron parser throws an error)
556
      if (job?.options?.repeat?.tz === '') {
11!
557
        delete job.options.repeat.tz;
×
558
      }
559

560
      const bullOptions = {
11✔
561
        ...job.options
562
      };
563

564
      if ((bullOptions as any).timeout === 1) {
11✔
565
        delete bullOptions['timeout'];
2✔
566
      }
567

568
      // Match the Job Type with the Queue Name and add the Job to this Queue.
569
      // If there is no match, add the Job to the Default Queue
570
      let queue = _.find(this.queuesList, { name: job.type });
11✔
571
      if (_.isEmpty(queue)) {
11!
572
        queue = _.find(this.queuesList, { name: this.defaultQueueName });
×
573
      }
574
      const submittedJob = await queue.add(job.type, job.data, bullOptions);
11✔
575
      this.logger.verbose(`job@${job.type} created`, job);
11✔
576
      result.push(submittedJob);
11✔
577
    }
578

579
    for (let job of result) {
8✔
580
      let jobId = job.id as string;
11✔
581
      if (jobId.startsWith('repeat:')) {
11✔
582
        const repeatKey = jobId.split(':')[1];
2✔
583
        job.id = await this.getRedisValue(repeatKey);
2✔
584
      }
585
    }
586

587
    for (let job of result) {
8✔
588
      let when = job?.opts?.delay ? new Date(job?.opts?.delay).toString() : '';
11✔
589
      jobListResponse.items.push({
11✔
590
        payload: {
591
          id: job.id as string,
592
          type: job.name,
593
          data: _filterJobData(job.data, true, this.logger),
594
          options: _filterJobOptions(job.opts) as any,
595
          when
596
        },
597
        status: {
598
          id: (job?.id)?.toString(),
599
          code: 200,
600
          message: 'success'
601
        }
602
      });
603
    }
604
    const jobList = {
8✔
605
      items: result.map(job => ({
11✔
606
        id: job.id,
607
        type: job.name,
608
        data: _filterJobData(job.data, true, this.logger),
609
        options: _filterJobOptions(job.opts)
610
      })),
611
      total_count: result.length
612
    };
613

614
    if (this.resourceEventsEnabled) {
8!
615
      await this.jobEvents.emit('jobsCreated', jobList);
8✔
616
    }
617

618
    jobListResponse.operation_status = { code: 200, message: 'success' };
8✔
619
    return jobListResponse;
8✔
620
  }
621

622
  private filterByOwnerShip(customArgsObj, result) {
623
    // applying filter based on custom arguments (filterByOwnerShip)
624
    let customArgs = (customArgsObj)?.custom_arguments;
24✔
625
    if (customArgs?.value) {
24!
626
      let customArgsFilter;
627
      try {
24✔
628
        customArgsFilter = JSON.parse(customArgs.value.toString());
24✔
629
      } catch (error) {
630
        this.logger.error('Error parsing custom query arguments', {
×
631
          code: error.code,
632
          message: error.message, stack: error.stack
633
        });
634
      }
635
      if (!customArgsFilter) {
24!
636
        return [];
×
637
      }
638
      const ownerIndicatorEntity = customArgsFilter.entity;
24✔
639
      const ownerValues = customArgsFilter.instance;
24✔
640
      const ownerIndictaorEntURN = this.cfg.get('authorization:urns:ownerIndicatoryEntity');
24✔
641
      const ownerInstanceURN = this.cfg.get('authorization:urns:ownerInstance');
24✔
642
      result = result.filter(job => {
24✔
643
        if (job?.data?.meta?.owners?.length > 0) {
35!
644
          for (let owner of job.data.meta.owners) {
35✔
645
            if (owner?.id === ownerIndictaorEntURN && owner?.value === ownerIndicatorEntity && owner?.attributes?.length > 0) {
35!
646
              for (let ownerInstObj of owner.attributes) {
35✔
647
                if (ownerInstObj?.id === ownerInstanceURN && ownerInstObj?.value && ownerValues.includes(ownerInstObj.value)) {
35!
648
                  return job;
35✔
649
                }
650
              }
651
            }
652
          }
653
        }
654
      });
655
    }
656
    return result;
24✔
657
  }
658

659
  async deleteRedisKey(key: string): Promise<any> {
660
    try {
2✔
661
      await this.repeatJobIdRedisClient.del(key);
2✔
662
      this.logger.debug('Redis Key deleted successfully used for mapping repeatable jobID', { key });
2✔
663
    } catch (err) {
664
      this.logger.error(
×
665
        'Error occurred deleting redis key',
666
        { key, msg: err.message });
667
    }
668
  }
669

670
  async getRedisValue(key: string): Promise<any> {
671
    let redisValue;
672
    try {
45✔
673
      redisValue = await this.repeatJobIdRedisClient.get(key);
45✔
674
      if (redisValue) {
45✔
675
        return JSON.parse(redisValue);
3✔
676
      } else {
677
        return;
42✔
678
      }
679
    } catch (err) {
680
      if (err?.message?.startsWith('Unexpected token') || err.message.startsWith('Unexpected number')) {
×
681
        return redisValue;
×
682
      } else {
683
        this.logger.error(
×
684
          'Error occurred reading redis key',
685
          { key, msg: err.message });
686
      }
687
    }
688
  }
689

690

691
  /**
692
   * Retrieve jobs.
693
   * @param {any} call RPC call argument
694
   * @param {any} ctx RPC context
695
   */
696
  async read(request: JobReadRequest, ctx: any): Promise<DeepPartial<JobListResponse>> {
697
    let jobListResponse: JobListResponse = { items: [], operation_status: { code: 0, message: '' }, total_count: 0 };
27✔
698
    let subject = request.subject;
27✔
699
    let acsResponse: PolicySetRQResponse;
700
    try {
27✔
701
      if (!ctx) { ctx = {}; };
27!
702
      ctx.subject = subject;
27✔
703
      ctx.resources = [];
27✔
704
      acsResponse = await checkAccessRequest(ctx, [{ resource: 'job' }], AuthZAction.READ,
27✔
705
        Operation.whatIsAllowed) as PolicySetRQResponse;
706
    } catch (err) {
707
      this.logger.error('Error occurred requesting access-control-srv:', err);
×
708
      return {
×
709
        operation_status: {
710
          code: err.code,
711
          message: err.message
712
        }
713
      };
714
    }
715
    if (acsResponse.decision != Response_Decision.PERMIT) {
27✔
716
      return { operation_status: acsResponse.operation_status };
3✔
717
    }
718

719
    let result: Job[] = [];
24✔
720
    if (_.isEmpty(request) || _.isEmpty(request.filter)
24!
721
      && (!request.filter || !request.filter.job_ids
722
        || _.isEmpty(request.filter.job_ids))
723
      && (!request.filter || !request.filter.type ||
724
        _.isEmpty(request.filter.type))) {
725
      result = await this._getJobList();
12✔
726
      let custom_arguments;
727
      if (acsResponse?.custom_query_args?.length > 0) {
12!
728
        custom_arguments = acsResponse.custom_query_args[0].custom_arguments;
12✔
729
      }
730
      result = this.filterByOwnerShip({ custom_arguments }, result);
12✔
731
    } else {
732
      const that = this;
12✔
733
      let jobIDs = request.filter.job_ids || [];
12✔
734
      if (!_.isArray(jobIDs)) {
12!
735
        jobIDs = [jobIDs];
×
736
      }
737
      const typeFilterName = request.filter.type;
12✔
738

739
      // Search in all the queues and retrieve jobs after JobID
740
      // and add them to the jobIDsCopy list.
741
      // If the job is not found in any of the queues, continue looking
742
      // Finally compare the two lists and add an error to status for which
743
      // job could not be found in any of the queues.
744
      if (jobIDs.length > 0) {
12✔
745
        // jobIDsCopy should contain the jobIDs duplicate values
746
        // after the for loop ends
747
        let jobIDsCopy: string[] = [];
11✔
748
        for (let jobID of jobIDs) {
11✔
749
          const jobIdData = await this.getRedisValue(jobID as string);
11✔
750
          if (jobIdData?.repeatKey) {
11✔
751
            const repeatKey = jobIdData.repeatKey;
1✔
752
            if (jobIdData?.options?.repeat?.cron && jobIdData?.options?.repeat?.every) {
1!
753
              jobListResponse.items.push({
×
754
                status: {
755
                  id: jobID.toString(),
756
                  code: 400,
757
                  message: 'Both .cron and .every options are defined for this repeatable job'
758
                }
759
              });
760
              continue;
×
761
            }
762
            const nextMillis = this.getNextMillis(Date.now(), jobIdData.options.repeat);
1✔
763
            this.logger.debug('Repeatable job identifier', { id: jobID, repeatId: `repeat:${repeatKey}:${nextMillis}` });
1✔
764
            // map the repeatKey with nextmilis for bull repeatable jobID
765
            jobID = `repeat:${repeatKey}:${nextMillis}`;
1✔
766
          }
767
          for (let queue of this.queuesList) {
11✔
768
            await new Promise((resolve, reject) => {
33✔
769
              // getJob returns job or null
770
              queue.getJob(jobID).then(job => {
33✔
771
                resolve(job);
33✔
772
                if (!_.isEmpty(job)) {
33✔
773
                  result.push(job);
9✔
774
                  if ((job as any)?.opts?.repeat?.jobId) {
9!
775
                    jobIDsCopy.push((job as any).opts.repeat.jobId);
×
776
                  } else {
777
                    jobIDsCopy.push(jobID);
9✔
778
                  }
779
                }
780
              }).catch(err => {
781
                that.logger.error(`Error reading job ${jobID}`, err);
×
782
                if (err?.code && typeof err.code === 'string') {
×
783
                  err.code = 500;
×
784
                }
785
                jobListResponse.items.push({
×
786
                  status: {
787
                    id: jobID.toString(),
788
                    code: err.code,
789
                    message: err.message
790
                  }
791
                });
792
              });
793
            });
794
          }
795
        }
796
        if (!_.isEqual(jobIDs.sort(), jobIDsCopy.sort())) {
11✔
797
          const jobIDsDiff = _.difference(jobIDs, jobIDsCopy);
2✔
798
          for (let jobId of jobIDsDiff) {
2✔
799
            jobListResponse.items.push({
2✔
800
              status: {
801
                id: jobId.toString(),
802
                code: 404,
803
                message: `Job ID ${jobId} not found in any of the queues`
804
              }
805
            });
806
          }
807
        }
808
      } else {
809
        try {
1✔
810
          let jobsList: any[] = [];
1✔
811
          for (let queue of this.queuesList) {
1✔
812
            const getJobsResult = await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']);
3✔
813
            jobsList = jobsList.concat(getJobsResult);
3✔
814
          }
815
          result = jobsList;
1✔
816
        } catch (err) {
817
          that.logger.error('Error reading jobs', err);
×
818
          if (typeof err.code === 'string') {
×
819
            err.code = 500;
×
820
          }
821
          return {
×
822
            operation_status: {
823
              code: err.code,
824
              message: err.message
825
            }
826
          };
827
        }
828
      }
829

830
      if (typeFilterName) {
12✔
831
        result = result.filter(job => job.name === typeFilterName);
6✔
832
      }
833
      let custom_arguments;
834
      if (acsResponse?.custom_query_args?.length > 0) {
12!
835
        custom_arguments = acsResponse.custom_query_args[0].custom_arguments;
12✔
836
      }
837
      result = this.filterByOwnerShip({ custom_arguments }, result);
12✔
838
    }
839

840
    result = result.filter(valid => !!valid);
35✔
841

842
    if (!_.isEmpty(request) && !_.isEmpty(request.sort)
24✔
843
      && _.includes(['ASCENDING', 'DESCENDING'], request.sort)) {
844
      let sort;
845
      switch (request.sort) {
2!
846
        case JobReadRequest_SortOrder.DESCENDING:
847
          sort = 'desc';
1✔
848
          break;
1✔
849
        case JobReadRequest_SortOrder.ASCENDING:
850
          sort = 'asc';
1✔
851
          break;
1✔
852
        default:
853
          this.logger.error(`Unknown sort option ${sort}`);
×
854
      }
855
      result = _.orderBy(result, ['id'], [sort]);
2✔
856
    }
857

858
    for (let job of result) {
24✔
859
      let jobId = job.id as string;
35✔
860
      if (jobId.startsWith('repeat:')) {
35✔
861
        const repeatKey = jobId.split(':')[1];
10✔
862
        // it could be possible the redis repeat key is deleted on index 8 and old completed
863
        // jobs exist in data_store if delete on complete was not set to true for repeatable jobs
864
        const jobRedisId = await this.getRedisValue(repeatKey);
10✔
865
        if (jobRedisId) {
10!
866
          job.id = jobRedisId;
×
867
        }
868
      }
869
    }
870

871
    for (let job of result) {
24✔
872
      let when = job?.opts?.delay ? new Date(job?.opts?.delay).toString() : '';
35!
873
      jobListResponse.items.push({
35✔
874
        payload: {
875
          id: job.id as string,
876
          type: job.name,
877
          data: _filterJobData(job.data, true, this.logger),
878
          options: _filterJobOptions(job.opts) as any,
879
          when
880
        },
881
        status: {
882
          id: job.id.toString(),
883
          code: 200,
884
          message: 'success'
885
        }
886
      });
887
    }
888
    jobListResponse.total_count = jobListResponse?.items?.length;
24✔
889
    jobListResponse.operation_status = {
24✔
890
      code: 200,
891
      message: 'success'
892
    };
893
    return jobListResponse;
24✔
894
  }
895

896
  async _getJobList(): Promise<Job[]> {
897
    let jobsList: any[] = [];
14✔
898
    for (let queue of this.queuesList) {
14✔
899
      const getJobsResult = await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']);
42✔
900
      jobsList = jobsList.concat(getJobsResult);
42✔
901
    }
902
    return jobsList;
14✔
903
  }
904

905
  // delete a job by its job instance after processing 'jobDone' / 'jobFailed'
906
  async _deleteJobInstance(jobId: string, queue: Queue): Promise<void> {
907
    return this._removeBullJob(jobId, queue);
×
908
  }
909

910
  /**
911
   * Delete Job from queue.
912
   */
913
  async delete(request: DeleteRequest, ctx: any): Promise<DeepPartial<DeleteResponse>> {
914
    let deleteResponse: DeleteResponse = { status: [], operation_status: { code: 0, message: '' } };
6✔
915
    if (_.isEmpty(request)) {
6!
916
      return {
×
917
        operation_status: {
918
          code: 400,
919
          message: 'No arguments provided for delete operation'
920
        }
921
      };
922
    }
923
    const subject = request?.subject;
6✔
924
    const jobIDs = request?.ids;
6✔
925
    let resources = [];
6✔
926
    let action;
927
    if (jobIDs) {
6✔
928
      action = AuthZAction.DELETE;
3✔
929
      if (_.isArray(jobIDs)) {
3!
930
        for (let id of jobIDs) {
3✔
931
          resources.push({ id });
3✔
932
        }
933
      } else {
934
        resources = [{ id: jobIDs }];
×
935
      }
936
      await this.createMetadata(resources, action, subject);
3✔
937
    }
938
    if (request.collection) {
6✔
939
      action = AuthZAction.DROP;
3✔
940
      resources = [{ collection: request.collection }];
3✔
941
    }
942
    let acsResponse: DecisionResponse;
943
    try {
6✔
944
      if (!ctx) { ctx = {}; };
6!
945
      ctx.subject = subject;
6✔
946
      ctx.resources = resources;
6✔
947
      acsResponse = await checkAccessRequest(ctx, [{ resource: 'job', id: jobIDs as string[] }], action,
6✔
948
        Operation.isAllowed);
949
    } catch (err) {
950
      this.logger.error('Error occurred requesting access-control-srv:', err);
×
951
      return {
×
952
        operation_status: {
953
          code: err.code,
954
          message: err.message
955
        }
956
      };
957
    }
958
    if (acsResponse.decision != Response_Decision.PERMIT) {
6✔
959
      return { operation_status: acsResponse.operation_status };
1✔
960
    }
961
    const dispatch = [];
5✔
962
    this.logger.info('Received delete request');
5✔
963
    if ('collection' in request && request.collection) {
5✔
964
      this.logger.verbose('Deleting all jobs');
2✔
965

966
      await this._getJobList().then(async (jobs) => {
2✔
967
        for (let job of jobs) {
2✔
968
          await job.remove();
5✔
969
          if (this.resourceEventsEnabled) {
5!
970
            dispatch.push(this.jobEvents.emit('jobsDeleted', { id: job.id }));
5✔
971
          }
972
          deleteResponse.status.push({
5✔
973
            id: job.id.toString(),
974
            code: 200,
975
            message: 'success'
976
          });
977
        }
978
      });
979
      // FLUSH redis DB index 8 used for mapping of repeat jobIds (since req is for dropping job collection)
980
      const delResp = await this.repeatJobIdRedisClient.flushDb();
2✔
981
      if (delResp) {
2!
982
        this.logger.debug('Mapped keys for repeatable jobs deleted successfully');
2✔
983
      } else {
984
        this.logger.debug('Could not delete repeatable job keys');
×
985
      }
986
    } else if ('ids' in request) {
3!
987
      this.logger.verbose('Deleting jobs by their IDs', request.ids);
3✔
988

989
      for (let queue of this.queuesList) {
3✔
990
        for (let jobDataKey of request.ids) {
9✔
991
          let callback: Promise<boolean>;
992
          const jobIdData = await this.getRedisValue(jobDataKey as string);
9✔
993
          if (jobIdData && jobIdData.repeatKey) {
9✔
994
            const jobs = await queue.getRepeatableJobs();
2✔
995
            for (let job of jobs) {
2✔
996
              if (job.id === jobDataKey) {
1!
997
                this.logger.debug('Removing Repeatable job by key for jobId', { id: job.id });
1✔
998
                callback = queue.removeRepeatableByKey(job.key);
1✔
999
                deleteResponse.status.push({
1✔
1000
                  id: job.id,
1001
                  code: 200,
1002
                  message: 'success'
1003
                });
1004
                await this.deleteRedisKey(jobDataKey as string);
1✔
1005
                await this.deleteRedisKey(jobIdData.repeatKey);
1✔
1006
                break;
1✔
1007
              }
1008
            }
1009
          } else {
1010
            callback = queue.getJob(jobDataKey).then(async (jobData) => {
7✔
1011
              if (jobData) {
7✔
1012
                try {
2✔
1013
                  await this._removeBullJob(jobData.id, queue);
2✔
1014
                  deleteResponse.status.push({
2✔
1015
                    id: jobData.id.toString(),
1016
                    code: 200,
1017
                    message: 'success'
1018
                  });
1019
                } catch (err) {
1020
                  if (typeof err?.code === 'string') {
×
1021
                    err.code = 500;
×
1022
                  }
1023
                  deleteResponse.status.push({
×
1024
                    id: jobData.id.toString(),
1025
                    code: err.code,
1026
                    message: err.message
1027
                  });
1028
                  return false;
×
1029
                }
1030
                return true;
2✔
1031
              }
1032
              return false;
5✔
1033
            });
1034
          }
1035

1036
          // since no CB is returned for removeRepeatableByKey by bull
1037
          if (!callback) {
9✔
1038
            if (this.resourceEventsEnabled) {
1!
1039
              dispatch.push(this.jobEvents.emit(
1✔
1040
                'jobsDeleted', { id: jobDataKey })
1041
              );
1042
            }
1043
          } else {
1044
            callback.then(() => {
8✔
1045
              if (this.resourceEventsEnabled) {
8!
1046
                dispatch.push(this.jobEvents.emit(
8✔
1047
                  'jobsDeleted', { id: jobDataKey })
1048
                );
1049
              }
1050
            }).catch(err => {
1051
              this.logger.error('Error deleting job', { id: jobDataKey });
×
1052
              if (typeof err?.code === 'number') {
×
1053
                err.code = 500;
×
1054
              }
1055
              deleteResponse.status.push({
×
1056
                id: jobDataKey.toString(),
1057
                code: err.code,
1058
                message: err.message
1059
              });
1060
            });
1061
          }
1062
        }
1063
      }
1064
    }
1065

1066
    await Promise.all(dispatch);
5✔
1067
    deleteResponse.operation_status = { code: 200, message: 'success' };
5✔
1068
    return deleteResponse;
5✔
1069
  }
1070

1071
  /**
1072
   * Clean up queues - removes complted and failed jobs from queue
1073
   * @param {any} job clean up job
1074
   */
1075
  async cleanupJobs(ttlAfterFinished) {
1076
    for (let queue of this.queuesList) {
×
1077
      try {
×
1078
        await queue.clean(ttlAfterFinished, 0, COMPLETED_JOB_STATE);
×
1079
        await queue.clean(ttlAfterFinished, 0, FAILED_JOB_STATE);
×
1080
      } catch (err) {
1081
        this.logger.error('Error cleaning up jobs', err);
×
1082
      }
1083
    }
1084
    this.logger.info('Jobs cleaned up successfully');
×
1085
    let lastExecutedInterval = { lastExecutedInterval: (new Date()).toString() };
×
1086
    await this.repeatJobIdRedisClient.set(QUEUE_CLEANUP, JSON.stringify(lastExecutedInterval));
×
1087
  }
1088

1089
  async setupCleanInterval(cleanInterval: number, ttlAfterFinished: number) {
1090
    if (!ttlAfterFinished) {
×
1091
      ttlAfterFinished = DEFAULT_CLEANUP_COMPLETED_JOBS;
×
1092
    }
1093
    const intervalData = await this.getRedisValue(QUEUE_CLEANUP);
×
1094
    let timeInMs, delta;
1095
    const now = new Date().getTime();
×
1096
    if (intervalData?.lastExecutedInterval && typeof (intervalData.lastExecutedInterval) === 'string') {
×
1097
      timeInMs = new Date(intervalData.lastExecutedInterval).getTime();
×
1098
      this.logger.debug('Previous execution interval', intervalData);
×
1099
      delta = now - timeInMs;
×
1100
      this.logger.debug('Clean interval and previous difference', { cleanInterval, difference: delta });
×
1101
    }
1102

1103
    if (delta && (delta < cleanInterval)) {
×
1104
      // use setTimeout and then create interval on setTimeout
1105
      this.logger.info('Restoring previous execution interval with set timeout', { time: cleanInterval - delta });
×
1106
      setTimeout(async () => {
×
1107
        await this.cleanupJobs(ttlAfterFinished);
×
1108
        setInterval(this.cleanupJobs.bind(this), cleanInterval, ttlAfterFinished);
×
1109
      }, cleanInterval - delta);
1110
    } else {
1111
      setInterval(this.cleanupJobs.bind(this), cleanInterval, ttlAfterFinished);
×
1112
      this.logger.info('Clean up job interval set successfully');
×
1113
    }
1114
  }
1115

1116
  /**
1117
   * Reschedules a job - deletes it and recreates it with a new generated ID.
1118
   */
1119
  async update(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
1120
    let subject = request.subject;
3✔
1121
    // update meta data for owners information
1122
    await this.createMetadata(request.items, AuthZAction.MODIFY, subject);
3✔
1123
    let acsResponse: DecisionResponse;
1124
    try {
3✔
1125
      if (!ctx) { ctx = {}; };
3!
1126
      ctx.subject = subject;
3✔
1127
      ctx.resources = request?.items;
3✔
1128
      acsResponse = await checkAccessRequest(ctx,
3✔
1129
        [{ resource: 'job', id: request.items.map(item => item.id) }],
3✔
1130
        AuthZAction.MODIFY, Operation.isAllowed);
1131
    } catch (err) {
1132
      this.logger.error('Error occurred requesting access-control-srv:', err);
×
1133
      return {
×
1134
        operation_status: {
1135
          code: err.code,
1136
          message: err.message
1137
        }
1138
      };
1139
    }
1140
    if (acsResponse.decision != Response_Decision.PERMIT) {
3✔
1141
      return { operation_status: acsResponse.operation_status };
1✔
1142
    }
1143
    if (_.isNil(request) || _.isNil(request.items)) {
2!
1144
      return {
×
1145
        operation_status: {
1146
          code: 400,
1147
          message: 'Missing items in update request'
1148
        }
1149
      };
1150
    }
1151

1152
    const mappedJobs = request?.items?.reduce((obj, job) => {
2✔
1153
      obj[job.id] = job;
2✔
1154
      return obj;
2✔
1155
    }, {});
1156

1157
    const jobData = await this.read(JobReadRequest.fromPartial(
2✔
1158
      {
1159
        filter: {
1160
          job_ids: Object.keys(mappedJobs)
1161
        },
1162
        subject
1163
      }
1164
    ), ctx);
1165

1166
    await this.delete(DeleteRequest.fromPartial({
2✔
1167
      ids: Object.keys(mappedJobs),
1168
      subject
1169
    }), {});
1170

1171
    const result = [];
2✔
1172

1173
    jobData?.items?.forEach(async (job) => {
2✔
1174
      const mappedJob = mappedJobs[job?.payload?.id];
2✔
1175
      // update job repeate key based on updated job repeat options
1176
      if (job?.payload?.options?.repeat) {
2!
1177
        await this.storeRepeatKey(job?.payload?.type, job?.payload?.options, job?.payload?.id);
×
1178
      }
1179
      let endJob = {
2✔
1180
        id: mappedJob.id,
1181
        type: mappedJob.type,
1182
        options: {
1183
          ...job.payload.options,
1184
          ...(mappedJob.options ? mappedJob.options : {})
2!
1185
        },
1186
        data: mappedJob.data || job.payload.data,
2!
1187
        when: mappedJob.when,
1188
      };
1189

1190
      if (endJob.when && endJob.options) {
2!
1191
        delete endJob.options['delay'];
2✔
1192
      }
1193

1194
      result.push(endJob);
2✔
1195
    });
1196

1197
    return this.create(JobList.fromPartial({
2✔
1198
      items: result,
1199
      subject
1200
    }), ctx);
1201
  }
1202

1203
  /**
1204
   * Upserts a job - creates a new job if it does not exist or update the
1205
   * existing one if it already exists.
1206
   */
1207
  async upsert(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
1208
    let subject = request.subject;
2✔
1209
    await this.createMetadata(request.items, AuthZAction.MODIFY, subject);
2✔
1210
    let acsResponse: DecisionResponse;
1211
    try {
2✔
1212
      if (!ctx) { ctx = {}; };
2!
1213
      ctx.subject = subject;
2✔
1214
      ctx.resources = request.items;
2✔
1215
      acsResponse = await checkAccessRequest(ctx,
2✔
1216
        [{ resource: 'job', id: request.items.map(item => item.id) }],
2✔
1217
        AuthZAction.MODIFY, Operation.isAllowed);
1218
    } catch (err) {
1219
      this.logger.error('Error occurred requesting access-control-srv:', err);
×
1220
      return {
×
1221
        operation_status: {
1222
          code: err.code,
1223
          message: err.message
1224
        }
1225
      };
1226
    }
1227

1228
    if (acsResponse.decision != Response_Decision.PERMIT) {
2✔
1229
      return { operation_status: acsResponse.operation_status };
1✔
1230
    }
1231
    if (_.isNil(request) || _.isNil(request.items)) {
1!
1232
      return { operation_status: { code: 400, message: 'Missing items in upsert request' } };
×
1233
    }
1234

1235
    let result = [];
1✔
1236

1237
    for (let eachJob of request.items) {
1✔
1238
      let jobExists = false;
1✔
1239
      let origJobId = _.cloneDeep(eachJob.id);
1✔
1240
      for (let queue of this.queuesList) {
1✔
1241
        const jobIdData = await this.getRedisValue(eachJob.id as string);
2✔
1242
        if (jobIdData && jobIdData.repeatKey) {
2!
1243
          const repeatKey = jobIdData.repeatKey;
×
1244
          if (jobIdData?.options?.repeat?.cron && jobIdData?.options?.repeat?.every) {
×
1245
            result.push({
×
1246
              status: {
1247
                id: origJobId,
1248
                code: 400,
1249
                message: 'Both .cron and .every options are defined for this repeatable job'
1250
              }
1251
            });
1252
            continue;
×
1253
          }
1254
          const nextMillis = this.getNextMillis(Date.now(), jobIdData.options.repeat);
×
1255
          this.logger.debug('Repeatable job identifier', { id: eachJob.id, repeatId: `repeat:${repeatKey}:${nextMillis}` });
×
1256
          // map the repeatKey with nextmilis for bull repeatable jobID
1257
          eachJob.id = `repeat:${repeatKey}:${nextMillis}`;
×
1258
        }
1259
        const jobInst = await queue.getJob(eachJob.id);
2✔
1260
        if (jobInst) {
2✔
1261
          // existing job update it with the given job identifier
1262
          if (eachJob.id.startsWith('repeat:')) {
1!
1263
            eachJob.id = origJobId;
×
1264
          }
1265
          result = [
1✔
1266
            ...result,
1267
            ...(await this.update(JobList.fromPartial({ items: [eachJob], subject }), ctx)).items
1268
          ];
1269
          jobExists = true;
1✔
1270
          break;
1✔
1271
        }
1272
      }
1273
      if (!jobExists) {
1!
1274
        // new job create it
1275
        result = [
×
1276
          ...result,
1277
          ...(await this.create(JobList.fromPartial({ items: [eachJob], subject }), ctx)).items
1278
        ];
1279
      }
1280
    }
1281

1282
    return {
1✔
1283
      items: result,
1284
      total_count: result.length,
1285
      operation_status: {
1286
        code: 200,
1287
        message: 'success'
1288
      }
1289
    };
1290
  }
1291

1292
  /**
1293
   * Clear all job data.
1294
   */
1295
  async clear(): Promise<any> {
1296
    let allJobs: any[] = [];
1✔
1297
    for (let queue of this.queuesList) {
1✔
1298
      allJobs = allJobs.concat(await queue.getJobs(this.bullOptions['allJobTypes']));
3✔
1299
    }
1300
    return Promise.all(allJobs.map(async (job) => job.remove())).catch(err => {
6✔
1301
      this.logger.error(`Error clearing jobs`, err);
×
1302
      throw err;
×
1303
    });
1304
  }
1305

1306
  async _removeBullJob(jobInstID: string, queue: Queue): Promise<void> {
1307
    return queue.getJob(jobInstID).then(job => {
2✔
1308
      if (job) {
2!
1309
        return job.remove();
2✔
1310
      }
1311
    }).then(() => {
1312
      this.logger.info(`Job#${jobInstID} removed`);
2✔
1313
    }).catch(err => {
1314
      this.logger.error(`Error removing job ${jobInstID}`, err);
×
1315
      throw err;
×
1316
    });
1317
  }
1318

1319
  /**
1320
   *  disable access control
1321
   */
1322
  disableAC() {
1323
    try {
1✔
1324
      this.cfg.set('authorization:enabled', false);
1✔
1325
      updateConfig(this.cfg);
1✔
1326
    } catch (err) {
1327
      this.logger.error('Error caught disabling authorization:', { err });
×
1328
      this.cfg.set('authorization:enabled', this.authZCheck);
×
1329
    }
1330
  }
1331

1332
  /**
1333
   *  enables access control
1334
   */
1335
  enableAC() {
1336
    try {
1✔
1337
      this.cfg.set('authorization:enabled', true);
1✔
1338
      updateConfig(this.cfg);
1✔
1339
    } catch (err) {
1340
      this.logger.error('Error caught enabling authorization:', { err });
×
1341
      this.cfg.set('authorization:enabled', this.authZCheck);
×
1342
    }
1343
  }
1344

1345
  /**
1346
   *  restore AC state to previous vale either before enabling or disabling AC
1347
   */
1348
  restoreAC() {
1349
    try {
1✔
1350
      this.cfg.set('authorization:enabled', this.authZCheck);
1✔
1351
      updateConfig(this.cfg);
1✔
1352
    } catch (err) {
1353
      this.logger.error('Error caught enabling authorization:', { err });
×
1354
      this.cfg.set('authorization:enabled', this.authZCheck);
×
1355
    }
1356
  }
1357

1358
  /**
1359
   * reads meta data from DB and updates owners information in resource if action is UPDATE / DELETE
1360
   * @param resources list of resources
1361
   * @param action resource action
1362
   * @param subject subject name
1363
   */
1364
  async createMetadata(resources: any, action: string, subject): Promise<any> {
1365
    let orgOwnerAttributes: Attribute[] = [];
17✔
1366
    if (resources && !_.isArray(resources)) {
17!
1367
      resources = [resources];
×
1368
    }
1369
    const urns = this.cfg.get('authorization:urns');
17✔
1370
    if (subject?.scope && (action === AuthZAction.CREATE || action === AuthZAction.MODIFY)) {
17✔
1371
      // add subject scope as default owners
1372
      orgOwnerAttributes.push(
14✔
1373
        {
1374
          id: urns?.ownerIndicatoryEntity,
1375
          value: urns?.organization,
1376
          attributes: [{
1377
            id: urns?.ownerInstance,
1378
            value: subject?.scope,
1379
            attributes: []
1380
          }]
1381
        });
1382
    }
1383

1384
    if (resources?.length > 0) {
17!
1385
      for (let resource of resources) {
17✔
1386
        if (!resource.data) {
20✔
1387
          resource.data = { meta: {} };
3✔
1388
        } else if (!resource.data.meta) {
17✔
1389
          resource.data.meta = {};
12✔
1390
        }
1391
        if (action === AuthZAction.MODIFY || action === AuthZAction.DELETE) {
20✔
1392
          let result;
1393
          try {
8✔
1394
            result = await this.read(JobReadRequest.fromPartial({
8✔
1395
              filter: {
1396
                job_ids: [resource.id]
1397
              },
1398
              subject
1399
            }), {});
1400
          } catch (err) {
1401
            if (err.message.startsWith('Error! Jobs not found in any of the queues') && action != AuthZAction.DELETE) {
×
1402
              this.logger.debug('New job should be created', { jobId: resource.id });
×
1403
              result = { items: [] };
×
1404
            } else {
1405
              this.logger.error(`Error reading job with resource ID ${resource.id}`, { code: err.code, message: err.message, stack: err.stack });
×
1406
            }
1407
          }
1408
          // update owners info
1409
          if (result?.items?.length === 1 && result?.items[0]?.payload) {
8✔
1410
            let item = result.items[0].payload;
5✔
1411
            resource.data.meta.owners = item.data.meta.owners;
5✔
1412
            // adding meta to resource root (needed by access-contorl-srv for owners information check)
1413
            // meta is inside data of resource since the data is persisted in redis using bull
1414
            resource.meta = { owners: item.data.meta.owners };
5✔
1415
          } else if ((!result || !result.items || !result.items[0] || !result.items[0].payload) && action === AuthZAction.MODIFY) {
3✔
1416
            // job does not exist - create new job (ex: Upsert with action modify)
1417
            let ownerAttributes = _.cloneDeep(orgOwnerAttributes);
2✔
1418
            // add user as default owners
1419
            ownerAttributes.push(
2✔
1420
              {
1421
                id: urns?.ownerIndicatoryEntity,
1422
                value: urns?.user,
1423
                attributes: [{
1424
                  id: urns?.ownerInstance,
1425
                  value: subject?.id,
1426
                  attributes: []
1427
                }]
1428
              });
1429
            resource.data.meta.owners = ownerAttributes;
2✔
1430
            resource.meta = { owners: ownerAttributes };
2✔
1431
          }
1432
        } else if (action === AuthZAction.CREATE && !resource.data.meta.owners) {
12✔
1433
          let ownerAttributes = _.cloneDeep(orgOwnerAttributes);
10✔
1434
          // add user as default owners
1435
          if (resource.id) {
10✔
1436
            ownerAttributes.push(
2✔
1437
              {
1438
                id: urns.ownerIndicatoryEntity,
1439
                value: urns.user,
1440
                attributes: [{
1441
                  id: urns.ownerInstance,
1442
                  value: subject.id,
1443
                  attributes: []
1444
                }]
1445
              });
1446
          }
1447
          resource.data.meta.owners = ownerAttributes;
10✔
1448
          resource.meta = { owners: ownerAttributes };
10✔
1449
        } else if (action === AuthZAction.CREATE && resource?.data?.meta?.owners) {
2!
1450
          resource.meta = { owners: resource.data.meta.owners };
2✔
1451
        }
1452
      }
1453
    }
1454
    return resources;
17✔
1455
  }
1456
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc