• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

restorecommerce / scheduling-srv / 15237791286

25 May 2025 12:19PM UTC coverage: 75.575% (+0.07%) from 75.503%
15237791286

push

github

Arun-KumarH
fix: remove console log

217 of 317 branches covered (68.45%)

Branch coverage included in aggregate %.

1655 of 2160 relevant lines covered (76.62%)

9.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.33
/src/schedulingService.ts
1
import * as _ from 'lodash-es';
1✔
2
import { errors } from '@restorecommerce/chassis-srv';
1✔
3
import * as kafkaClient from '@restorecommerce/kafka-client';
1✔
4
import {
1✔
5
  AuthZAction,
1✔
6
  ACSAuthZ,
1✔
7
  updateConfig,
1✔
8
  DecisionResponse,
1✔
9
  Operation,
1✔
10
  PolicySetRQResponse,
1✔
11
  CtxResource,
1✔
12
  CustomQueryArgs
1✔
13
} from '@restorecommerce/acs-client';
1✔
14
import {
1✔
15
  JobServiceImplementation as SchedulingServiceServiceImplementation,
1✔
16
  JobFailed, JobDone, DeepPartial, JobList, JobListResponse,
1✔
17
  Backoff_Type, JobOptions_Priority, JobReadRequest, JobReadRequest_SortOrder,
1✔
18
  JobResponse, Job,
1✔
19
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/job.js';
1✔
20
import { createClient, RedisClientType } from 'redis';
1✔
21
import { NewJob, Priority } from './types.js';
1✔
22
import pkg from 'cron-parser';
1✔
23
import * as crypto from 'node:crypto';
1✔
24
import { _filterJobData, _filterJobOptions, _filterQueuedJob, checkAccessRequest, marshallProtobufAny } from './utilts.js';
1✔
25
import * as uuid from 'uuid';
1✔
26
import { Logger } from 'winston';
1✔
27
import { Response_Decision } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/access_control.js';
1✔
28
import { Attribute } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/attribute.js';
1✔
29
import {
1✔
30
  DeleteRequest,
1✔
31
  DeleteResponse,
1✔
32
  ResourceListResponse
1✔
33
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/resource_base.js';
1✔
34
import { Queue, QueueOptions, Job as BullJob } from 'bullmq';
1✔
35
import { Status } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/status.js';
1✔
36
import { Subject } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/auth.js';
1✔
37
import { Meta } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/meta.js';
1✔
38

1✔
39
// The cron parser class is taken from the default export of `cron-parser`.
const { CronExpressionParser } = pkg;

// Event names subscribed to on the job events topic.
const JOB_DONE_EVENT = 'jobDone';
const JOB_FAILED_EVENT = 'jobFailed';

// 7 days in milliseconds
const DEFAULT_CLEANUP_COMPLETED_JOBS = 7 * 24 * 60 * 60 * 1000;

// Job state names and the queue cleanup identifier.
const COMPLETED_JOB_STATE = 'completed';
const FAILED_JOB_STATE = 'failed';
const QUEUE_CLEANUP = 'queueCleanup';
1✔
46

1✔
47
/**
1✔
48
 * A job scheduling service.
1✔
49
 */
1✔
50
export class SchedulingService implements SchedulingServiceServiceImplementation {
1✔
51

2✔
52
  queuesConfigList: any;
2✔
53
  queuesList: Queue[];
2✔
54
  defaultQueueName: string;
2✔
55
  resourceEventsEnabled: boolean;
2✔
56
  canceledJobs: Set<string>;
2✔
57
  authZCheck: boolean;
2✔
58
  repeatJobIdRedisClient: RedisClientType<any, any>;
2✔
59

2✔
60
  /**
   * Connects the Redis client used for the repeat-job-id mapping, resolves the
   * default queue from the `queue` configuration and creates one BullMQ queue
   * per configured entry.
   *
   * @param jobEvents topic on which job events are received and emitted
   * @param redisConfig Redis connection options; must contain a `url`
   * @param logger logger instance
   * @param redisClient Redis client (used by this class to read last run times)
   * @param bullOptions bull options (not used inside the constructor)
   * @param cfg service configuration provider (accessed through `get`)
   * @param authZ access control client
   * @throws Error when no queue configuration exists or when no queue is
   * flagged with `default: true`
   */
  constructor(
    private readonly jobEvents: kafkaClient.Topic,
    private readonly redisConfig: any,
    private readonly logger: Logger,
    private readonly redisClient: RedisClientType<any, any>,
    private readonly bullOptions: any,
    private readonly cfg: any,
    private readonly authZ: ACSAuthZ
  ) {
    this.resourceEventsEnabled = true;
    this.queuesList = [];
    this.queuesConfigList = [];

    // Build the options on a copy so that the object handed out by
    // `cfg.get('redis')` is not modified for every other consumer.
    const repeatJobIdCfg = {
      ...cfg.get('redis'),
      database: cfg.get('redis:db-indexes:db-repeatJobId')
    };
    this.repeatJobIdRedisClient = createClient(repeatJobIdCfg);
    this.repeatJobIdRedisClient.on(
      'error',
      (err) => logger.error('Redis client error in repeatable job store', { err }));
    this.repeatJobIdRedisClient.connect().then(() => {
      logger.info('Redis client connection for repeatable job store successful');
    }).catch(err => logger.error('Redis client error for repeatable job store', { err }));

    this.canceledJobs = new Set<string>();
    this.authZCheck = this.cfg.get('authorization:enabled');

    // Read Queue Configuration file and find first queue which has "default": true,
    // then save it to defaultQueueName
    const queuesCfg = this.cfg.get('queue');
    if (_.isEmpty(queuesCfg)) {
      this.logger.error('Queue configuration not found!');
      throw new Error('Queue configuration not found!');
    }
    const defaultQueueCfg = _.find(queuesCfg, (queueCfg: any) => queueCfg.default == true);
    if (!defaultQueueCfg) {
      this.logger.error('Queue default configuration not found!');
      throw new Error('Queue default configuration not found!');
    }
    this.defaultQueueName = defaultQueueCfg.name;

    // Create Queues
    for (const queueCfg of queuesCfg) {
      const { name, rateLimiting, advancedSettings } = queueCfg;

      // Rate limiting is only reported here, it is not applied to the queue options
      if (!_.isEmpty(rateLimiting) && rateLimiting.enabled == true) {
        this.logger.info(`Queue: ${name} - Rate limiting is ENABLED.`);
      }

      // Per-queue copy of the connection options, without `keyPrefix`
      const connection: any = { ...redisConfig };
      delete connection.keyPrefix;

      const redisURL = new URL(connection.url);
      const parsedPort = _.parseInt(redisURL.port);
      const queueOptions: QueueOptions = {
        connection: {
          ...connection,
          host: redisURL.hostname,
          // `URL.port` is an empty string when the URL carries no explicit
          // port; fall back to the standard Redis port instead of passing NaN
          port: Number.isNaN(parsedPort) ? 6379 : parsedPort
        }
      };
      if (!_.isEmpty(advancedSettings)) {
        queueOptions.settings = {
          ...advancedSettings,
        };
      }

      this.queuesList.push(new Queue(name, queueOptions));

      // Add Queue Configurations
      this.queuesConfigList.push({
        name,
        concurrency: queueCfg.concurrency,
        default: queueCfg.default,
        runMissedScheduled: queueCfg.runMissedScheduled
      });
    }
  }
2✔
157

2✔
158
  /**
   * Logs the given error and wraps it into a list response that only carries
   * an operation status.
   *
   * @param error the caught error; an integer `code` is used as status code, otherwise 500
   * @param message optional log message, also used as status message when the
   * error provides none
   * @returns response with `total_count` 0 and the derived operation status
   */
  private catchOperationStatus(error: any, message?: string): ResourceListResponse {
    const logMessage = message ?? error?.message;
    this.logger?.error(logMessage, error);

    const code = Number.isInteger(error?.code) ? error.code : 500;
    const statusMessage = error?.message ?? error?.msg ?? error?.details ?? message;
    return {
      total_count: 0,
      operation_status: {
        code,
        message: statusMessage,
      },
    };
  }
×
168

2✔
169
  /**
   * Logs the given error and converts it into a status for a single resource.
   *
   * @param id identifier the status refers to
   * @param error the caught error; an integer `code` is used as status code, otherwise 500
   * @param message optional log message, also used as status message when the
   * error provides none
   * @returns status carrying the id, code and message
   */
  private catchStatus(id: string, error: any, message?: string): Status {
    const logMessage = message ?? error?.message;
    this.logger?.error(logMessage, error);

    const code = Number.isInteger(error?.code) ? error.code : 500;
    const statusMessage = error?.message ?? error?.msg ?? error?.details ?? message;
    return { id, code, message: statusMessage };
  }
×
177

2✔
178
  /**
   * Start handling the job queue, job scheduling and
   * managing job events.
   *
   * Subscribes to the `jobDone` and `jobFailed` events, attaches logging
   * listeners to every queue and finally reschedules missed recurring jobs.
   */
  async start(): Promise<any> {
    const logger = this.logger;
    const events = [JOB_DONE_EVENT, JOB_FAILED_EVENT];
    for (const subscribedEvent of events) {
      // A Scheduling Service Event Listener
      await this.jobEvents.on(subscribedEvent, async (msg: any, ctx: any,
        config: any, eventName: string): Promise<any> => {
        const job = msg;
        // Match Job Type to Queue Name, else use Default Queue
        let queue = _.find(this.queuesList, { name: job.type });
        if (_.isEmpty(queue)) {
          queue = _.find(this.queuesList, { name: this.defaultQueueName });
        }

        if (eventName === JOB_FAILED_EVENT) {
          logger.error(`job@${job.type}#${job.id} failed with error #${job.error}`,
            _filterQueuedJob<JobFailed>(job, this.logger));
        } else if (eventName === JOB_DONE_EVENT) {
          logger.verbose(`job#${job.id} done`, _filterQueuedJob<JobDone>(job, this.logger));
        }

        logger.info('Received Job event', { event: eventName });
        logger.info('Job details', job);
        // A failed lookup is logged and leaves `jobData` undefined
        const jobData: any = await queue.getJob(job.id).catch(error => {
          logger.error(`Error retrieving job ${job.id} from queue`, error);
        });

        if (job?.delete_scheduled) {
          if (jobData?.opts?.repeat) {
            await queue.removeRepeatable(jobData.name, jobData.opts.repeat);
          } else {
            // Without the stored job (or its repeat options) there is nothing
            // that could be passed to removeRepeatable
            logger.warn(`Repeat options for job ${job.id} not found, scheduled job not removed`);
          }
        }
      });
    }

    // Initialize Event Listeners for each Queue
    for (const queue of this.queuesList) {
      queue.on('error', (error) => {
        logger.error('queue error', error);
      });
      queue.on('waiting', (job) => {
        logger.verbose(`job#${job.id} scheduled`, job);
      });
      queue.on('removed', (job) => {
        logger.verbose(`job#${job.id} removed`, job);
      });
      queue.on('progress', (job) => {
        logger.verbose(`job#${job.id} progress`, job);
      });
    }

    // If the scheduling service goes down and if there were
    // recurring jobs which have missed schedules then
    // we will need to reschedule it for those missing intervals.
    await this._rescheduleMissedJobs();
  }
2✔
237

2✔
238
  /**
   * To reschedule the missed recurring jobs upon service restart.
   *
   * For every queue whose `runMissedScheduled` setting is unset or true the
   * active, delayed and repeat jobs are read. For each job with a stored last
   * run time and a cron `pattern`, one job per missed interval between the
   * last run time and now is created. Failures are logged and do not abort
   * the remaining work.
   */
  async _rescheduleMissedJobs(): Promise<void> {
    // for jobs created via Kafka currently there are no acs checks
    this.disableAC();
    try {
      const logger = this.logger;
      const createDispatch: Promise<any>[] = [];
      let result: BullJob[] = [];

      // Get the jobs
      for (const queueCfg of this.queuesConfigList) {
        // If enabled in the config, or the config is missing,
        // reschedule the missed jobs, else skip.
        const queue = _.find(this.queuesList, { name: queueCfg.name });
        const runMissedScheduled = queueCfg.runMissedScheduled;
        if (_.isNil(runMissedScheduled) || runMissedScheduled == true) {
          try {
            result = result.concat(await queue.getJobs(['active', 'delayed', 'repeat']));
          } catch (error) {
            logger.error('Error reading jobs to reschedule the missed recurring jobs', error);
          }
        }
      }

      for (const job of result) {
        // get the last run time for the job, we store the last run time only
        // for recurring jobs. Declared per job so that a value read for one
        // job is never reused for the next one.
        let lastRunTime: any;
        if (job?.name) {
          try {
            lastRunTime = await this.redisClient.get(job.name);
          } catch (error) {
            logger.error('Error reading the last run time for job type:', { name: job.name, error });
          }
        }
        // we store lastRunTime only for recurring jobs and if it exists check
        // cron interval and schedule immediate jobs for missed intervals
        logger.info(`Last run time of ${job?.name} Job was:`, lastRunTime);
        if (!lastRunTime) {
          continue;
        }

        // convert redis string value to object and get actual time value
        try {
          lastRunTime = JSON.parse(lastRunTime);
        } catch (error) {
          logger.error('Error parsing lastRunTime', { error, lastRunTime });
        }

        const pattern = (job?.opts?.repeat as any)?.pattern;
        if (!pattern || !lastRunTime?.time) {
          continue;
        }

        const options = {
          currentDate: new Date(lastRunTime.time),
          endDate: new Date(),
          iterator: true
        };
        let intervalTime;
        try {
          intervalTime = CronExpressionParser.parse(pattern, options);
        } catch (error) {
          logger.error('Error parsing cron expression running missed schedules', { error });
        }

        while (intervalTime?.hasNext()) {
          const nextInterval: any = intervalTime.next();
          // NOTE(review): depending on the cron-parser version `next()` yields
          // either an iterator result (`{ value }`) or the date itself - accept both
          const nextIntervalTime = (nextInterval?.value ?? nextInterval).toString();
          // schedule it as one time job for now or immediately
          const data = {
            payload: marshallProtobufAny({
              value: { time: nextIntervalTime }
            })
          };
          // scheduled 2 sec from now to avoid time out of queued jobs
          const currentTime = new Date();
          const when = new Date(currentTime.setSeconds(currentTime.getSeconds() + 2)).toISOString();
          const immediateJob: any = {
            type: job.name,
            data,
            when,
            options: {}
          };
          // `create` must be invoked on the instance, it relies on `this`
          createDispatch.push(this.create({
            items: [immediateJob],
            total_count: 0,
          }, {}));
        }
      }

      // Wait for all dispatched creates while access control is still disabled
      const outcomes = await Promise.allSettled(createDispatch);
      for (const outcome of outcomes) {
        if (outcome.status === 'rejected') {
          logger.error('Error rescheduling missed job', { error: outcome.reason });
        }
      }
    } finally {
      this.restoreAC();
    }
  }
2✔
328

2✔
329
  /**
   * Turns off CRUD events by clearing the `resourceEventsEnabled` flag.
   */
  disableEvents(): void {
    const enabled = false;
    this.resourceEventsEnabled = enabled;
  }
×
335

2✔
336
  /**
   * Turns on CRUD events by setting the `resourceEventsEnabled` flag.
   */
  enableEvents(): any {
    const enabled = true;
    this.resourceEventsEnabled = enabled;
  }
×
342

2✔
343
  /**
   * Validates a job and normalizes it in place.
   *
   * - `when` is converted into a relative `options.delay` in milliseconds
   * - a numeric backoff type is mapped to its lower-cased enum name
   * - a string priority is mapped to its `JobOptions_Priority` value
   * - the job data is passed through `_filterJobData`
   *
   * @param job the job to validate (mutated)
   * @returns the same job object
   * @throws errors.InvalidArgument when the type or the data is missing, or
   * when `when` is not a valid date or lies in the past
   */
  _validateJob(job: NewJob): NewJob {
    if (_.isNil(job.type)) {
      throw new errors.InvalidArgument('Job type not specified.');
    }

    if (!job.options) {
      job.options = {};
    }

    if (job.when) {
      const jobScheduleTime = new Date(job.when).getTime();
      // An unparsable date would otherwise end up as a NaN delay
      if (Number.isNaN(jobScheduleTime)) {
        throw new errors.InvalidArgument('Invalid job schedule time');
      }
      // If the jobSchedule time has already lapsed then do not schedule the job
      const currentTime = new Date().getTime();
      if (jobScheduleTime < currentTime) {
        throw new errors.InvalidArgument('Cannot schedule a job for an elapsed time');
      }

      job.options.delay = jobScheduleTime - currentTime;
    }

    const backoff: any = job.options.backoff;
    if (backoff && typeof backoff !== 'number') {
      if (typeof backoff.type === 'number') {
        // Prefer the enum's reverse mapping (value -> name); for a numeric enum
        // `Object.keys` lists the numeric keys first and would return e.g. '0'.
        // The positional lookup is kept as a fallback for enums without one.
        const enumName = (Backoff_Type as any)[backoff.type];
        backoff.type = typeof enumName === 'string'
          ? enumName
          : Object.keys(Backoff_Type)[backoff.type];
      }
      if (typeof backoff.type === 'string') {
        backoff.type = backoff.type.toLowerCase();
      }
    }

    if (job.options.priority && typeof job.options.priority === 'string') {
      job.options.priority = JobOptions_Priority[job.options.priority] as any;
    }

    if (_.isEmpty(job.data)) {
      throw new errors.InvalidArgument('No job data specified.');
    }

    job.data = _filterJobData(job.data, false, this.logger);

    return job;
  }
18✔
382

2✔
383
  /**
   * Get the next job execution time in milliseconds.
   *
   * With `opts.every` the next multiple of that interval after `millis` is
   * returned. Otherwise `opts.cron` is parsed, starting from the later of
   * `millis` and `opts.startDate`.
   *
   * @param millis reference time in milliseconds
   * @param opts repeat options (`every`, `cron`, `startDate` are read here;
   * the whole object is also passed on as parser options)
   * @returns the next execution time, or undefined when the cron expression
   * yields no further date
   */
  getNextMillis(millis: number, opts: any) {
    if (opts?.every) {
      return Math.floor(millis / opts.every) * opts.every + opts.every;
    }

    const reference = new Date(millis);
    const startDate = opts?.startDate ? new Date(opts.startDate) : undefined;
    const currentDate = startDate && startDate > reference ? startDate : reference;
    const interval = CronExpressionParser.parse(
      opts.cron,
      _.defaults({ currentDate }, opts)
    );

    try {
      return interval.next().getTime();
    } catch (error) {
      // keep the cause, the bare message alone is not actionable
      this.logger.error('Error getting next job execution time', { error });
    }
  }
1✔
413

2✔
414
  /**
   * Computes the MD5 digest of a string.
   *
   * @param str input string
   * @returns the digest as hex string
   */
  private md5(str: string) {
    const hash = crypto.createHash('md5');
    hash.update(str);
    return hash.digest('hex');
  }
×
420

2✔
421
  /**
   * Stores the mapping from the repeat id to the external SCS job id and the
   * reverse mapping from the SCS job id to the repeat id together with the
   * job options (needed for read operations). Nothing is stored when either
   * id is missing; errors are logged and not rethrown.
   *
   * @param repeatId - repeat id of the queued job
   * @param scsJobId - external SCS job id
   * @param options - job options stored alongside the repeat id
   */
  async storeRepeatKey(repeatId: string, scsJobId: string, options: any) {
    try {
      if (!repeatId || !scsJobId) {
        return;
      }
      this.logger.info('Repeat key mapped to external SCS JobId', { repeatId, scsJobId });
      // repeatId -> scsJobId
      await this.repeatJobIdRedisClient.set(repeatId, scsJobId);
      // scsJobId -> { repeatId, options }
      const reverseMapping = JSON.stringify({ repeatId, options });
      await this.repeatJobIdRedisClient.set(scsJobId, reverseMapping);
    } catch (error: any) {
      const { code, message, stack } = error;
      this.logger.error('Error storing repeatKey to redis', { code, message, stack });
    }
  }
18✔
444

2✔
445
  /**
   * Generates a new identifier: a v4 UUID with the dashes removed.
   */
  private idGen(): string {
    const segments = uuid.v4().split('-');
    return segments.join('');
  }
×
448

2✔
449
  /**
   * Create and queue jobs.
   *
   * After the access control check every item is validated and added to the
   * queue matching its `queue_name` (the default queue when none matches).
   * Items failing validation, or whose id already belongs to an existing job,
   * are reported through an item status instead of being queued. A
   * `jobsCreated` event is emitted when resource events are enabled.
   *
   * @param request list of jobs to create, optionally with the subject
   * @param ctx RPC context; `subject` and `resources` are set on it
   * @returns the created jobs with one status per item and an operation status
   */
  async create(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
    const jobListResponse: JobListResponse = { items: [], operation_status: { code: 0, message: '' }, total_count: 0 };
    // Validate the request before any of its fields is read
    if (_.isNil(request) || _.isNil(request.items)) {
      return {
        items: [],
        total_count: 0,
        operation_status: {
          code: 400,
          message: 'Missing items in create request'
        }
      };
    }
    const subject = request.subject;

    await this.createMetadata(request.items, AuthZAction.CREATE, subject);
    let acsResponse: DecisionResponse;
    try {
      if (!ctx) { ctx = {}; }
      ctx.subject = subject;
      // the job data is not part of the resources sent for the access check
      ctx.resources = request.items.map((job) => {
        const { data, ...resource } = job;
        return resource;
      });
      acsResponse = await checkAccessRequest(ctx, [{
        resource: 'job',
        id: request.items.map(item => item.id)
      }], AuthZAction.CREATE, Operation.isAllowed);
    } catch (err: any) {
      return this.catchOperationStatus(err, 'Error requesting access-control-srv for create meta data');
    }
    if (acsResponse.decision != Response_Decision.PERMIT) {
      return { items: [], total_count: 0, operation_status: acsResponse.operation_status };
    }

    const jobs: NewJob[] = [];
    for (const job of request.items) {
      try {
        jobs.push(this._validateJob(job as any));
      } catch (err: any) {
        this.logger.error('Error validating job', job);
        jobListResponse.items.push({
          status: {
            id: job.id,
            code: 400,
            message: err.message
          }
        });
      }
    }

    const result: BullJob[] = [];
    // Scheduling jobs
    for (const job of jobs) {
      // if not jobID is specified generate a UUID
      if (!job.id) {
        job.id = this.idGen();
      }

      // check if jobID already exists then map it as already exists error
      const existingJobId = await this.getRedisValue(job.id);
      if (existingJobId) {
        // read job to check if data exists
        const jobData = await this.read(JobReadRequest.fromPartial({
          filter: {
            job_ids: [job.id]
          },
          subject
        }), ctx);
        // `items` is absent when the read itself was denied or failed
        if ((jobData?.items as any)?.[0]?.payload) {
          jobListResponse.items.push({
            status: {
              id: job.id,
              code: 403,
              message: `Job with ID ${job.id} already exists`
            }
          });
          continue;
        }
      }

      // map the id to jobId as needed in JobOpts for bull
      if (!job.options) {
        job.options = { jobId: job.id };
      } else {
        job.options.jobId = job.id;
      }
      if (job.options.repeat) {
        (job as any).options.repeat.jobId = job.id;
      }

      if (!job.data.meta) {
        const now = new Date();
        const metaObj: Meta = {
          created: now,
          modified: now,
          modified_by: '',
          owners: []
        };
        Object.assign(job.data, { meta: metaObj });
      }
      // if only owners are specified in meta
      if (job.data.meta && (!job.data.meta.created || !job.data.meta.modified)) {
        job.data.meta.created = new Date();
        job.data.meta.modified = new Date();
      }
      if (job.data.meta) {
        job.data.meta.created_by = subject?.id;
        job.data.meta.modified_by = subject?.id;
      }

      if (job.data.payload?.value) {
        job.data.payload.value = job.data.payload.value.toString() as any;
      }

      // convert enum priority back to number as it's expected by bull
      if (job.options.priority) {
        job.options.priority = typeof job.options.priority === 'number' ? job.options.priority : Priority[job.options.priority] as unknown as number;
      }

      // if its a repeat job and tz is empty delete the key (else cron parser throws an error)
      if (job.options.repeat?.tz === '') {
        delete job.options.repeat.tz;
      }

      const bullOptions = {
        ...job.options
      };

      // a timeout of exactly 1 is not passed on to the queue
      if ((bullOptions as any).timeout === 1) {
        delete (bullOptions as any).timeout;
      }

      // Match the Job Type with the Queue Name and add the Job to this Queue.
      // If there is no match, add the Job to the Default Queue
      let queue = _.find(this.queuesList, { name: job?.queue_name });
      if (_.isEmpty(queue)) {
        queue = _.find(this.queuesList, { name: this.defaultQueueName });
      }
      const submittedJob = await queue.add(job.type, job.data, bullOptions);
      if (submittedJob?.id?.startsWith('repeat:')) {
        // repeatable job ids have the form `repeat:<repeatId>:...`
        const repeatJobId = submittedJob.id.split(':')[1];
        await this.storeRepeatKey(repeatJobId, job.id, job.options);
      } else if (submittedJob?.id) {
        // future job with when
        await this.storeRepeatKey(submittedJob.id, job.id, job.options);
      }
      this.logger.verbose(`job@${job.type} created`, job);
      result.push(submittedJob);
    }

    for (const job of result) {
      // expose the external SCS job id instead of the internal repeat id
      const jobId = job.id as string;
      if (jobId.startsWith('repeat:')) {
        const repeatKey = jobId.split(':')[1];
        job.id = await this.getRedisValue(repeatKey);
      }

      // NOTE(review): `opts.delay` is a relative delay in milliseconds (see
      // `_validateJob`), so this date is relative to the epoch rather than an
      // absolute schedule time - confirm the intended value of `when`.
      const when = job?.opts?.delay ? new Date(job?.opts?.delay).toString() : '';
      jobListResponse.items.push({
        payload: {
          id: job.id as string,
          type: job.name,
          queue_name: job?.queueName,
          data: _filterJobData(job.data, true, this.logger),
          options: _filterJobOptions(job.opts) as any,
          when
        },
        status: {
          id: (job?.id)?.toString(),
          code: 200,
          message: 'success'
        }
      });
    }

    if (this.resourceEventsEnabled) {
      const jobList = {
        items: result.map(job => ({
          id: job.id,
          type: job.name,
          data: _filterJobData(job.data, true, this.logger),
          options: _filterJobOptions(job.opts)
        })),
        total_count: result.length
      };
      await this.jobEvents.emit('jobsCreated', jobList);
    }

    jobListResponse.operation_status = { code: 200, message: 'success' };
    return jobListResponse;
  }
12✔
649

2✔
650
  /**
   * Applies the ownership filter contained in the custom query arguments.
   *
   * The custom argument value is parsed as JSON into one or more
   * `{ entity, instance }` filters. A job is kept when one of its
   * `data.meta.owners` has the configured owner indicator entity URN as id,
   * the filter's `entity` as value, and an attribute with the configured
   * owner instance URN as id whose value is contained in the filter's
   * `instance`.
   *
   * @param customArgsObj custom query arguments
   * @param result jobs to filter
   * @returns `result` unchanged when no custom arguments are given, otherwise
   * the matching jobs (each job at most once); an empty list when the
   * arguments are empty or cannot be parsed
   */
  private filterByOwnerShip(customArgsObj: CustomQueryArgs, result: BullJob[]): BullJob[] {
    const customArgs = customArgsObj?.custom_arguments;
    if (!customArgs?.value) {
      // no custom filters exist, return complete result set
      return result;
    }

    let customArgsFilter: any;
    try {
      customArgsFilter = JSON.parse(customArgs.value.toString());
    } catch (error: any) {
      this.logger.error('Error parsing custom query arguments', {
        code: error.code,
        message: error.message, stack: error.stack
      });
      // a filter that cannot be read must not match any job
      return [];
    }
    if (_.isNil(customArgsFilter) || customArgsFilter?.length === 0) {
      return [];
    }
    const filters: any[] = Array.isArray(customArgsFilter) ? customArgsFilter : [customArgsFilter];

    const ownerIndicatorEntURN = this.cfg.get('authorization:urns:ownerIndicatoryEntity');
    const ownerInstanceURN = this.cfg.get('authorization:urns:ownerInstance');

    // A Set keeps first-match order and prevents a job that satisfies several
    // filters from being returned more than once
    const matchedJobs = new Set<BullJob>();
    for (const filter of filters) {
      const ownerIndicatorEntity = filter?.entity;
      const ownerValues = filter?.instance;
      if (typeof ownerValues?.includes !== 'function') {
        // no usable instance list, this filter cannot match anything
        continue;
      }
      for (const job of result) {
        const owners = job?.data?.meta?.owners;
        if (matchedJobs.has(job) || !Array.isArray(owners)) {
          continue;
        }
        const isOwned = owners.some((owner: any) =>
          owner?.id === ownerIndicatorEntURN
          && owner?.value === ownerIndicatorEntity
          && Array.isArray(owner?.attributes)
          && owner.attributes.some((attribute: any) =>
            attribute?.id === ownerInstanceURN
            && attribute?.value
            && ownerValues.includes(attribute.value)));
        if (isOwned) {
          matchedJobs.add(job);
        }
      }
    }
    return Array.from(matchedJobs);
  }
39✔
696

2✔
697
  /**
   * Deletes a key from the Redis store used for the repeatable job id
   * mapping. Errors are logged and not rethrown.
   *
   * @param key the key to delete
   */
  async deleteRedisKey(key: string): Promise<any> {
    const store = this.repeatJobIdRedisClient;
    try {
      await store.del(key);
      this.logger.debug('Redis Key deleted successfully used for mapping repeatable jobID', { key });
    } catch (err: any) {
      const details = { key, msg: err.message, stack: err.stack };
      this.logger.error('Error deleting redis key', details);
    }
  }
5✔
705

2✔
706
  /**
   * Reads a value from the Redis store used for the repeatable job id mapping.
   *
   * @param key the key to read
   * @returns the JSON-parsed value when the stored string is valid JSON, the
   * raw string when it is not, and undefined when the key is empty, no value
   * is stored or reading fails (read errors are logged)
   */
  async getRedisValue(key: string): Promise<any> {
    if (!key) {
      return;
    }

    let redisValue: any;
    try {
      redisValue = await this.repeatJobIdRedisClient.get(key);
    } catch (err: any) {
      this.logger.error('Error reading redis key', { key, msg: err.message, stack: err.stack });
      return;
    }
    if (!redisValue) {
      return;
    }

    try {
      return JSON.parse(redisValue);
    } catch {
      // Not JSON (e.g. a plain job id): hand back the stored string. Keeping
      // the parse in its own try block avoids having to recognize parse
      // failures by their engine-specific error message.
      return redisValue;
    }
  }
72✔
725

2✔
726

2✔
727
  /**
   * Retrieve jobs.
   *
   * - Without a filter, all jobs returned by `_getJobList()` are listed.
   * - With `filter.job_ids`, every ID is looked up in every queue; IDs that are
   *   not found in any queue are reported as items with a 404 status.
   * - With only `filter.type`, all jobs from `_getJobList()` are listed and
   *   then filtered by job name.
   *
   * The result is filtered by ownership using the custom query arguments of
   * the access-control response and optionally ordered by job ID.
   *
   * @param request read request carrying optional `filter`, `sort` and `subject`
   * @param ctx RPC context; `subject` and `resources` are set on it for the access check
   * @returns one item (payload and/or status) per job plus an operation status
   */
  async read(request: JobReadRequest, ctx: any): Promise<DeepPartial<JobListResponse>> {
    const jobListResponse: JobListResponse = { items: [], operation_status: { code: 0, message: '' }, total_count: 0 };
    const subject = request?.subject;
    let acsResponse: PolicySetRQResponse;
    try {
      if (!ctx) { ctx = {}; }
      ctx.subject = subject;
      ctx.resources = [];
      acsResponse = await checkAccessRequest(ctx, [{ resource: 'job' }], AuthZAction.READ,
        Operation.whatIsAllowed) as PolicySetRQResponse;
    } catch (err: any) {
      return this.catchOperationStatus(err, 'Error requesting access-control-srv for read operation');
    }

    if (acsResponse.decision !== Response_Decision.PERMIT) {
      return { operation_status: acsResponse.operation_status };
    }

    let result: Array<BullJob>;
    if (_.isEmpty(request) || _.isEmpty(request.filter)
      && (!request.filter || !request.filter.job_ids
        || _.isEmpty(request.filter.job_ids))
      && (!request.filter || !request.filter.type ||
        _.isEmpty(request.filter.type))) {
      result = await this._getJobList();
      // NOTE(review): this branch passes the whole first custom_query_args entry while the
      // filtered branch below passes its `.custom_arguments` - confirm which one
      // filterByOwnerShip expects.
      const custom_arguments = acsResponse.custom_query_args?.[0];
      result = this.filterByOwnerShip(custom_arguments, result);
    } else {
      result = new Array<BullJob>();
      // Normalize job_ids to an array; an empty array means "no ID filter" and must not
      // be wrapped into [[]] (which would yield a bogus 404 item with an empty ID).
      const requestedIDs: any = request?.filter?.job_ids;
      const jobIDs: string[] = Array.isArray(requestedIDs)
        ? requestedIDs
        : (requestedIDs ? [requestedIDs] : []);
      const typeFilterName = request.filter.type;

      if (jobIDs.length > 0) {
        // Collects the external ID of every job that was found in some queue, so that
        // the IDs missing from all queues can be reported afterwards.
        const jobIDsCopy: string[] = [];
        for (let jobID of jobIDs) {
          const jobIdData = await this.getRedisValue(jobID as string);
          // future jobs scheduled with `when` will have same repeatId as external SCS jobID
          if (jobIdData?.repeatId && (jobIdData.repeatId != jobID)) {
            const repeatId = jobIdData.repeatId;
            if (jobIdData?.options?.repeat?.cron && jobIdData?.options?.repeat?.every) {
              jobListResponse.items.push({
                status: {
                  id: jobID.toString(),
                  code: 400,
                  message: 'Both .cron and .every options are defined for this repeatable job'
                }
              });
              continue;
            }
            const nextMillis = this.getNextMillis(Date.now(), jobIdData.options.repeat);
            this.logger.debug('Repeatable job identifier', { id: jobID, repeatId: `repeat:${repeatId}:${nextMillis}` });
            // map the repeatKey with nextmilis for bull repeatable jobID
            jobID = `repeat:${repeatId}:${nextMillis}`;
          }
          for (const queue of this.queuesList) {
            try {
              // getJob resolves to the job, or to a nullish value when this queue does not hold it
              const job = await queue.getJob(jobID);
              if (job?.opts?.repeat?.pattern) {
                (job.opts.repeat as any).cron = job.opts.repeat.pattern;
                delete job.opts.repeat.pattern;
              }
              if (!_.isEmpty(job)) {
                result.push(job);
                jobIDsCopy.push((job as any)?.opts?.repeat?.jobId || jobID);
              }
            } catch (err: any) {
              // A failed lookup is reported per item; the remaining queues are still searched.
              jobListResponse.items.push({
                status: this.catchStatus(jobID.toString(), err, `Error reading job ${jobID}`)
              });
            }
          }
        }
        // Sort a copy so the caller's request is not mutated.
        const missingIDs = _.difference([...jobIDs].sort(), jobIDsCopy);
        for (const jobId of missingIDs) {
          jobListResponse.items.push({
            status: {
              id: jobId.toString(),
              code: 404,
              message: `Job ID ${jobId} not found in any of the queues`
            }
          });
        }
      } else {
        try {
          result = await this._getJobList();
        } catch (err: any) {
          return this.catchOperationStatus(err, 'Error reading jobs');
        }
      }

      if (typeFilterName) {
        result = result.filter(job => job?.name === typeFilterName);
      }
      const custom_arguments = acsResponse.custom_query_args?.[0]?.custom_arguments;
      result = this.filterByOwnerShip(custom_arguments, result);
    }

    result = result.filter(valid => !!valid);

    if (!_.isEmpty(request) && !_.isEmpty(request.sort)
      && _.includes(['ASCENDING', 'DESCENDING'], request.sort)) {
      let sort: boolean | "desc" | "asc";
      switch (request.sort) {
        case JobReadRequest_SortOrder.DESCENDING:
          sort = 'desc';
          break;
        case JobReadRequest_SortOrder.ASCENDING:
          sort = 'asc';
          break;
        default:
          this.logger.error(`Unknown sort option ${request.sort}`);
      }
      result = _.orderBy(result, ['id'], [sort]);
    }

    // Translate internal `repeat:<key>:<millis>` IDs back to the value stored in redis for <key>.
    for (const job of result) {
      const jobId = job.id as string;
      if (typeof jobId === 'string' && jobId.startsWith('repeat:')) {
        const repeatKey = jobId.split(':')[1];
        // it could be possible the redis repeat key is deleted on index 8 and old completed
        // jobs exist in data_store if delete on complete was not set to true for repeatable jobs
        const jobRedisId = await this.getRedisValue(repeatKey);
        if (jobRedisId) {
          job.id = jobRedisId;
        }
      }
    }

    for (const job of result) {
      const when = job?.opts?.delay ? new Date(job?.opts?.delay).toString() : '';
      jobListResponse.items.push({
        payload: {
          id: job.id as string,
          type: job.name,
          queue_name: job.queueName,
          data: _filterJobData(job.data, true, this.logger),
          options: _filterJobOptions(job.opts) as any,
          when
        },
        status: {
          id: job.id.toString(),
          code: 200,
          message: 'success'
        }
      });
    }
    jobListResponse.total_count = jobListResponse?.items?.length;
    jobListResponse.operation_status = {
      code: 200,
      message: 'success'
    };
    return jobListResponse;
  }
42✔
913

2✔
914
  /**
   * Collects the active, delayed and repeat jobs of every queue, one queue
   * after the other. A repeat option named `pattern` is renamed to `cron`
   * on each returned job.
   *
   * @returns the jobs of all queues in queue order
   */
  async _getJobList(): Promise<BullJob[]> {
    const collected: any[] = [];
    for (const queue of this.queuesList) {
      const queueJobs = await queue.getJobs(['active', 'delayed', 'repeat']);
      for (const job of queueJobs) {
        if (job?.opts?.repeat?.pattern) {
          (job.opts.repeat as any).cron = job.opts.repeat.pattern;
          delete job.opts.repeat.pattern;
        }
        collected.push(job);
      }
    }
    return collected;
  }
24✔
928

2✔
929
  /**
   * Deletes a job by its job instance after processing 'jobDone' / 'jobFailed'.
   * Delegates to `_removeBullJob`.
   */
  async _deleteJobInstance(jobId: string, queue: Queue): Promise<void> {
    await this._removeBullJob(jobId, queue);
  }
×
933

2✔
934
  /**
   * Delete jobs from the queues.
   *
   * - `request.collection` set: removes every job of every queue, flushes the
   *   redis DB holding the repeat-job ID mapping and clears all job data.
   * - otherwise, with `request.ids`: removes each given job (repeatable jobs via
   *   their job scheduler key) together with its redis mapping keys.
   *
   * Every removal is awaited before the response is returned, so the item
   * statuses are complete and `jobsDeleted` events have been dispatched.
   *
   * @param request delete request with `ids` and/or `collection`, and `subject`
   * @param ctx RPC context; `subject` and `resources` are set on it for the access check
   * @returns a status per deleted job plus an operation status
   */
  async delete(request: DeleteRequest, ctx: any): Promise<DeleteResponse> {
    const deleteResponse: DeleteResponse = { status: [], operation_status: { code: 0, message: '' } };
    if (_.isEmpty(request)) {
      return {
        operation_status: {
          code: 400,
          message: 'No arguments provided for delete operation'
        }
      };
    }

    try {
      const subject = request?.subject;
      const jobIDs = request?.ids;
      let resources = new Array<DeepPartial<CtxResource>>();
      let action;
      if (jobIDs) {
        action = AuthZAction.DELETE;
        if (_.isArray(jobIDs)) {
          for (const id of jobIDs) {
            resources.push({ id });
          }
        } else {
          resources = [{ id: jobIDs }];
        }
        await this.createMetadata(resources, action, subject);
      }
      if (request.collection) {
        action = AuthZAction.DROP;
        resources = [{ collection: request.collection }];
      }
      let acsResponse: DecisionResponse;
      try {
        if (!ctx) { ctx = {}; }
        ctx.subject = subject;
        ctx.resources = resources;
        acsResponse = await checkAccessRequest(
          ctx, [{ resource: 'job', id: jobIDs as string[] }],
          action,
          Operation.isAllowed
        );
      } catch (err: any) {
        return this.catchOperationStatus(err, 'Error requesting access-control-srv for delete operation');
      }
      if (acsResponse.decision != Response_Decision.PERMIT) {
        return {
          status: [],
          operation_status: acsResponse.operation_status
        };
      }
      const dispatch = [];
      this.logger.info('Received delete request');
      if ('collection' in request && request.collection) {
        this.logger.info('Deleting all jobs');
        for (const queue of this.queuesList || []) {
          const jobs = await queue.getJobs(['paused', 'repeat', 'wait', 'active', 'delayed',
            'prioritized', 'waiting', 'waiting-children', 'completed', 'failed']);
          for (const job of jobs || []) {
            if (!job) {
              continue;
            }
            if (job?.repeatJobKey) {
              await queue.removeJobScheduler(job?.repeatJobKey);
            } else {
              const id = job.key ? job.key : job.id;
              await queue.removeJobScheduler(id);
            }
            this.logger.info('Job deleted with key', { key: job.key, name: job.name });
            const jobIdentifier = job.id ? job.id : job.name;
            if (this.resourceEventsEnabled) {
              dispatch.push(this.jobEvents.emit('jobsDeleted', { id: jobIdentifier }));
            }
            deleteResponse.status.push({
              id: jobIdentifier,
              code: 200,
              message: 'success'
            });
          }
        }
        // FLUSH redis DB index 8 used for mapping of repeat jobIds (since req is for dropping job collection)
        const delResp = await this.repeatJobIdRedisClient.flushDb();
        if (delResp) {
          this.logger.info('Mapped keys for repeatable jobs deleted successfully');
        } else {
          this.logger.info('Could not delete repeatable job keys');
        }
        await this.clear();
      } else if ('ids' in request) {
        this.logger.info('Deleting jobs by their IDs', { id: request.ids });

        for (const queue of this.queuesList) {
          for (const jobDataKey of request.ids) {
            // Set when the removal itself failed; no `jobsDeleted` event is emitted then.
            let removalFailed = false;
            const jobIdData = await this.getRedisValue(jobDataKey as string);
            // future jobs scheduled with `when` will have same repeatId as external SCS jobID
            if (jobIdData?.repeatId && (jobIdData.repeatId != jobDataKey)) {
              const schedulers = await queue.getJobSchedulers();
              const scheduler = schedulers.find((entry) => entry?.key === jobIdData.repeatId);
              if (scheduler) {
                this.logger.info('Removing Repeatable job by key', { key: scheduler.key, name: scheduler.name, id: jobDataKey });
                try {
                  await queue.removeJobScheduler(scheduler.key);
                  deleteResponse.status.push({
                    id: jobDataKey,
                    code: 200,
                    message: 'success'
                  });
                  await this.deleteRedisKey(jobDataKey as string);
                  await this.deleteRedisKey(jobIdData.repeatId);
                } catch (err: any) {
                  removalFailed = true;
                  deleteResponse.status.push(
                    this.catchStatus(jobDataKey.toString(), err, 'Error deleting job')
                  );
                }
              }
            } else {
              try {
                const jobData = await queue.getJob(jobDataKey);
                if (jobData) {
                  try {
                    await this._removeBullJob(jobData.id, queue);
                    await this.deleteRedisKey(jobData.id);
                    deleteResponse.status.push({
                      id: jobData.id.toString(),
                      code: 200,
                      message: 'success'
                    });
                  } catch (err: any) {
                    deleteResponse.status.push(
                      this.catchStatus(jobData.id.toString(), err)
                    );
                  }
                }
              } catch (err: any) {
                removalFailed = true;
                deleteResponse.status.push(
                  this.catchStatus(jobDataKey.toString(), err, 'Error deleting job')
                );
              }
            }

            if (!removalFailed && this.resourceEventsEnabled) {
              dispatch.push(this.jobEvents.emit(
                'jobsDeleted', { id: jobDataKey })
              );
            }
          }
        }
      }

      await Promise.all(dispatch);
      deleteResponse.operation_status = { code: 200, message: 'success' };
    } catch (err: any) {
      // Surface the failure in the response instead of returning the initial code 0 status.
      const failure = this.catchOperationStatus(err);
      deleteResponse.operation_status = failure?.operation_status ?? {
        code: 500,
        message: err?.message ?? 'Error deleting jobs'
      };
    }
    return deleteResponse;
  }
8✔
1103

2✔
1104
  /**
   * Clean up queues - removes completed and failed jobs from every queue and
   * stores the time of this run in redis under the `QUEUE_CLEANUP` key.
   * A failure on one queue is logged and does not stop the remaining queues.
   *
   * @param ttlAfterFinished passed to `queue.clean` as its first argument
   *   (presumably the grace period in milliseconds - confirm against the queue library)
   * @param maxJobsToCleanLimit passed to `queue.clean` as its second argument
   */
  async cleanupJobs(ttlAfterFinished: number, maxJobsToCleanLimit: number) {
    let failedQueues = 0;
    for (const queue of this.queuesList) {
      try {
        await queue.clean(ttlAfterFinished, maxJobsToCleanLimit, COMPLETED_JOB_STATE);
        await queue.clean(ttlAfterFinished, maxJobsToCleanLimit, FAILED_JOB_STATE);
      } catch (err) {
        failedQueues += 1;
        this.logger.error('Error cleaning up jobs', err);
      }
    }
    // Only report success when no queue failed; the error was already logged above.
    if (failedQueues === 0) {
      this.logger.info('Jobs cleaned up successfully');
    }
    const lastExecutedInterval = { lastExecutedInterval: (new Date()).toString() };
    await this.repeatJobIdRedisClient.set(QUEUE_CLEANUP, JSON.stringify(lastExecutedInterval));
  }
×
1121

2✔
1122
  /**
   * Schedules the periodic queue clean up.
   *
   * When redis holds the time of a previous clean up that is more recent than
   * `cleanInterval`, the first run is delayed by the remaining time and the
   * interval is installed after that run; otherwise the interval is installed
   * immediately. Errors of a clean up run are logged and never leave the
   * timer callbacks as unhandled rejections.
   *
   * @param cleanInterval interval between clean up runs, in milliseconds
   * @param ttlAfterFinished forwarded to `cleanupJobs`; falls back to `DEFAULT_CLEANUP_COMPLETED_JOBS` when falsy
   * @param maxJobsToCleanLimit forwarded to `cleanupJobs`
   */
  async setupCleanInterval(cleanInterval: number, ttlAfterFinished: number, maxJobsToCleanLimit: number) {
    if (!ttlAfterFinished) {
      ttlAfterFinished = DEFAULT_CLEANUP_COMPLETED_JOBS;
    }
    const intervalData = await this.getRedisValue(QUEUE_CLEANUP);
    let delta: number | undefined;
    const now = new Date().getTime();
    if (intervalData?.lastExecutedInterval && typeof (intervalData.lastExecutedInterval) === 'string') {
      const timeInMs = new Date(intervalData.lastExecutedInterval).getTime();
      this.logger.debug('Previous execution interval', intervalData);
      delta = now - timeInMs;
      this.logger.debug('Clean interval and previous difference', { cleanInterval, difference: delta });
    }

    // Timer callbacks have no caller to propagate to, so failures are logged here.
    const runCleanup = async (): Promise<void> => {
      try {
        await this.cleanupJobs(ttlAfterFinished, maxJobsToCleanLimit);
      } catch (err) {
        this.logger.error('Error cleaning up jobs', err);
      }
    };

    if (delta && (delta < cleanInterval)) {
      // use setTimeout and then create interval on setTimeout
      this.logger.info('Restoring previous execution interval with set timeout', { time: cleanInterval - delta });
      setTimeout(async () => {
        await runCleanup();
        // Installed even when the first run failed, so later runs still happen.
        setInterval(runCleanup, cleanInterval);
      }, cleanInterval - delta);
    } else {
      setInterval(runCleanup, cleanInterval);
      this.logger.info('Clean up job interval set successfully');
    }
  }
2✔
1148

2✔
1149
  /**
   * Reschedules jobs - reads the existing jobs, deletes them and recreates
   * them through `create`, passing the ID given in the request.
   *
   * Options of the stored job are merged with the options given in the
   * request (request wins); the stored data is used when the request carries
   * none. Requested jobs that were not found by `read` are skipped.
   *
   * @param request list of jobs to update and the `subject`
   * @param ctx RPC context; `subject` and `resources` are set on it for the access check
   * @returns the response of `create` for the recreated jobs, or an operation
   *   status describing why the update was rejected
   */
  async update(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
    // Validate before the request is dereferenced below.
    if (_.isNil(request) || _.isNil(request.items)) {
      return {
        operation_status: {
          code: 400,
          message: 'Missing items in update request'
        }
      };
    }

    const subject = request.subject;
    // update meta data for owners information
    await this.createMetadata(request.items, AuthZAction.MODIFY, subject);
    let acsResponse: DecisionResponse;
    try {
      if (!ctx) { ctx = {}; }
      ctx.subject = subject;
      ctx.resources = request.items.map((job) => {
        const { data, ...resource } = job;
        return resource;
      });
      acsResponse = await checkAccessRequest(ctx,
        [{ resource: 'job', id: request.items.map(item => item.id) }],
        AuthZAction.MODIFY, Operation.isAllowed);
    } catch (err: any) {
      return this.catchOperationStatus(err, 'Error requesting access-control-srv for update operation');
    }
    if (acsResponse.decision != Response_Decision.PERMIT) {
      return { operation_status: acsResponse.operation_status };
    }

    const mappedJobs = request.items.reduce((obj, job) => {
      obj[job.id] = job;
      return obj;
    }, {} as Record<string, Job>);

    const jobData = await this.read(JobReadRequest.fromPartial(
      {
        filter: {
          job_ids: Object.keys(mappedJobs)
        },
        subject
      }
    ), ctx);

    await this.delete(DeleteRequest.fromPartial({
      ids: Object.keys(mappedJobs),
      subject
    }), {});

    const result = new Array<Job>();

    for (const job of jobData?.items ?? []) {
      const mappedJob = mappedJobs[job?.payload?.id];
      // Items without payload (e.g. a 404 status for an unknown ID) have nothing to recreate.
      if (!mappedJob) {
        continue;
      }
      const endJob = {
        id: mappedJob.id,
        type: mappedJob.type,
        queue_name: job?.payload?.queue_name,
        options: {
          ...job.payload.options,
          ...(mappedJob.options ? mappedJob.options : {})
        },
        data: mappedJob.data || job.payload.data,
        when: mappedJob.when,
      };

      // An explicit `when` replaces the delay taken over from the stored options.
      if (endJob.when && endJob.options) {
        delete (endJob.options as any).delay;
      }

      result.push(endJob);
    }

    return this.create(JobList.fromPartial({
      items: result,
      subject
    }), ctx);
  }
4✔
1229

2✔
1230
  /**
   * Upserts jobs - updates a job when it is found in one of the queues and
   * creates it otherwise.
   *
   * For repeatable jobs the queue lookup uses the internal
   * `repeat:<repeatId>:<nextMillis>` identifier; the job handed to `update` /
   * `create` always keeps the ID given in the request. A repeatable job whose
   * stored options define both `cron` and `every` is reported once with a 400
   * status and skipped.
   *
   * @param request list of jobs to upsert and the `subject`
   * @param ctx RPC context; `subject` and `resources` are set on it for the access check
   * @returns the items returned by `update` / `create` plus an operation status
   */
  async upsert(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
    // Validate before the request is dereferenced below.
    if (_.isNil(request) || _.isNil(request.items)) {
      return { operation_status: { code: 400, message: 'Missing items in upsert request' } };
    }

    const subject = request.subject;
    await this.createMetadata(request.items, AuthZAction.MODIFY, subject);
    let acsResponse: DecisionResponse;
    try {
      if (!ctx) { ctx = {}; }
      ctx.subject = subject;
      ctx.resources = request.items.map((job) => {
        const { data, ...resource } = job;
        return resource;
      });
      acsResponse = await checkAccessRequest(ctx,
        [{ resource: 'job', id: request.items.map(item => item.id) }],
        AuthZAction.MODIFY, Operation.isAllowed);
    } catch (err: any) {
      return this.catchOperationStatus(err, 'Error requesting access-control-srv for upsert operation');
    }

    if (acsResponse.decision != Response_Decision.PERMIT) {
      return { operation_status: acsResponse.operation_status };
    }

    const result = new Array<JobResponse>();
    for (const eachJob of request.items) {
      const origJobId = eachJob.id;
      // Identifier used for the queue lookup only; the job itself is never mutated.
      let lookupId = origJobId;
      const jobIdData = await this.getRedisValue(origJobId as string);
      // future jobs scheduled with `when` will have same repeatId as external SCS jobID
      if (jobIdData?.repeatId && (jobIdData.repeatId != origJobId)) {
        const repeatId = jobIdData.repeatId;
        if (jobIdData?.options?.repeat?.cron && jobIdData?.options?.repeat?.every) {
          result.push({
            status: {
              id: origJobId,
              code: 400,
              message: 'Both .cron and .every options are defined for this repeatable job'
            }
          });
          continue;
        }
        const nextMillis = this.getNextMillis(Date.now(), jobIdData.options.repeat);
        this.logger.debug('Repeatable job identifier', { id: origJobId, repeatId: `repeat:${repeatId}:${nextMillis}` });
        // map the repeatKey with nextmilis for bull repeatable jobID
        lookupId = `repeat:${repeatId}:${nextMillis}`;
      }

      let jobExists = false;
      for (const queue of this.queuesList) {
        const jobInst = await queue.getJob(lookupId);
        if (jobInst) {
          jobExists = true;
          break;
        }
      }

      if (jobExists) {
        // existing job update it with the given job identifier
        result.push(
          ...((await this.update(JobList.fromPartial({ items: [eachJob], subject }), ctx))?.items ?? [])
        );
      } else {
        // new job create it
        result.push(
          ...((await this.create(JobList.fromPartial({ items: [eachJob], subject }), ctx))?.items ?? [])
        );
      }
    }

    return {
      items: result,
      total_count: result.length,
      operation_status: {
        code: 200,
        message: 'success'
      }
    };
  }
1✔
1313

2✔
1314
  /**
   * Clear all job data: gathers the jobs of every queue in all listed states
   * and removes them concurrently.
   *
   * @returns the results of the individual `remove()` calls
   * @throws rethrows the first removal error after logging it
   */
  async clear(): Promise<any> {
    const everyJob: any[] = [];
    for (const queue of this.queuesList) {
      const queueJobs = await queue.getJobs(['paused', 'repeat', 'wait', 'active', 'delayed',
        'prioritized', 'waiting', 'waiting-children', 'completed', 'failed']);
      for (const job of queueJobs) {
        everyJob.push(job);
      }
    }
    const removals = everyJob.map(async (job) => job?.remove());
    try {
      return await Promise.all(removals);
    } catch (err) {
      this.logger.error(`Error clearing jobs`, err);
      throw err;
    }
  }
6✔
1328

2✔
1329
  /**
   * Looks up a job in the given queue and removes it when it exists.
   * The removal is logged in either case.
   *
   * @param jobInstID ID of the job instance to remove
   * @param queue queue expected to hold the job
   * @throws rethrows lookup / removal errors after logging them
   */
  async _removeBullJob(jobInstID: string, queue: Queue): Promise<void> {
    const lookup = queue.getJob(jobInstID);
    try {
      const job = await lookup;
      if (job) {
        await job.remove();
      }
      this.logger.info(`Job#${jobInstID} removed`);
    } catch (err) {
      this.logger.error(`Error removing job ${jobInstID}`, err);
      throw err;
    }
  }
3✔
1341

2✔
1342
  /**
   * Disables access control: sets `authorization:enabled` to false and
   * pushes the config via `updateConfig`. On failure the flag is set back
   * to `this.authZCheck`.
   */
  disableAC() {
    const authzFlag = 'authorization:enabled';
    try {
      this.cfg.set(authzFlag, false);
      updateConfig(this.cfg);
    } catch (err) {
      this.logger.error('Error caught disabling authorization:', { err });
      this.cfg.set(authzFlag, this.authZCheck);
    }
  }
13✔
1354

2✔
1355
  /**
   * Enables access control: sets `authorization:enabled` to true and
   * pushes the config via `updateConfig`. On failure the flag is set back
   * to `this.authZCheck`.
   */
  enableAC() {
    const authzFlag = 'authorization:enabled';
    try {
      this.cfg.set(authzFlag, true);
      updateConfig(this.cfg);
    } catch (err) {
      this.logger.error('Error caught enabling authorization:', { err });
      this.cfg.set(authzFlag, this.authZCheck);
    }
  }
2✔
1367

2✔
1368
  /**
   * Restores access control to `this.authZCheck`, i.e. the value used as
   * fallback by `enableAC` / `disableAC`, and pushes the config via
   * `updateConfig`.
   */
  restoreAC() {
    try {
      this.cfg.set('authorization:enabled', this.authZCheck);
      updateConfig(this.cfg);
    } catch (err) {
      // Message names the restore path so it can be told apart from enableAC failures.
      this.logger.error('Error caught restoring authorization:', { err });
      this.cfg.set('authorization:enabled', this.authZCheck);
    }
  }
2✔
1380

2✔
1381
  /**
   * Ensures every resource carries owner meta data, both in `data.meta.owners`
   * and in `meta.owners` at the resource root.
   *
   * - MODIFY / DELETE with an ID: the job is read and its stored owners are
   *   copied to the resource; when the job does not exist and the action is
   *   MODIFY, default owners are assigned.
   * - CREATE (or a resource without ID) without owners: an ID is generated
   *   when missing and default owners are assigned.
   * - CREATE with owners: the given owners are copied to the resource root.
   *
   * Default owners are the subject's scope as organization (only when the
   * subject has a scope and the action is CREATE or MODIFY) plus the subject
   * itself as user.
   *
   * @param resources a resource or list of resources; they are modified in place
   * @param action resource action
   * @param subject subject performing the action
   * @returns the resources as an array (or the original falsy value)
   */
  async createMetadata(resources: any, action: string, subject: Subject): Promise<any> {
    if (resources && !_.isArray(resources)) {
      resources = [resources];
    }
    const urns = this.cfg.get('authorization:urns');

    // Builds one owner attribute: entity kind (organization / user) with its instance ID nested.
    const buildOwner = (entity: string, instance: string): Attribute => ({
      id: urns?.ownerIndicatoryEntity,
      value: entity,
      attributes: [{
        id: urns?.ownerInstance,
        value: instance,
        attributes: []
      }]
    });

    const orgOwnerAttributes: Attribute[] = [];
    if (subject?.scope && (action === AuthZAction.CREATE || action === AuthZAction.MODIFY)) {
      // add subject scope as default owners
      orgOwnerAttributes.push(buildOwner(urns?.organization, subject?.scope));
    }

    if (resources?.length > 0) {
      for (const resource of resources) {
        if (!resource.data) {
          resource.data = { meta: {} };
        } else if (!resource.data.meta) {
          resource.data.meta = {};
        }
        if (resource?.id && (action === AuthZAction.MODIFY || action === AuthZAction.DELETE)) {
          let result;
          try {
            result = await this.read(JobReadRequest.fromPartial({
              filter: {
                job_ids: [resource.id]
              },
              subject
            }), {});
          } catch (error: any) {
            if (error.message?.startsWith('Error! Jobs not found in any of the queues') && action != AuthZAction.DELETE) {
              this.logger.debug('New job should be created', { jobId: resource.id });
              result = { items: [] };
            } else {
              this.logger.error(`Error reading job with resource ID ${resource.id}`, { error });
            }
          }
          // update owners info
          if (result?.items?.length === 1 && result?.items[0]?.payload) {
            const item = result.items[0].payload;
            // Stored job data may lack meta; do not fail on it.
            const storedOwners = item.data?.meta?.owners;
            resource.data.meta.owners = storedOwners;
            // adding meta to resource root (needed by access-control-srv for owners information check)
            // meta is inside data of resource since the data is persisted in redis using bull
            resource.meta = { owners: storedOwners };
          } else if ((!result || !result.items || !result.items[0] || !result.items[0].payload) && action === AuthZAction.MODIFY) {
            // job does not exist - create new job (ex: Upsert with action modify)
            const ownerAttributes = _.cloneDeep(orgOwnerAttributes);
            // add user as default owners
            ownerAttributes.push(buildOwner(urns?.user, subject?.id));
            resource.data.meta.owners = ownerAttributes;
            resource.meta = { owners: ownerAttributes };
          }
        } else if ((action === AuthZAction.CREATE || !resource.id) && !resource.data.meta.owners) {
          const ownerAttributes = _.cloneDeep(orgOwnerAttributes);
          if (!resource.id) {
            resource.id = uuid.v4().replace(/-/g, '');
          }
          // add user as default owners
          ownerAttributes.push(buildOwner(urns?.user, subject?.id));
          resource.data.meta.owners = ownerAttributes;
          resource.meta = { owners: ownerAttributes };
        } else if (action === AuthZAction.CREATE && resource?.data?.meta?.owners) {
          resource.meta = { owners: resource.data.meta.owners };
        }
      }
    }
    return resources;
  }
23✔
1482
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc