• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

restorecommerce / scheduling-srv / 6960060717

22 Nov 2023 04:22PM UTC coverage: 67.617% (-0.1%) from 67.763%
6960060717

push

github

Arun-KumarH
chore: Release v1.2.2 - See CHANGELOG

270 of 449 branches covered (0.0%)

Branch coverage included in aggregate %.

1 of 1 new or added line in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

655 of 919 relevant lines covered (71.27%)

6.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.89
/src/schedulingService.ts
1
import * as _ from 'lodash';
1✔
2
import { errors } from '@restorecommerce/chassis-srv';
1✔
3
import * as kafkaClient from '@restorecommerce/kafka-client';
4
import { AuthZAction, ACSAuthZ, updateConfig, DecisionResponse, Operation, PolicySetRQResponse } from '@restorecommerce/acs-client';
1✔
5
import {
1✔
6
  JobServiceImplementation as SchedulingServiceServiceImplementation,
7
  JobFailed, JobDone, DeepPartial, JobList, JobListResponse, Data,
8
  Backoff_Type, JobOptions_Priority, JobReadRequest, JobReadRequest_SortOrder
9
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/job';
10
import { createClient, RedisClientType } from 'redis';
1✔
11
import { NewJob, Priority } from './types';
1✔
12
import { parseExpression } from 'cron-parser';
1✔
13
import * as crypto from 'crypto';
1✔
14
import { _filterJobData, _filterJobOptions, _filterQueuedJob, checkAccessRequest, marshallProtobufAny } from './utilts';
1✔
15
import * as uuid from 'uuid';
1✔
16
import { Logger } from 'winston';
17
import { Response_Decision } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/access_control';
1✔
18
import { Attribute } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/attribute';
19
import { DeleteRequest, DeleteResponse } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/resource_base';
1✔
20
import { Queue, QueueOptions, Job } from 'bullmq';
1✔
21
import { parseInt } from 'lodash';
1✔
22

23
const JOB_DONE_EVENT = 'jobDone';
1✔
24
const JOB_FAILED_EVENT = 'jobFailed';
1✔
25
const DEFAULT_CLEANUP_COMPLETED_JOBS = 604800000; // 7 days in miliseconds
1✔
26
const COMPLETED_JOB_STATE = 'completed';
1✔
27
const FAILED_JOB_STATE = 'failed';
1✔
28
const QUEUE_CLEANUP = 'queueCleanup';
1✔
29

30
/**
31
 * A job scheduling service.
32
 */
33
export class SchedulingService implements SchedulingServiceServiceImplementation {
1✔
34

35
  jobEvents: kafkaClient.Topic;
36
  logger: Logger;
37

38
  queuesConfigList: any;
39
  queuesList: Queue[];
40
  defaultQueueName: string;
41

42
  redisClient: RedisClientType<any, any>;
43
  resourceEventsEnabled: boolean;
44
  canceledJobs: Set<string>;
45
  bullOptions: any;
46
  cfg: any;
47
  authZ: ACSAuthZ;
48
  authZCheck: boolean;
49
  repeatJobIdRedisClient: RedisClientType<any, any>;
50

51

52
  constructor(jobEvents: kafkaClient.Topic,
53
    private redisConfig: any, logger: any, redisClient: RedisClientType<any, any>,
1✔
54
    bullOptions: any, cfg: any, authZ: ACSAuthZ) {
55
    this.jobEvents = jobEvents;
1✔
56
    this.resourceEventsEnabled = true;
1✔
57
    this.bullOptions = bullOptions;
1✔
58
    this.logger = logger;
1✔
59
    this.queuesList = [];
1✔
60
    this.queuesConfigList = [];
1✔
61
    this.redisClient = redisClient;
1✔
62

63
    const repeatJobIdCfg = cfg.get('redis');
1✔
64
    repeatJobIdCfg.database = cfg.get('redis:db-indexes:db-repeatJobId');
1✔
65
    this.repeatJobIdRedisClient = createClient(repeatJobIdCfg);
1✔
66
    this.repeatJobIdRedisClient.on('error', (err) => logger.error('Redis client error in repeatable job store', err));
1✔
67
    this.repeatJobIdRedisClient.connect().then((data) => {
1✔
68
      logger.info('Redis client connection for repeatable job store successful');
1✔
69
    }).catch(err => logger.error('Redis client error for repeatable job store', err));
×
70

71
    this.canceledJobs = new Set<string>();
1✔
72
    this.cfg = cfg;
1✔
73
    this.authZ = authZ;
1✔
74
    this.authZCheck = this.cfg.get('authorization:enabled');
1✔
75

76
    // Read Queue Configuration file and find first queue which has "default": true,
77
    // then save it to defaultQueueName
78
    const queuesCfg = this.cfg.get('queue');
1✔
79
    if (_.isEmpty(queuesCfg)) {
1!
80
      this.logger.error('Queue configuration not found!');
×
81
      throw new Error('Queue configuration not found!');
×
82
    }
83
    let defaultTrueExists = false;
1✔
84
    for (let queueCfg of queuesCfg) {
1✔
85
      // Find configuration which has default=true
86
      if (queueCfg.default == true) {
1!
87
        defaultTrueExists = true;
1✔
88
        this.defaultQueueName = queueCfg.name;
1✔
89
        break;
1✔
90
      }
91
    }
92
    if (!defaultTrueExists) {
1!
93
      this.logger.error('Queue default configuration not found!');
×
94
      throw new Error('Queue default configuration not found!');
×
95
    }
96

97
    // Create Queues
98
    for (let queueCfg of queuesCfg) {
1✔
99
      let queueOptions: QueueOptions;
100
      const prefix = queueCfg.name;
3✔
101
      const rateLimiting = queueCfg.rateLimiting;
3✔
102
      const advancedSettings = queueCfg.advancedSettings;
3✔
103

104
      queueOptions = {
3✔
105
        connection: {
106
          ...redisConfig,
107
        }
108
      };
109

110
      // Create Queue Configuration - Add Rate Limiting if enabled
111
      if (!_.isEmpty(rateLimiting) && rateLimiting.enabled == true) {
3!
112
        this.logger.info(`Queue: ${queueCfg.name} - Rate limiting is ENABLED.`);
×
113
      }
114

115
      if (!_.isEmpty(advancedSettings)) {
3!
116
        queueOptions.settings = {
3✔
117
          ...advancedSettings,
118
        };
119
      }
120

121
      const redisURL = new URL((queueOptions.connection as any).url);
3✔
122

123
      if ('keyPrefix' in queueOptions.connection) {
3!
124
        delete queueOptions.connection.keyPrefix;
×
125
      }
126

127
      let queue = new Queue(prefix, {
3✔
128
        ...queueOptions,
129
        connection: {
130
          ...queueOptions.connection as any,
131
          host: redisURL.hostname,
132
          port: parseInt(redisURL.port)
133
        }
134
      });
135
      this.queuesList.push(queue);
3✔
136

137
      // Add Queue Configurations
138
      let queueCfgObj = {
3✔
139
        name: queueCfg.name,
140
        concurrency: queueCfg.concurrency,
141
        default: queueCfg.default,
142
        runMissedScheduled: queueCfg.runMissedScheduled
143
      };
144
      this.queuesConfigList.push(queueCfgObj);
3✔
145
    }
146
  }
147

148
  /**
149
   * Start handling the job queue, job scheduling and
150
   * managing job events.
151
   */
152
  async start(): Promise<any> {
153
    const logger = this.logger;
1✔
154
    const that = this;
1✔
155
    const events = [JOB_DONE_EVENT, JOB_FAILED_EVENT];
1✔
156
    for (let eventName of events) {
1✔
157
      // A Scheduling Service Event Listener
158
      await this.jobEvents.on(eventName, async (msg: any, ctx: any,
2✔
159
        config: any, eventName: string): Promise<any> => {
160
        let job = msg;
6✔
161
        // Match Job Type to Queue Name, else use Default Queue
162
        let queue = _.find(this.queuesList, { name: job.type });
6✔
163
        let defaultQueue = _.find(this.queuesList, { name: this.defaultQueueName });
6✔
164
        if (_.isEmpty(queue)) {
6!
165
          queue = defaultQueue;
×
166
        }
167

168
        if (eventName === JOB_FAILED_EVENT) {
6!
169
          logger.error(`job@${job.type}#${job.id} failed with error #${job.error}`,
×
170
            _filterQueuedJob<JobFailed>(job, this.logger));
171
        } else if (eventName === JOB_DONE_EVENT) {
6!
172
          logger.verbose(`job#${job.id} done`, _filterQueuedJob<JobDone>(job, this.logger));
6✔
173
        }
174

175
        logger.info('Received Job event', { event: eventName });
6✔
176
        logger.info('Job details', job);
6✔
177
        const jobData: any = await queue.getJob(job.id).catch(error => {
6✔
178
          that.logger.error('Error retrieving job ${job.id} from queue', error);
×
179
        });
180

181
        if (job?.delete_scheduled) {
6✔
182
          await queue.removeRepeatable(jobData.name, jobData.opts.repeat);
1✔
183
        }
184
      });
185
    }
186

187
    // Initialize Event Listeners for each Queue
188
    for (let queue of this.queuesList) {
1✔
189
      queue.on('error', (error) => {
3✔
190
        logger.error('queue error', error);
×
191
      });
192
      queue.on('waiting', (job) => {
3✔
193
        logger.verbose(`job#${job.id} scheduled`, job);
9✔
194
      });
195
      queue.on('removed', (job) => {
3✔
196
        logger.verbose(`job#${job.id} removed`, job);
13✔
197
      });
198
      queue.on('progress', (job) => {
3✔
199
        logger.verbose(`job#${job.id} progress`, job);
×
200
      });
201
    }
202

203
    // If the scheduling service goes down and if there were
204
    // recurring jobs which have missed schedules then
205
    // we will need to reschedule it for those missing intervals.
206
    await this._rescheduleMissedJobs();
1✔
207
  }
208

209
  /**
210
   * To reschedule the missed recurring jobs upon service restart
211
   */
212
  async _rescheduleMissedJobs(): Promise<void> {
213
    // for jobs created via Kafka currently there are no acs checks
214
    this.disableAC();
1✔
215
    const createDispatch = [];
1✔
216
    let result: Job[] = [];
1✔
217
    let thiz = this;
1✔
218

219
    // Get the jobs
220
    for (let queueCfg of this.queuesConfigList) {
1✔
221
      // If enabled in the config, or the config is missing,b
222
      // Reschedule the missed jobs, else skip.
223
      let queue = _.find(this.queuesList, { name: queueCfg.name });
3✔
224
      let runMissedScheduled = queueCfg.runMissedScheduled;
3✔
225
      if (_.isNil(runMissedScheduled) ||
3!
226
        (!_.isNil(runMissedScheduled) && runMissedScheduled == true)) {
227
        await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']).then(jobs => {
3✔
228
          result = result.concat(jobs);
3✔
229
        }).catch(error => {
230
          thiz.logger.error('Error reading jobs to reschedule the missed recurring jobs', error);
×
231
        });
232
      }
233
    }
234
    let lastRunTime;
235
    for (let job of result) {
1✔
236
      // get the last run time for the job, we store the last run time only
237
      // for recurring jobs
238
      if (job?.name) {
×
239
        try {
×
240
          lastRunTime = await this.redisClient.get(job.name);
×
241
        } catch (err) {
242
          this.logger.error('Error reading the last run time for job type:', { name: job.name, code: err.code, message: err.message, stack: err.stack });
×
243
        }
244
      }
245
      // we store lastRunTime only for recurring jobs and if it exists check
246
      // cron interval and schedule immediate jobs for missed intervals
247
      this.logger.info(`Last run time of ${job.name} Job was:`, lastRunTime);
×
248
      if (lastRunTime) {
×
249
        // convert redis string value to object and get actual time value
250
        try {
×
251
          lastRunTime = JSON.parse(lastRunTime);
×
252
        } catch (error) {
253
          this.logger.error('Error parsing lastRunTime', {
×
254
            code: error.code,
255
            message: error.message, stack: error.stack
256
          });
257
        }
258

259
        if ((job?.opts?.repeat as any)?.pattern && lastRunTime?.time) {
×
260
          let options = {
×
261
            currentDate: new Date(lastRunTime.time),
262
            endDate: new Date(),
263
            iterator: true
264
          };
265
          let intervalTime;
266
          try {
×
267
            intervalTime =
×
268
              parseExpression((job.opts.repeat as any).pattern, options);
269
          } catch (error) {
270
            this.logger.error('Error parsing cron expression running missed schedules', { code: error.code, message: error.message, stack: error.stack });
×
271
          }
272
          while (intervalTime?.hasNext()) {
×
273
            let nextInterval: any = intervalTime.next();
×
274
            const nextIntervalTime = nextInterval.value.toString();
×
275
            // schedule it as one time job for now or immediately
276
            const data = {
×
277
              payload: marshallProtobufAny({
278
                value: { time: nextIntervalTime }
279
              })
280
            };
281
            const currentTime = new Date();
×
282
            const when = new Date(currentTime.setSeconds(currentTime.getSeconds() + 2)).toISOString();
×
283
            const immediateJob: any = {
×
284
              type: job.name,
285
              data,
286
              // give a delay of 2 sec between each job
287
              // to avoid time out of queued jobs
288
              when,
289
              options: {}
290
            };
291
            createDispatch.push(thiz.create({
×
292
              items: [immediateJob],
293
              total_count: 0,
294
            }, {}));
295
          }
296
        }
297
      }
298
    }
299
    this.restoreAC();
1✔
300
    await createDispatch;
1✔
301
  }
302

303
  /**
304
   * Disabling of CRUD events.
305
   */
306
  disableEvents(): void {
307
    this.resourceEventsEnabled = false;
×
308
  }
309

310
  /**
311
   * Enabling of CRUD events.
312
   */
313
  enableEvents(): any {
314
    this.resourceEventsEnabled = true;
×
315
  }
316

317
  _validateJob(job: NewJob): NewJob {
318
    if (_.isNil(job.type)) {
11!
319
      throw new errors.InvalidArgument('Job type not specified.');
×
320
    }
321

322
    if (!job.options) {
11!
323
      job.options = {};
×
324
    }
325

326
    if (job.when) {
11✔
327
      // If the jobSchedule time has already lapsed then do not schedule the job
328
      const jobScheduleTime = new Date(job.when).getTime();
7✔
329
      const currentTime = new Date().getTime();
7✔
330
      if (jobScheduleTime < currentTime) {
7!
331
        throw new errors.InvalidArgument('Cannot schedule a job for an elapsed time');
×
332
      }
333

334
      job.options.delay = jobScheduleTime - currentTime;
7✔
335
    }
336

337
    if (job.options.backoff && typeof job.options.backoff !== 'number') {
11!
338
      if (typeof job.options.backoff.type === 'number') {
11!
339
        job.options.backoff.type = Object.keys(Backoff_Type)[job.options.backoff.type];
×
340
      }
341
      job.options.backoff.type = job.options.backoff.type.toLowerCase();
11✔
342
    }
343

344
    if (job.options.priority && typeof job.options.priority === 'string') {
11!
345
      job.options.priority = JobOptions_Priority[job.options.priority] as any;
11✔
346
    }
347

348
    if (_.isEmpty(job.data)) {
11!
349
      throw new errors.InvalidArgument('No job data specified.');
×
350
    }
351

352
    job.data = _filterJobData(job.data, false, this.logger);
11✔
353

354
    return job;
11✔
355
  }
356

357
  /**
358
   * get next job execution time in mili seconds
359
   * @param millis
360
   * @param opts
361
   */
362
  getNextMillis(millis, opts) {
363
    if (opts?.every) {
1!
364
      return Math.floor(millis / opts.every) * opts.every + opts.every;
×
365
    }
366

367
    const currentDate =
368
      opts?.startDate && new Date(opts.startDate) > new Date(millis)
1!
369
        ? new Date(opts.startDate)
370
        : new Date(millis);
371
    const interval = parseExpression(
1✔
372
      opts.cron,
373
      _.defaults(
374
        {
375
          currentDate
376
        },
377
        opts
378
      )
379
    );
380

381
    try {
1✔
382
      return interval.next().getTime();
1✔
383
    } catch (e) {
384
      this.logger.error('Error getting next job execution time');
×
385
    }
386
  }
387

388
  private md5(str) {
389
    return crypto
×
390
      .createHash('md5')
391
      .update(str)
392
      .digest('hex');
393
  }
394

395
  /**
396
   * store the mapping from repeateKey to external interface SCS job Id, and
397
   * also the mapping other way around i.e. from SCS job Id to repeatKey (needed for read operations)
398
   * @param name - job name
399
   * @param repeat - job repeate options
400
   * @param jobId - job id
401
   */
402
  async storeRepeatKey(repeatId, scsJobId, options) {
403
    try {
11✔
404
      if (repeatId && scsJobId) {
11!
405
        this.logger.info('Repeat key mapped to external SCS JobId', { repeatId, scsJobId });
11✔
406
        await this.repeatJobIdRedisClient.set(repeatId, scsJobId);
11✔
407
        const jobIdData = { repeatId, options };
11✔
408
        await this.repeatJobIdRedisClient.set(scsJobId, JSON.stringify(jobIdData));
11✔
409
      }
410
    } catch (error) {
411
      this.logger.error('Error storing repeatKey to redis', {
×
412
        code: error.code,
413
        message: error.message, stack: error.stack
414
      });
415
    }
416
  }
417

418
  private idGen(): string {
419
    return uuid.v4().replace(/-/g, '');
8✔
420
  }
421

422
  /**
423
   * Create and queue jobs.
424
   * @param {any} call RPC call argument
425
   * @param {any} ctx RPC context
426
   */
427
  async create(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
428
    let jobListResponse: JobListResponse = { items: [], operation_status: { code: 0, message: '' }, total_count: 0 };
9✔
429
    let subject = request.subject;
9✔
430
    if (_.isNil(request) || _.isNil(request.items)) {
9!
431
      return {
×
432
        items: [],
433
        total_count: 0,
434
        operation_status: {
435
          code: 400,
436
          message: 'Missing items in create request'
437
        }
438
      };
439
    }
440

441
    await this.createMetadata(request.items, AuthZAction.CREATE, subject);
9✔
442
    let acsResponse: DecisionResponse;
443
    try {
9✔
444
      if (!ctx) { ctx = {}; };
9!
445
      ctx.subject = subject;
9✔
446
      ctx.resources = request?.items;
9✔
447
      acsResponse = await checkAccessRequest(ctx, [{
9✔
448
        resource: 'job',
449
        id: request.items.map(item => item.id)
12✔
450
      }], AuthZAction.CREATE, Operation.isAllowed);
451
    } catch (err) {
452
      this.logger.error('Error requesting access-control-srv for create meta data', { code: err.code, message: err.message, stack: err.stack });
×
453
      return {
×
454
        items: [],
455
        total_count: 0,
456
        operation_status: {
457
          code: err.code,
458
          message: err.message
459
        }
460
      };
461
    }
462
    if (acsResponse.decision != Response_Decision.PERMIT) {
9✔
463
      return { items: [], total_count: 0, operation_status: acsResponse.operation_status };
1✔
464
    }
465

466
    let jobs: NewJob[] = [];
8✔
467
    for (let job of request?.items || []) {
8!
468
      try {
11✔
469
        jobs.push(this._validateJob(job as any));
11✔
470
      } catch (err) {
471
        this.logger.error('Error validating job', job);
×
472
        jobListResponse.items.push({
×
473
          status: {
474
            id: job.id,
475
            code: 400,
476
            message: err.message
477
          }
478
        });
479
      }
480
    }
481

482
    let result: Job[] = [];
8✔
483
    // Scheduling jobs
484
    for (let i = 0; i < jobs.length; i += 1) {
8✔
485
      let job = jobs[i];
11✔
486
      // if not jobID is specified generate a UUID
487
      if (!job.id) {
11✔
488
        job.id = this.idGen();
8✔
489
      }
490

491
      // map the id to jobId as needed in JobOpts for bull
492
      if (job?.id) {
11!
493
        // check if jobID already exists then map it as already exists error
494
        const existingJobId = await this.getRedisValue(job.id);
11✔
495
        if (existingJobId) {
11!
496
          // read job to check if data exists
497
          const jobData = await this.read(JobReadRequest.fromPartial({
×
498
            filter: {
499
              job_ids: [job.id]
500
            },
501
            subject
502
          }), ctx);
503
          if ((jobData?.items as any)[0]?.payload) {
×
504
            jobListResponse.items.push({
×
505
              status: {
506
                id: job.id,
507
                code: 403,
508
                message: `Job with ID ${job.id} already exists`
509
              }
510
            });
511
            continue;
×
512
          }
513
        }
514
        if (!job?.options) {
11!
515
          job.options = { jobId: job.id };
×
516
        } else {
517
          job.options.jobId = job.id;
11✔
518
        }
519
        if (job?.options?.repeat) {
11✔
520
          (job as any).options.repeat.jobId = job.id;
2✔
521
        }
522
      }
523

524
      if (!job.data.meta) {
11!
525
        const now = new Date();
×
526
        const metaObj = {
×
527
          created: now,
528
          modified: now,
529
          modified_by: '',
530
          owners: []
531
        };
532
        Object.assign(job.data, { meta: metaObj });
×
533
      }
534
      // if only owners are specified in meta
535
      if (job.data.meta && (!job.data.meta.created || !job.data.meta.modified)) {
11✔
536
        job.data.meta.created = new Date();
9✔
537
        job.data.meta.modified = new Date();
9✔
538
      }
539

540
      if (job?.data?.payload?.value) {
11!
541
        job.data.payload.value = job.data.payload.value.toString() as any;
11✔
542
      }
543

544
      // convert enum priority back to number as it's expected by bull
545
      if (job?.options?.priority) {
11!
546
        job.options.priority = typeof job.options.priority === 'number' ? job.options.priority : Priority[job.options.priority] as unknown as number;
11!
547
      }
548

549
      // if its a repeat job and tz is empty delete the key (else cron parser throws an error)
550
      if (job?.options?.repeat?.tz === '') {
11!
551
        delete job.options.repeat.tz;
×
552
      }
553

554
      const bullOptions = {
11✔
555
        ...job.options
556
      };
557

558
      if ((bullOptions as any).timeout === 1) {
11✔
559
        delete bullOptions['timeout'];
2✔
560
      }
561

562
      // Match the Job Type with the Queue Name and add the Job to this Queue.
563
      // If there is no match, add the Job to the Default Queue
564
      let queue = _.find(this.queuesList, { name: job?.queue_name });
11✔
565
      if (_.isEmpty(queue)) {
11✔
566
        queue = _.find(this.queuesList, { name: this.defaultQueueName });
2✔
567
      }
568
      const submittedJob = await queue.add(job.type, job.data, bullOptions);
11✔
569
      if (submittedJob?.id?.startsWith('repeat:')) {
11✔
570
        const repeatJobId = submittedJob?.id?.split(':')[1];
2✔
571
        await this.storeRepeatKey(repeatJobId, job.id, job.options);
2✔
572
      } else if (submittedJob?.id) {
9!
573
        // future job with when
574
        await this.storeRepeatKey(submittedJob.id, job.id, job.options);
9✔
575
      }
576
      this.logger.verbose(`job@${job.type} created`, job);
11✔
577
      result.push(submittedJob);
11✔
578
    }
579

580
    for (let job of result) {
8✔
581
      let jobId = job.id as string;
11✔
582
      if (jobId.startsWith('repeat:')) {
11✔
583
        const repeatKey = jobId.split(':')[1];
2✔
584
        job.id = await this.getRedisValue(repeatKey);
2✔
585
      }
586
    }
587

588
    for (let job of result) {
8✔
589
      let when = job?.opts?.delay ? new Date(job?.opts?.delay).toString() : '';
11✔
590
      jobListResponse.items.push({
11✔
591
        payload: {
592
          id: job.id as string,
593
          type: job.name,
594
          data: _filterJobData(job.data, true, this.logger),
595
          options: _filterJobOptions(job.opts) as any,
596
          when
597
        },
598
        status: {
599
          id: (job?.id)?.toString(),
600
          code: 200,
601
          message: 'success'
602
        }
603
      });
604
    }
605
    const jobList = {
8✔
606
      items: result.map(job => ({
11✔
607
        id: job.id,
608
        type: job.name,
609
        data: _filterJobData(job.data, true, this.logger),
610
        options: _filterJobOptions(job.opts)
611
      })),
612
      total_count: result.length
613
    };
614

615
    if (this.resourceEventsEnabled) {
8!
616
      await this.jobEvents.emit('jobsCreated', jobList);
8✔
617
    }
618

619
    jobListResponse.operation_status = { code: 200, message: 'success' };
8✔
620
    return jobListResponse;
8✔
621
  }
622

623
  private filterByOwnerShip(customArgsObj, result) {
624
    // applying filter based on custom arguments (filterByOwnerShip)
625
    let customArgs = (customArgsObj)?.custom_arguments;
24✔
626
    if (customArgs?.value) {
24!
627
      let customArgsFilter;
628
      try {
24✔
629
        customArgsFilter = JSON.parse(customArgs.value.toString());
24✔
630
      } catch (error) {
631
        this.logger.error('Error parsing custom query arguments', {
×
632
          code: error.code,
633
          message: error.message, stack: error.stack
634
        });
635
      }
636
      if (!customArgsFilter) {
24!
637
        return [];
×
638
      }
639
      const ownerIndicatorEntity = customArgsFilter.entity;
24✔
640
      const ownerValues = customArgsFilter.instance;
24✔
641
      const ownerIndictaorEntURN = this.cfg.get('authorization:urns:ownerIndicatoryEntity');
24✔
642
      const ownerInstanceURN = this.cfg.get('authorization:urns:ownerInstance');
24✔
643
      result = result.filter(job => {
24✔
644
        if (job?.data?.meta?.owners?.length > 0) {
36!
645
          for (let owner of job.data.meta.owners) {
36✔
646
            if (owner?.id === ownerIndictaorEntURN && owner?.value === ownerIndicatorEntity && owner?.attributes?.length > 0) {
36!
647
              for (let ownerInstObj of owner.attributes) {
36✔
648
                if (ownerInstObj?.id === ownerInstanceURN && ownerInstObj?.value && ownerValues.includes(ownerInstObj.value)) {
36!
649
                  return job;
36✔
650
                }
651
              }
652
            }
653
          }
654
        }
655
      });
656
    }
657
    return result;
24✔
658
  }
659

660
  async deleteRedisKey(key: string): Promise<any> {
661
    try {
4✔
662
      await this.repeatJobIdRedisClient.del(key);
4✔
663
      this.logger.debug('Redis Key deleted successfully used for mapping repeatable jobID', { key });
4✔
664
    } catch (err) {
665
      this.logger.error('Error deleting redis key', { key, msg: err.message, stack: err.stack });
×
666
    }
667
  }
668

669
  async getRedisValue(key: string): Promise<any> {
670
    let redisValue;
671
    try {
45✔
672
      if (key) {
45!
673
        redisValue = await this.repeatJobIdRedisClient.get(key);
45✔
674
      }
675
      if (redisValue) {
45✔
676
        return JSON.parse(redisValue);
32✔
677
      } else {
678
        return;
13✔
679
      }
680
    } catch (err) {
681
      if (err?.message?.startsWith('Unexpected token') || err.message.startsWith('Unexpected number')) {
13!
682
        return redisValue;
13✔
683
      } else {
UNCOV
684
        this.logger.error('Error reading redis key', { key, msg: err.message, stack: err.stack });
×
685
      }
686
    }
687
  }
688

689

690
  /**
691
   * Retrieve jobs.
692
   * @param {any} call RPC call argument
693
   * @param {any} ctx RPC context
694
   */
695
  async read(request: JobReadRequest, ctx: any): Promise<DeepPartial<JobListResponse>> {
696
    let jobListResponse: JobListResponse = { items: [], operation_status: { code: 0, message: '' }, total_count: 0 };
27✔
697
    let subject = request.subject;
27✔
698
    let acsResponse: PolicySetRQResponse;
699
    try {
27✔
700
      if (!ctx) { ctx = {}; };
27!
701
      ctx.subject = subject;
27✔
702
      ctx.resources = [];
27✔
703
      acsResponse = await checkAccessRequest(ctx, [{ resource: 'job' }], AuthZAction.READ,
27✔
704
        Operation.whatIsAllowed) as PolicySetRQResponse;
705
    } catch (err) {
706
      this.logger.error('Error requesting access-control-srv for read operation', { code: err.code, message: err.message, stack: err.stack });
×
707
      return {
×
708
        operation_status: {
709
          code: err.code,
710
          message: err.message
711
        }
712
      };
713
    }
714
    if (acsResponse.decision != Response_Decision.PERMIT) {
27✔
715
      return { operation_status: acsResponse.operation_status };
3✔
716
    }
717

718
    let result: Job[] = [];
24✔
719
    if (_.isEmpty(request) || _.isEmpty(request.filter)
24!
720
      && (!request.filter || !request.filter.job_ids
721
        || _.isEmpty(request.filter.job_ids))
722
      && (!request.filter || !request.filter.type ||
723
        _.isEmpty(request.filter.type))) {
724
      result = await this._getJobList();
12✔
725
      let custom_arguments;
726
      if (acsResponse?.custom_query_args?.length > 0) {
12!
727
        custom_arguments = acsResponse.custom_query_args[0].custom_arguments;
12✔
728
      }
729
      result = this.filterByOwnerShip({ custom_arguments }, result);
12✔
730
    } else {
731
      const that = this;
12✔
732
      let jobIDs = request.filter.job_ids || [];
12✔
733
      if (!_.isArray(jobIDs)) {
12!
734
        jobIDs = [jobIDs];
×
735
      }
736
      const typeFilterName = request.filter.type;
12✔
737

738
      // Search in all the queues and retrieve jobs after JobID
739
      // and add them to the jobIDsCopy list.
740
      // If the job is not found in any of the queues, continue looking
741
      // Finally compare the two lists and add an error to status for which
742
      // job could not be found in any of the queues.
743
      if (jobIDs.length > 0) {
12✔
744
        // jobIDsCopy should contain the jobIDs duplicate values
745
        // after the for loop ends
746
        let jobIDsCopy: string[] = [];
11✔
747
        for (let jobID of jobIDs) {
11✔
748
          const jobIdData = await this.getRedisValue(jobID as string);
11✔
749
          // future jobs scheduled with `when` will have same repeatId as external SCS jobID
750
          if (jobIdData?.repeatId && (jobIdData.repeatId != jobID)) {
11✔
751
            const repeatId = jobIdData.repeatId;
1✔
752
            if (jobIdData?.options?.repeat?.cron && jobIdData?.options?.repeat?.every) {
1!
753
              jobListResponse.items.push({
×
754
                status: {
755
                  id: jobID.toString(),
756
                  code: 400,
757
                  message: 'Both .cron and .every options are defined for this repeatable job'
758
                }
759
              });
760
              continue;
×
761
            }
762
            const nextMillis = this.getNextMillis(Date.now(), jobIdData.options.repeat);
1✔
763
            this.logger.debug('Repeatable job identifier', { id: jobID, repeatId: `repeat:${repeatId}:${nextMillis}` });
1✔
764
            // map the repeatKey with nextmilis for bull repeatable jobID
765
            jobID = `repeat:${repeatId}:${nextMillis}`;
1✔
766
          }
767
          for (let queue of this.queuesList) {
11✔
768
            await new Promise((resolve, reject) => {
33✔
769
              // getJob returns job or null
770
              queue.getJob(jobID).then(job => {
33✔
771
                if (job?.opts?.repeat?.pattern) {
33✔
772
                  (job.opts.repeat as any).cron = job.opts.repeat.pattern;
1✔
773
                  delete job.opts.repeat.pattern;
1✔
774
                }
775
                resolve(job);
33✔
776
                if (!_.isEmpty(job)) {
33✔
777
                  result.push(job);
10✔
778
                  if ((job as any)?.opts?.repeat?.jobId) {
10✔
779
                    jobIDsCopy.push((job as any).opts.repeat.jobId);
1✔
780
                  } else {
781
                    jobIDsCopy.push(jobID);
9✔
782
                  }
783
                }
784
              }).catch(err => {
785
                that.logger.error(`Error reading job ${jobID}`, err);
×
786
                if (err?.code && typeof err.code === 'string') {
×
787
                  err.code = 500;
×
788
                }
789
                jobListResponse.items.push({
×
790
                  status: {
791
                    id: jobID.toString(),
792
                    code: err.code,
793
                    message: err.message
794
                  }
795
                });
796
              });
797
            });
798
          }
799
        }
800
        if (!_.isEqual(jobIDs.sort(), jobIDsCopy.sort())) {
11✔
801
          const jobIDsDiff = _.difference(jobIDs, jobIDsCopy);
1✔
802
          for (let jobId of jobIDsDiff) {
1✔
803
            jobListResponse.items.push({
1✔
804
              status: {
805
                id: jobId.toString(),
806
                code: 404,
807
                message: `Job ID ${jobId} not found in any of the queues`
808
              }
809
            });
810
          }
811
        }
812
      } else {
813
        try {
1✔
814
          let jobsList: any[] = [];
1✔
815
          for (let queue of this.queuesList) {
1✔
816
            const getJobsResult = await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']);
3✔
817
            getJobsResult.forEach((job) => {
3✔
818
              if (job?.opts?.repeat?.pattern) {
5!
819
                (job.opts.repeat as any).cron = job.opts.repeat.pattern;
×
820
                delete job.opts.repeat.pattern;
×
821
              }
822
            });
823
            jobsList = jobsList.concat(getJobsResult);
3✔
824
          }
825
          result = jobsList;
1✔
826
        } catch (err) {
827
          that.logger.error('Error reading jobs', err);
×
828
          if (typeof err.code === 'string') {
×
829
            err.code = 500;
×
830
          }
831
          return {
×
832
            operation_status: {
833
              code: err.code,
834
              message: err.message
835
            }
836
          };
837
        }
838
      }
839

840
      if (typeFilterName) {
12✔
841
        result = result.filter(job => job.name === typeFilterName);
6✔
842
      }
843
      let custom_arguments;
844
      if (acsResponse?.custom_query_args?.length > 0) {
12!
845
        custom_arguments = acsResponse.custom_query_args[0].custom_arguments;
12✔
846
      }
847
      result = this.filterByOwnerShip({ custom_arguments }, result);
12✔
848
    }
849

850
    result = result.filter(valid => !!valid);
36✔
851

852
    if (!_.isEmpty(request) && !_.isEmpty(request.sort)
24✔
853
      && _.includes(['ASCENDING', 'DESCENDING'], request.sort)) {
854
      let sort;
855
      switch (request.sort) {
2!
856
        case JobReadRequest_SortOrder.DESCENDING:
857
          sort = 'desc';
1✔
858
          break;
1✔
859
        case JobReadRequest_SortOrder.ASCENDING:
860
          sort = 'asc';
1✔
861
          break;
1✔
862
        default:
863
          this.logger.error(`Unknown sort option ${sort}`);
×
864
      }
865
      result = _.orderBy(result, ['id'], [sort]);
2✔
866
    }
867

868
    for (let job of result) {
24✔
869
      let jobId = job.id as string;
36✔
870
      if (jobId.startsWith('repeat:')) {
36✔
871
        const repeatKey = jobId.split(':')[1];
11✔
872
        // it could be possible the redis repeat key is deleted on index 8 and old completed
873
        // jobs exist in data_store if delete on complete was not set to true for repeatable jobs
874
        const jobRedisId = await this.getRedisValue(repeatKey);
11✔
875
        if (jobRedisId) {
11!
876
          job.id = jobRedisId;
11✔
877
        }
878
      }
879
    }
880

881
    for (let job of result) {
24✔
882
      let when = job?.opts?.delay ? new Date(job?.opts?.delay).toString() : '';
36!
883
      jobListResponse.items.push({
36✔
884
        payload: {
885
          id: job.id as string,
886
          type: job.name,
887
          data: _filterJobData(job.data, true, this.logger),
888
          options: _filterJobOptions(job.opts) as any,
889
          when
890
        },
891
        status: {
892
          id: job.id.toString(),
893
          code: 200,
894
          message: 'success'
895
        }
896
      });
897
    }
898
    jobListResponse.total_count = jobListResponse?.items?.length;
24✔
899
    jobListResponse.operation_status = {
24✔
900
      code: 200,
901
      message: 'success'
902
    };
903
    return jobListResponse;
24✔
904
  }
905

906
  async _getJobList(): Promise<Job[]> {
907
    let jobsList: any[] = [];
14✔
908
    for (let queue of this.queuesList) {
14✔
909
      const getJobsResult = await queue.getJobs(this.bullOptions['activeAndFutureJobTypes']);
42✔
910
      getJobsResult.forEach((job) => {
42✔
911
        if (job?.opts?.repeat?.pattern) {
26!
912
          (job.opts.repeat as any).cron = job.opts.repeat.pattern;
×
913
          delete job.opts.repeat.pattern;
×
914
        }
915
      });
916
      jobsList = jobsList.concat(getJobsResult);
42✔
917
    }
918
    return jobsList;
14✔
919
  }
920

921
  // delete a job by its job instance after processing 'jobDone' / 'jobFailed'
922
  async _deleteJobInstance(jobId: string, queue: Queue): Promise<void> {
923
    return this._removeBullJob(jobId, queue);
×
924
  }
925

926
  /**
927
   * Delete Job from queue.
928
   */
929
  async delete(request: DeleteRequest, ctx: any): Promise<DeepPartial<DeleteResponse>> {
930
    let deleteResponse: DeleteResponse = { status: [], operation_status: { code: 0, message: '' } };
6✔
931
    if (_.isEmpty(request)) {
6!
932
      return {
×
933
        operation_status: {
934
          code: 400,
935
          message: 'No arguments provided for delete operation'
936
        }
937
      };
938
    }
939
    const subject = request?.subject;
6✔
940
    const jobIDs = request?.ids;
6✔
941
    let resources = [];
6✔
942
    let action;
943
    if (jobIDs) {
6✔
944
      action = AuthZAction.DELETE;
3✔
945
      if (_.isArray(jobIDs)) {
3!
946
        for (let id of jobIDs) {
3✔
947
          resources.push({ id });
3✔
948
        }
949
      } else {
950
        resources = [{ id: jobIDs }];
×
951
      }
952
      await this.createMetadata(resources, action, subject);
3✔
953
    }
954
    if (request.collection) {
6✔
955
      action = AuthZAction.DROP;
3✔
956
      resources = [{ collection: request.collection }];
3✔
957
    }
958
    let acsResponse: DecisionResponse;
959
    try {
6✔
960
      if (!ctx) { ctx = {}; };
6!
961
      ctx.subject = subject;
6✔
962
      ctx.resources = resources;
6✔
963
      acsResponse = await checkAccessRequest(ctx, [{ resource: 'job', id: jobIDs as string[] }], action,
6✔
964
        Operation.isAllowed);
965
    } catch (err) {
966
      this.logger.error('Error requesting access-control-srv for delete operation', { code: err.code, message: err.message, stack: err.stack });
×
967
      return {
×
968
        operation_status: {
969
          code: err.code,
970
          message: err.message
971
        }
972
      };
973
    }
974
    if (acsResponse.decision != Response_Decision.PERMIT) {
6✔
975
      return { operation_status: acsResponse.operation_status };
1✔
976
    }
977
    const dispatch = [];
5✔
978
    this.logger.info('Received delete request');
5✔
979
    if ('collection' in request && request.collection) {
5✔
980
      this.logger.verbose('Deleting all jobs');
2✔
981

982
      await this._getJobList().then(async (jobs) => {
2✔
983
        for (let job of jobs) {
2✔
984
          await job.remove();
5✔
985
          if (this.resourceEventsEnabled) {
5!
986
            dispatch.push(this.jobEvents.emit('jobsDeleted', { id: job.id }));
5✔
987
          }
988
          deleteResponse.status.push({
5✔
989
            id: job.id.toString(),
990
            code: 200,
991
            message: 'success'
992
          });
993
        }
994
      });
995
      // FLUSH redis DB index 8 used for mapping of repeat jobIds (since req is for dropping job collection)
996
      const delResp = await this.repeatJobIdRedisClient.flushDb();
2✔
997
      if (delResp) {
2!
998
        this.logger.debug('Mapped keys for repeatable jobs deleted successfully');
2✔
999
      } else {
1000
        this.logger.debug('Could not delete repeatable job keys');
×
1001
      }
1002
    } else if ('ids' in request) {
3!
1003
      this.logger.verbose('Deleting jobs by their IDs', { id: request.ids });
3✔
1004

1005
      for (let queue of this.queuesList) {
3✔
1006
        for (let jobDataKey of request.ids) {
9✔
1007
          let callback: Promise<boolean>;
1008
          const jobIdData = await this.getRedisValue(jobDataKey as string);
9✔
1009
          // future jobs scheduled with `when` will have same repeatId as external SCS jobID
1010
          if (jobIdData && jobIdData.repeatId && (jobIdData.repeatId != jobDataKey)) {
9✔
1011
            const jobs = await queue.getRepeatableJobs();
2✔
1012
            for (let job of jobs) {
2✔
1013
              if (job.id === jobDataKey) {
1!
1014
                this.logger.debug('Removing Repeatable job by key for jobId', { id: job.id });
1✔
1015
                callback = queue.removeRepeatableByKey(job.key);
1✔
1016
                deleteResponse.status.push({
1✔
1017
                  id: job.id,
1018
                  code: 200,
1019
                  message: 'success'
1020
                });
1021
                await this.deleteRedisKey(jobDataKey as string);
1✔
1022
                await this.deleteRedisKey(jobIdData.repeatId);
1✔
1023
                break;
1✔
1024
              }
1025
            }
1026
          } else {
1027
            callback = queue.getJob(jobDataKey).then(async (jobData) => {
7✔
1028
              if (jobData) {
7✔
1029
                try {
2✔
1030
                  await this._removeBullJob(jobData.id, queue);
2✔
1031
                  await this.deleteRedisKey(jobData.id);
2✔
1032
                  deleteResponse.status.push({
2✔
1033
                    id: jobData.id.toString(),
1034
                    code: 200,
1035
                    message: 'success'
1036
                  });
1037
                } catch (err) {
1038
                  if (typeof err?.code === 'string') {
×
1039
                    err.code = 500;
×
1040
                  }
1041
                  deleteResponse.status.push({
×
1042
                    id: jobData.id.toString(),
1043
                    code: err.code,
1044
                    message: err.message
1045
                  });
1046
                  return false;
×
1047
                }
1048
                return true;
2✔
1049
              }
1050
              return false;
5✔
1051
            });
1052
          }
1053

1054
          // since no CB is returned for removeRepeatableByKey by bull
1055
          if (!callback) {
9✔
1056
            if (this.resourceEventsEnabled) {
1!
1057
              dispatch.push(this.jobEvents.emit(
1✔
1058
                'jobsDeleted', { id: jobDataKey })
1059
              );
1060
            }
1061
          } else {
1062
            callback.then(() => {
8✔
1063
              if (this.resourceEventsEnabled) {
8!
1064
                dispatch.push(this.jobEvents.emit(
8✔
1065
                  'jobsDeleted', { id: jobDataKey })
1066
                );
1067
              }
1068
            }).catch(err => {
1069
              this.logger.error('Error deleting job', { id: jobDataKey });
×
1070
              if (typeof err?.code === 'number') {
×
1071
                err.code = 500;
×
1072
              }
1073
              deleteResponse.status.push({
×
1074
                id: jobDataKey.toString(),
1075
                code: err.code,
1076
                message: err.message
1077
              });
1078
            });
1079
          }
1080
        }
1081
      }
1082
    }
1083

1084
    await Promise.all(dispatch);
5✔
1085
    deleteResponse.operation_status = { code: 200, message: 'success' };
5✔
1086
    return deleteResponse;
5✔
1087
  }
1088

1089
  /**
1090
   * Clean up queues - removes complted and failed jobs from queue
1091
   * @param {any} job clean up job
1092
   */
1093
  async cleanupJobs(ttlAfterFinished) {
1094
    for (let queue of this.queuesList) {
×
1095
      try {
×
1096
        await queue.clean(ttlAfterFinished, 0, COMPLETED_JOB_STATE);
×
1097
        await queue.clean(ttlAfterFinished, 0, FAILED_JOB_STATE);
×
1098
      } catch (err) {
1099
        this.logger.error('Error cleaning up jobs', err);
×
1100
      }
1101
    }
1102
    this.logger.info('Jobs cleaned up successfully');
×
1103
    let lastExecutedInterval = { lastExecutedInterval: (new Date()).toString() };
×
1104
    await this.repeatJobIdRedisClient.set(QUEUE_CLEANUP, JSON.stringify(lastExecutedInterval));
×
1105
  }
1106

1107
  async setupCleanInterval(cleanInterval: number, ttlAfterFinished: number) {
1108
    if (!ttlAfterFinished) {
×
1109
      ttlAfterFinished = DEFAULT_CLEANUP_COMPLETED_JOBS;
×
1110
    }
1111
    const intervalData = await this.getRedisValue(QUEUE_CLEANUP);
×
1112
    let timeInMs, delta;
1113
    const now = new Date().getTime();
×
1114
    if (intervalData?.lastExecutedInterval && typeof (intervalData.lastExecutedInterval) === 'string') {
×
1115
      timeInMs = new Date(intervalData.lastExecutedInterval).getTime();
×
1116
      this.logger.debug('Previous execution interval', intervalData);
×
1117
      delta = now - timeInMs;
×
1118
      this.logger.debug('Clean interval and previous difference', { cleanInterval, difference: delta });
×
1119
    }
1120

1121
    if (delta && (delta < cleanInterval)) {
×
1122
      // use setTimeout and then create interval on setTimeout
1123
      this.logger.info('Restoring previous execution interval with set timeout', { time: cleanInterval - delta });
×
1124
      setTimeout(async () => {
×
1125
        await this.cleanupJobs(ttlAfterFinished);
×
1126
        setInterval(this.cleanupJobs.bind(this), cleanInterval, ttlAfterFinished);
×
1127
      }, cleanInterval - delta);
1128
    } else {
1129
      setInterval(this.cleanupJobs.bind(this), cleanInterval, ttlAfterFinished);
×
1130
      this.logger.info('Clean up job interval set successfully');
×
1131
    }
1132
  }
1133

1134
  /**
1135
   * Reschedules a job - deletes it and recreates it with a new generated ID.
1136
   */
1137
  async update(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
1138
    let subject = request.subject;
3✔
1139
    // update meta data for owners information
1140
    await this.createMetadata(request.items, AuthZAction.MODIFY, subject);
3✔
1141
    let acsResponse: DecisionResponse;
1142
    try {
3✔
1143
      if (!ctx) { ctx = {}; };
3!
1144
      ctx.subject = subject;
3✔
1145
      ctx.resources = request?.items;
3✔
1146
      acsResponse = await checkAccessRequest(ctx,
3✔
1147
        [{ resource: 'job', id: request.items.map(item => item.id) }],
3✔
1148
        AuthZAction.MODIFY, Operation.isAllowed);
1149
    } catch (err) {
1150
      this.logger.error('Error requesting access-control-srv for update operation', { code: err.code, message: err.message, stack: err.stack });
×
1151
      return {
×
1152
        operation_status: {
1153
          code: err.code,
1154
          message: err.message
1155
        }
1156
      };
1157
    }
1158
    if (acsResponse.decision != Response_Decision.PERMIT) {
3✔
1159
      return { operation_status: acsResponse.operation_status };
1✔
1160
    }
1161
    if (_.isNil(request) || _.isNil(request.items)) {
2!
1162
      return {
×
1163
        operation_status: {
1164
          code: 400,
1165
          message: 'Missing items in update request'
1166
        }
1167
      };
1168
    }
1169

1170
    const mappedJobs = request?.items?.reduce((obj, job) => {
2✔
1171
      obj[job.id] = job;
2✔
1172
      return obj;
2✔
1173
    }, {});
1174

1175
    const jobData = await this.read(JobReadRequest.fromPartial(
2✔
1176
      {
1177
        filter: {
1178
          job_ids: Object.keys(mappedJobs)
1179
        },
1180
        subject
1181
      }
1182
    ), ctx);
1183

1184
    await this.delete(DeleteRequest.fromPartial({
2✔
1185
      ids: Object.keys(mappedJobs),
1186
      subject
1187
    }), {});
1188

1189
    const result = [];
2✔
1190

1191
    jobData?.items?.forEach(async (job) => {
2✔
1192
      const mappedJob = mappedJobs[job?.payload?.id];
2✔
1193
      let endJob = {
2✔
1194
        id: mappedJob.id,
1195
        type: mappedJob.type,
1196
        options: {
1197
          ...job.payload.options,
1198
          ...(mappedJob.options ? mappedJob.options : {})
2!
1199
        },
1200
        data: mappedJob.data || job.payload.data,
2!
1201
        when: mappedJob.when,
1202
      };
1203

1204
      if (endJob.when && endJob.options) {
2!
1205
        delete endJob.options['delay'];
2✔
1206
      }
1207

1208
      result.push(endJob);
2✔
1209
    });
1210

1211
    return this.create(JobList.fromPartial({
2✔
1212
      items: result,
1213
      subject
1214
    }), ctx);
1215
  }
1216

1217
  /**
1218
   * Upserts a job - creates a new job if it does not exist or update the
1219
   * existing one if it already exists.
1220
   */
1221
  async upsert(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
1222
    let subject = request.subject;
2✔
1223
    await this.createMetadata(request.items, AuthZAction.MODIFY, subject);
2✔
1224
    let acsResponse: DecisionResponse;
1225
    try {
2✔
1226
      if (!ctx) { ctx = {}; };
2!
1227
      ctx.subject = subject;
2✔
1228
      ctx.resources = request.items;
2✔
1229
      acsResponse = await checkAccessRequest(ctx,
2✔
1230
        [{ resource: 'job', id: request.items.map(item => item.id) }],
2✔
1231
        AuthZAction.MODIFY, Operation.isAllowed);
1232
    } catch (err) {
1233
      this.logger.error('Error requesting access-control-srv for upsert operation', { code: err.code, message: err.message, stack: err.stack });
×
1234
      return {
×
1235
        operation_status: {
1236
          code: err.code,
1237
          message: err.message
1238
        }
1239
      };
1240
    }
1241

1242
    if (acsResponse.decision != Response_Decision.PERMIT) {
2✔
1243
      return { operation_status: acsResponse.operation_status };
1✔
1244
    }
1245
    if (_.isNil(request) || _.isNil(request.items)) {
1!
1246
      return { operation_status: { code: 400, message: 'Missing items in upsert request' } };
×
1247
    }
1248

1249
    let result = [];
1✔
1250

1251
    for (let eachJob of request.items) {
1✔
1252
      let jobExists = false;
1✔
1253
      let origJobId = _.cloneDeep(eachJob.id);
1✔
1254
      for (let queue of this.queuesList) {
1✔
1255
        const jobIdData = await this.getRedisValue(eachJob.id as string);
1✔
1256
        // future jobs scheduled with `when` will have same repeatId as external SCS jobID
1257
        if (jobIdData?.repeatId && (jobIdData.repeatId != origJobId)) {
1!
1258
          const repeatId = jobIdData.repeatId;
×
1259
          if (jobIdData?.options?.repeat?.cron && jobIdData?.options?.repeat?.every) {
×
1260
            result.push({
×
1261
              status: {
1262
                id: origJobId,
1263
                code: 400,
1264
                message: 'Both .cron and .every options are defined for this repeatable job'
1265
              }
1266
            });
1267
            continue;
×
1268
          }
1269
          const nextMillis = this.getNextMillis(Date.now(), jobIdData.options.repeat);
×
1270
          this.logger.debug('Repeatable job identifier', { id: eachJob.id, repeatId: `repeat:${repeatId}:${nextMillis}` });
×
1271
          // map the repeatKey with nextmilis for bull repeatable jobID
1272
          eachJob.id = `repeat:${repeatId}:${nextMillis}`;
×
1273
        }
1274
        const jobInst = await queue.getJob(eachJob.id);
1✔
1275
        if (jobInst) {
1!
1276
          // existing job update it with the given job identifier
1277
          if (eachJob.id.startsWith('repeat:')) {
1!
1278
            eachJob.id = origJobId;
×
1279
          }
1280
          result = [
1✔
1281
            ...result,
1282
            ...(await this.update(JobList.fromPartial({ items: [eachJob], subject }), ctx)).items
1283
          ];
1284
          jobExists = true;
1✔
1285
          break;
1✔
1286
        }
1287
      }
1288
      if (!jobExists) {
1!
1289
        // new job create it
1290
        result = [
×
1291
          ...result,
1292
          ...(await this.create(JobList.fromPartial({ items: [eachJob], subject }), ctx)).items
1293
        ];
1294
      }
1295
    }
1296

1297
    return {
1✔
1298
      items: result,
1299
      total_count: result.length,
1300
      operation_status: {
1301
        code: 200,
1302
        message: 'success'
1303
      }
1304
    };
1305
  }
1306

1307
  /**
1308
   * Clear all job data.
1309
   */
1310
  async clear(): Promise<any> {
1311
    let allJobs: any[] = [];
1✔
1312
    for (let queue of this.queuesList) {
1✔
1313
      allJobs = allJobs.concat(await queue.getJobs(this.bullOptions['allJobTypes']));
3✔
1314
    }
1315
    return Promise.all(allJobs.map(async (job) => job.remove())).catch(err => {
6✔
1316
      this.logger.error(`Error clearing jobs`, err);
×
1317
      throw err;
×
1318
    });
1319
  }
1320

1321
  async _removeBullJob(jobInstID: string, queue: Queue): Promise<void> {
1322
    return queue.getJob(jobInstID).then(job => {
2✔
1323
      if (job) {
2!
1324
        return job.remove();
2✔
1325
      }
1326
    }).then(() => {
1327
      this.logger.info(`Job#${jobInstID} removed`);
2✔
1328
    }).catch(err => {
1329
      this.logger.error(`Error removing job ${jobInstID}`, err);
×
1330
      throw err;
×
1331
    });
1332
  }
1333

1334
  /**
1335
   *  disable access control
1336
   */
1337
  disableAC() {
1338
    try {
1✔
1339
      this.cfg.set('authorization:enabled', false);
1✔
1340
      updateConfig(this.cfg);
1✔
1341
    } catch (err) {
1342
      this.logger.error('Error caught disabling authorization:', { err });
×
1343
      this.cfg.set('authorization:enabled', this.authZCheck);
×
1344
    }
1345
  }
1346

1347
  /**
1348
   *  enables access control
1349
   */
1350
  enableAC() {
1351
    try {
1✔
1352
      this.cfg.set('authorization:enabled', true);
1✔
1353
      updateConfig(this.cfg);
1✔
1354
    } catch (err) {
1355
      this.logger.error('Error caught enabling authorization:', { err });
×
1356
      this.cfg.set('authorization:enabled', this.authZCheck);
×
1357
    }
1358
  }
1359

1360
  /**
1361
   *  restore AC state to previous vale either before enabling or disabling AC
1362
   */
1363
  restoreAC() {
1364
    try {
1✔
1365
      this.cfg.set('authorization:enabled', this.authZCheck);
1✔
1366
      updateConfig(this.cfg);
1✔
1367
    } catch (err) {
1368
      this.logger.error('Error caught enabling authorization:', { err });
×
1369
      this.cfg.set('authorization:enabled', this.authZCheck);
×
1370
    }
1371
  }
1372

1373
  /**
1374
   * reads meta data from DB and updates owners information in resource if action is UPDATE / DELETE
1375
   * @param resources list of resources
1376
   * @param action resource action
1377
   * @param subject subject name
1378
   */
1379
  async createMetadata(resources: any, action: string, subject): Promise<any> {
1380
    let orgOwnerAttributes: Attribute[] = [];
17✔
1381
    if (resources && !_.isArray(resources)) {
17!
1382
      resources = [resources];
×
1383
    }
1384
    const urns = this.cfg.get('authorization:urns');
17✔
1385
    if (subject?.scope && (action === AuthZAction.CREATE || action === AuthZAction.MODIFY)) {
17✔
1386
      // add subject scope as default owners
1387
      orgOwnerAttributes.push(
14✔
1388
        {
1389
          id: urns?.ownerIndicatoryEntity,
1390
          value: urns?.organization,
1391
          attributes: [{
1392
            id: urns?.ownerInstance,
1393
            value: subject?.scope,
1394
            attributes: []
1395
          }]
1396
        });
1397
    }
1398

1399
    if (resources?.length > 0) {
17!
1400
      for (let resource of resources) {
17✔
1401
        if (!resource.data) {
20✔
1402
          resource.data = { meta: {} };
3✔
1403
        } else if (!resource.data.meta) {
17✔
1404
          resource.data.meta = {};
12✔
1405
        }
1406
        if (resource?.id && (action === AuthZAction.MODIFY || action === AuthZAction.DELETE)) {
20✔
1407
          let result;
1408
          try {
8✔
1409
            result = await this.read(JobReadRequest.fromPartial({
8✔
1410
              filter: {
1411
                job_ids: [resource.id]
1412
              },
1413
              subject
1414
            }), {});
1415
          } catch (err) {
1416
            if (err.message.startsWith('Error! Jobs not found in any of the queues') && action != AuthZAction.DELETE) {
×
1417
              this.logger.debug('New job should be created', { jobId: resource.id });
×
1418
              result = { items: [] };
×
1419
            } else {
1420
              this.logger.error(`Error reading job with resource ID ${resource.id}`, { code: err.code, message: err.message, stack: err.stack });
×
1421
            }
1422
          }
1423
          // update owners info
1424
          if (result?.items?.length === 1 && result?.items[0]?.payload) {
8✔
1425
            let item = result.items[0].payload;
6✔
1426
            resource.data.meta.owners = item.data.meta.owners;
6✔
1427
            // adding meta to resource root (needed by access-contorl-srv for owners information check)
1428
            // meta is inside data of resource since the data is persisted in redis using bull
1429
            resource.meta = { owners: item.data.meta.owners };
6✔
1430
          } else if ((!result || !result.items || !result.items[0] || !result.items[0].payload) && action === AuthZAction.MODIFY) {
2!
1431
            // job does not exist - create new job (ex: Upsert with action modify)
1432
            let ownerAttributes = _.cloneDeep(orgOwnerAttributes);
2✔
1433
            // add user as default owners
1434
            ownerAttributes.push(
2✔
1435
              {
1436
                id: urns?.ownerIndicatoryEntity,
1437
                value: urns?.user,
1438
                attributes: [{
1439
                  id: urns?.ownerInstance,
1440
                  value: subject?.id,
1441
                  attributes: []
1442
                }]
1443
              });
1444
            resource.data.meta.owners = ownerAttributes;
2✔
1445
            resource.meta = { owners: ownerAttributes };
2✔
1446
          }
1447
        } else if ((action === AuthZAction.CREATE || !resource.id) && !resource.data.meta.owners) {
12!
1448
          let ownerAttributes = _.cloneDeep(orgOwnerAttributes);
10✔
1449
          // add user as default owners
1450
          if (resource.id) {
10✔
1451
            ownerAttributes.push(
2✔
1452
              {
1453
                id: urns.ownerIndicatoryEntity,
1454
                value: urns.user,
1455
                attributes: [{
1456
                  id: urns.ownerInstance,
1457
                  value: subject.id,
1458
                  attributes: []
1459
                }]
1460
              });
1461
          }
1462
          resource.data.meta.owners = ownerAttributes;
10✔
1463
          resource.meta = { owners: ownerAttributes };
10✔
1464
        } else if (action === AuthZAction.CREATE && resource?.data?.meta?.owners) {
2!
1465
          resource.meta = { owners: resource.data.meta.owners };
2✔
1466
        }
1467
      }
1468
    }
1469
    return resources;
17✔
1470
  }
1471
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc