• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

restorecommerce / scheduling-srv / 18377090959

09 Oct 2025 12:57PM UTC coverage: 69.771% (-5.8%) from 75.556%
18377090959

push

github

Arun-KumarH
fix: lcov report gets generated automatically

221 of 325 branches covered (68.0%)

Branch coverage included in aggregate %.

1210 of 1726 relevant lines covered (70.1%)

10.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.05
/src/schedulingService.ts
1
import * as _ from 'lodash-es';
1✔
2
import { errors } from '@restorecommerce/chassis-srv';
1✔
3
import * as kafkaClient from '@restorecommerce/kafka-client';
4
import {
1✔
5
  AuthZAction,
6
  DecisionResponse,
7
  Operation,
8
  PolicySetRQResponse,
9
  CtxResource,
10
  CustomQueryArgs
11
} from '@restorecommerce/acs-client';
12
import {
1✔
13
  JobServiceImplementation as SchedulingServiceServiceImplementation,
14
  JobFailed, JobDone, DeepPartial, JobList, JobListResponse,
15
  Backoff_Type, JobOptions_Priority, JobReadRequest, JobReadRequest_SortOrder,
16
  JobResponse, Job,
17
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/job.js';
18
import { createClient, RedisClientType } from 'redis';
1✔
19
import { NewJob, Priority } from './types.js';
1✔
20
import pkg, { CronExpression } from 'cron-parser';
1✔
21
import { _filterJobData, _filterJobOptions, _filterQueuedJob, checkAccessRequest, decomposeError, marshallProtobufAny } from './utilts.js';
1✔
22
import * as uuid from 'uuid';
1✔
23
import { Logger } from 'winston';
24
import { Response_Decision } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/access_control.js';
1✔
25
import { Attribute } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/attribute.js';
26
import {
1✔
27
  DeleteRequest,
28
  DeleteResponse,
29
  ResourceListResponse
30
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/resource_base.js';
31
import { Queue, QueueOptions, Job as BullJob } from 'bullmq';
1✔
32
import { Status } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/status.js';
33
import { Subject } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/auth.js';
34
import { Meta } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/meta.js';
35

36
const { CronExpressionParser } = pkg;
1✔
37
const JOB_DONE_EVENT = 'jobDone';
1✔
38
const JOB_FAILED_EVENT = 'jobFailed';
1✔
39
const DEFAULT_CLEANUP_COMPLETED_JOBS = 604800000; // 7 days in miliseconds
1✔
40
const COMPLETED_JOB_STATE = 'completed';
1✔
41
const FAILED_JOB_STATE = 'failed';
1✔
42
const QUEUE_CLEANUP = 'queueCleanup';
1✔
43

44
/**
45
 * A job scheduling service.
46
 */
47
export class SchedulingService implements SchedulingServiceServiceImplementation {
1✔
48
  public queuesList: Queue[];
2✔
49
  protected queuesConfigList: any;
2✔
50
  protected defaultQueueName: string;
2✔
51
  protected resourceEventsEnabled: boolean;
2✔
52
  protected canceledJobs: Set<string>;
2✔
53
  protected repeatJobIdRedisClient: RedisClientType<any, any>;
2✔
54
  protected techUser: Subject;
2✔
55

56
  constructor(
2✔
57
    protected readonly jobEvents: kafkaClient.Topic,
2✔
58
    protected readonly redisConfig: any,
2✔
59
    protected readonly logger: Logger,
2✔
60
    protected readonly redisClient: RedisClientType<any, any>,
2✔
61
    protected readonly bullOptions: any,
2✔
62
    protected readonly cfg: any,
2✔
63
  ) {
2✔
64
    this.resourceEventsEnabled = true;
2✔
65
    this.queuesList = [];
2✔
66
    this.queuesConfigList = [];
2✔
67
    this.techUser = cfg.get('authorization:techUser');
2✔
68

69
    const repeatJobIdCfg = cfg.get('redis');
2✔
70
    repeatJobIdCfg.database = cfg.get('redis:db-indexes:db-repeatJobId');
2✔
71
    this.repeatJobIdRedisClient = createClient(repeatJobIdCfg);
2✔
72
    this.repeatJobIdRedisClient.on(
2✔
73
      'error',
2✔
74
      (err) => logger?.error('Redis client error in repeatable job store', decomposeError(err)));
2✔
75
    this.repeatJobIdRedisClient.connect().then((data) => {
2✔
76
      logger?.info('Redis client connection for repeatable job store successful');
2✔
77
    }).catch(err => logger?.error('Redis client error for repeatable job store', decomposeError(err)));
2✔
78

79
    this.canceledJobs = new Set<string>();
2✔
80

81
    // Read Queue Configuration file and find first queue which has "default": true,
82
    // then save it to defaultQueueName
83
    const queuesCfg = this.cfg.get('queue');
2✔
84
    if (_.isEmpty(queuesCfg)) {
2!
85
      this.logger?.error('Queue configuration not found!');
×
86
      throw new Error('Queue configuration not found!');
×
87
    }
×
88
    let defaultTrueExists = false;
2✔
89
    for (const queueCfg of queuesCfg) {
2✔
90
      // Find configuration which has default=true
91
      if (queueCfg.default == true) {
2✔
92
        defaultTrueExists = true;
2✔
93
        this.defaultQueueName = queueCfg.name;
2✔
94
        break;
2✔
95
      }
2✔
96
    }
2✔
97
    if (!defaultTrueExists) {
2!
98
      this.logger?.error('Queue default configuration not found!');
×
99
      throw new Error('Queue default configuration not found!');
×
100
    }
×
101

102
    // Create Queues
103
    for (const queueCfg of queuesCfg) {
2✔
104
      const prefix = queueCfg.name;
6✔
105
      const rateLimiting = queueCfg.rateLimiting;
6✔
106
      const advancedSettings = queueCfg.advancedSettings;
6✔
107

108
      const queueOptions: QueueOptions = {
6✔
109
        connection: {
6✔
110
          ...redisConfig,
6✔
111
        }
6✔
112
      };
6✔
113

114
      // Create Queue Configuration - Add Rate Limiting if enabled
115
      if (!_.isEmpty(rateLimiting) && rateLimiting.enabled == true) {
6!
116
        this.logger?.info(`Queue: ${queueCfg.name} - Rate limiting is ENABLED.`);
×
117
      }
×
118

119
      if (!_.isEmpty(advancedSettings)) {
6✔
120
        queueOptions.settings = {
6✔
121
          ...advancedSettings,
6✔
122
        };
6✔
123
      }
6✔
124

125
      const redisURL = new URL((queueOptions.connection as any).url);
6✔
126

127
      if ('keyPrefix' in queueOptions.connection) {
6!
128
        delete queueOptions.connection.keyPrefix;
×
129
      }
×
130

131
      const queue = new Queue(prefix, {
6✔
132
        ...queueOptions,
6✔
133
        connection: {
6✔
134
          ...queueOptions.connection as any,
6✔
135
          host: redisURL.hostname,
6✔
136
          port: Number.parseInt(redisURL.port)
6✔
137
        }
6✔
138
      });
6✔
139
      this.queuesList.push(queue);
6✔
140

141
      // Add Queue Configurations
142
      const queueCfgObj = {
6✔
143
        name: queueCfg.name,
6✔
144
        concurrency: queueCfg.concurrency,
6✔
145
        default: queueCfg.default,
6✔
146
        runMissedScheduled: queueCfg.runMissedScheduled
6✔
147
      };
6✔
148
      this.queuesConfigList.push(queueCfgObj);
6✔
149
    }
6✔
150
  }
2✔
151

152
  private catchOperationStatus(error: any, message?: string): ResourceListResponse {
2✔
153
    this.logger?.error(message ?? error?.message, decomposeError(error));
×
154
    return {
×
155
      total_count: 0,
×
156
      operation_status: {
×
157
        code: Number.isInteger(error?.code) ? error.code : 500,
×
158
        message: error?.message ?? error?.msg ?? error?.details ?? message,
×
159
      },
×
160
    };
×
161
  }
×
162

163
  private catchStatus(id: string, error: any, message?: string): Status {
2✔
164
    this.logger?.error(message ?? error?.message, decomposeError(error));
×
165
    return {
×
166
      id,
×
167
      code: Number.isInteger(error?.code) ? error.code : 500,
×
168
      message: error?.message ?? error?.msg ?? error?.details ?? message,
×
169
    };
×
170
  }
×
171

172
  /**
173
   * Start handling the job queue, job scheduling and
174
   * managing job events.
175
   */
176
  async start(): Promise<any> {
2✔
177
    const logger = this.logger;
2✔
178
    const events = [JOB_DONE_EVENT, JOB_FAILED_EVENT];
2✔
179
    for (const eventName of events) {
2✔
180
      // A Scheduling Service Event Listener
181
      await this.jobEvents.on(
4✔
182
        eventName,
4✔
183
        async (msg: any, ctx: any, config: any, eventName: string): Promise<any> => {
4✔
184
          const job = msg;
10✔
185
          // Match Job Type to Queue Name, else use Default Queue
186
          const queue = this.queuesList?.find(q => q.name === job.type)
10!
187
            ?? this.queuesList?.find(q => q.name === this.defaultQueueName);
×
188

189
          if (eventName === JOB_FAILED_EVENT) {
10!
190
            logger?.error(`job@${job.type}#${job.id} failed with error #${job.error}`,
×
191
              _filterQueuedJob<JobFailed>(job, this.logger));
×
192
          } else if (eventName === JOB_DONE_EVENT) {
10✔
193
            logger?.verbose(`job#${job.id} done`, _filterQueuedJob<JobDone>(job, this.logger));
10✔
194
          }
10✔
195

196
          logger?.info('Received Job event', { event: eventName });
10✔
197
          logger?.info('Job details', job);
10✔
198
          const jobData: any = await queue.getJob(job.id).catch(error => {
10✔
199
            logger?.error('Error retrieving job ${job.id} from queue', decomposeError(error));
×
200
          });
10✔
201

202
          if (job?.delete_scheduled) {
10!
203
            await queue.removeRepeatable(jobData.name, jobData.opts.repeat);
×
204
          }
×
205
        }
10✔
206
      );
4✔
207
    }
4✔
208

209
    // Initialize Event Listeners for each Queue
210
    for (const queue of this.queuesList) {
2✔
211
      queue.on('error', (error) => {
6✔
212
        logger?.error('queue error', decomposeError(error));
×
213
      });
6✔
214
      queue.on('waiting', (job) => {
6✔
215
        logger?.verbose(`job#${job.id} scheduled`, job);
15✔
216
      });
6✔
217
      queue.on('removed', (job) => {
6✔
218
        logger?.verbose(`job#${job.id} removed`, job);
21✔
219
      });
6✔
220
      queue.on('progress', (job) => {
6✔
221
        logger?.verbose(`job#${job.id} progress`, job);
×
222
      });
6✔
223
    }
6✔
224

225
    // If the scheduling service goes down and if there were
226
    // recurring jobs which have missed schedules then
227
    // we will need to reschedule it for those missing intervals.
228
    await this._rescheduleMissedJobs();
2✔
229
  }
2✔
230

231
  /**
232
   * To reschedule the missed recurring jobs upon service restart
233
   */
234
  private async _rescheduleMissedJobs(): Promise<JobListResponse[]> {
2✔
235
    // for jobs created via Kafka currently there are no acs checks
236
    const result: BullJob[] = [];
2✔
237
    const logger = this.logger;
2✔
238
    // Get the jobs
239
    for (const queueCfg of this.queuesConfigList) {
2✔
240
      // If enabled in the config, or the config is missing,b
241
      // Reschedule the missed jobs, else skip.
242
      const queue = this.queuesList?.find(q => q.name === queueCfg.name);
6✔
243
      if (queueCfg?.runMissedScheduled?.toString() === 'true') {
6✔
244
        await queue.getJobs(['active', 'delayed', 'repeat']).then(jobs => {
6✔
245
          result.push(...(jobs?.filter(Boolean) ?? []));
6!
246
        }).catch((error: any) => {
6✔
247
          logger?.error(
×
248
            'Error reading jobs to reschedule the missed recurring jobs',
×
249
            decomposeError(error)
×
250
          );
×
251
        });
6✔
252
      }
6✔
253
    }
6✔
254
    const pomises = result.map(async job => {
2✔
255
      let lastRunTime;
×
256
      // get the last run time for the job, we store the last run time only
257
      // for recurring jobs
258
      if (job?.name) {
×
259
        try {
×
260
          lastRunTime = await this.redisClient.get(job.name);
×
261
        } catch (error: any) {
×
262
          this.logger?.error(
×
263
            'Error reading the last run time for job type:',
×
264
            { name: job.name }, decomposeError(error)
×
265
          );
×
266
        }
×
267
      }
×
268
      // we store lastRunTime only for recurring jobs and if it exists check
269
      // cron interval and schedule immediate jobs for missed intervals
270
      this.logger?.info(`Job overdue - Last run time of Job ${job?.name} was:`, lastRunTime);
×
271
      if (lastRunTime) {
×
272
        // convert redis string value to object and get actual time value
273
        try {
×
274
          lastRunTime = JSON.parse(lastRunTime);
×
275
        }
×
276
        catch (error: any) {
×
277
          this.logger?.error(
×
278
            'Error parsing lastRunTime',
×
279
            { lastRunTime }, decomposeError(error)
×
280
          );
×
281
        }
×
282

283
        if ((job?.opts?.repeat as any)?.pattern && lastRunTime?.time) {
×
284
          const options = {
×
285
            currentDate: new Date(lastRunTime.time),
×
286
            endDate: new Date(),
×
287
            iterator: true
×
288
          };
×
289
          try {
×
290
            const intervalTime = CronExpressionParser.parse((job.opts.repeat as any).pattern, options);
×
291
            while (intervalTime?.hasNext()) {
×
292
              const nextInterval = intervalTime.next();
×
293
              // schedule it as one time job for now or immediately
294
              const data = {
×
295
                payload: marshallProtobufAny({
×
296
                  value: {
×
297
                    time: nextInterval.toString()
×
298
                  }
×
299
                })
×
300
              };
×
301
              const currentTime = new Date();
×
302
              const when = new Date(currentTime.setSeconds(currentTime.getSeconds() + 2)).toISOString();
×
303
              const immediateJob: Job = {
×
304
                type: job.name,
×
305
                data,
×
306
                // give a delay of 2 sec between each job
307
                // to avoid time out of queued jobs
308
                when,
×
309
                options: {}
×
310
              };
×
311
              return await this.create({
×
312
                items: [immediateJob],
×
313
                total_count: 0,
×
314
                subject: this.techUser,
×
315
              });
×
316
            }
×
317
          }
×
318
          catch (error) {
×
319
            this.logger?.error('Error parsing cron expression running missed schedules', decomposeError(error));
×
320
          }
×
321
        }
×
322
      }
×
323
    });
2✔
324
    return await Promise.all(pomises);
2✔
325
  }
2✔
326

327
  /**
328
   * Disabling of CRUD events.
329
   */
330
  disableEvents(): void {
2✔
331
    this.resourceEventsEnabled = false;
×
332
  }
×
333

334
  /**
335
   * Enabling of CRUD events.
336
   */
337
  enableEvents(): any {
2✔
338
    this.resourceEventsEnabled = true;
×
339
  }
×
340

341
  _validateJob(job: NewJob): NewJob {
2✔
342
    if (_.isNil(job.type)) {
18!
343
      throw new errors.InvalidArgument('Job type not specified.');
×
344
    }
×
345

346
    if (!job.options) {
18!
347
      job.options = {};
×
348
    }
×
349

350
    if (job.when) {
18✔
351
      // If the jobSchedule time has already lapsed then do not schedule the job
352
      const jobScheduleTime = new Date(job.when).getTime();
13✔
353
      const currentTime = new Date().getTime();
13✔
354
      if (jobScheduleTime < currentTime) {
13!
355
        throw new errors.InvalidArgument('Cannot schedule a job for an elapsed time');
×
356
      }
×
357

358
      job.options.delay = jobScheduleTime - currentTime;
13✔
359
    }
13✔
360

361
    if (job.options.backoff && typeof job.options.backoff !== 'number') {
18✔
362
      if (typeof job.options.backoff.type === 'number') {
18!
363
        job.options.backoff.type = Object.keys(Backoff_Type)[job.options.backoff.type];
×
364
      }
×
365
      job.options.backoff.type = job.options.backoff.type.toLowerCase();
18✔
366
    }
18✔
367

368
    if (job.options.priority && typeof job.options.priority === 'string') {
18✔
369
      job.options.priority = JobOptions_Priority[job.options.priority] as any;
18✔
370
    }
18✔
371

372
    if (_.isEmpty(job.data)) {
18!
373
      throw new errors.InvalidArgument('No job data specified.');
×
374
    }
×
375

376
    job.data = _filterJobData(job.data, false, this.logger);
18✔
377

378
    return job;
18✔
379
  }
18✔
380

381
  /**
382
   * get next job execution time in mili seconds
383
   * @param millis
384
   * @param opts
385
   */
386
  getNextMillis(millis: number, opts: any) {
2✔
387
    if (opts?.every) {
1!
388
      return Math.floor(millis / opts.every) * opts.every + opts.every;
×
389
    }
×
390

391
    const currentDate =
1✔
392
      opts?.startDate && new Date(opts.startDate) > new Date(millis)
1!
393
        ? new Date(opts.startDate)
×
394
        : new Date(millis);
1✔
395
    const interval = CronExpressionParser.parse(
1✔
396
      opts.cron,
1✔
397
      _.defaults(
1✔
398
        {
1✔
399
          currentDate
1✔
400
        },
1✔
401
        opts
1✔
402
      )
1✔
403
    );
1✔
404

405
    try {
1✔
406
      return interval.next().getTime();
1✔
407
    } catch (e) {
1!
408
      this.logger?.error('Error getting next job execution time');
×
409
    }
×
410
  }
1✔
411

412
  /**
413
   * store the mapping from repeateKey to external interface SCS job Id, and
414
   * also the mapping other way around i.e. from SCS job Id to repeatKey (needed for read operations)
415
   * @param name - job name
416
   * @param repeat - job repeate options
417
   * @param jobId - job id
418
   */
419
  async storeRepeatKey(repeatId: string, scsJobId: string, options: any) {
2✔
420
    try {
18✔
421
      if (repeatId && scsJobId) {
18✔
422
        this.logger?.info('Repeat key mapped to external SCS JobId', { repeatId, scsJobId });
18✔
423
        await this.repeatJobIdRedisClient.set(repeatId, scsJobId);
18✔
424
        const jobIdData = { repeatId, options };
18✔
425
        await this.repeatJobIdRedisClient.set(scsJobId, JSON.stringify(jobIdData));
18✔
426
      }
18✔
427
    } catch (error: any) {
18!
428
      this.logger?.error('Error storing repeatKey to redis', decomposeError(error));
×
429
    }
×
430
  }
18✔
431

432
  private idGen(): string {
2✔
433
    return uuid.v4().replace(/-/g, '');
×
434
  }
×
435

436
  /**
437
   * Create and queue jobs.
438
   * @param {any} call RPC call argument
439
   * @param {any} ctx RPC context
440
   */
441
  async create(request: JobList, ctx?: any): Promise<DeepPartial<JobListResponse>> {
2✔
442
    const jobListResponse: JobListResponse = { items: [], operation_status: { code: 0, message: '' }, total_count: 0 };
13✔
443
    const subject = request.subject;
13✔
444
    if (!request?.items?.length) {
13!
445
      return {
×
446
        items: [],
×
447
        total_count: 0,
×
448
        operation_status: {
×
449
          code: 400,
×
450
          message: 'Missing items in create request'
×
451
        }
×
452
      };
×
453
    }
×
454

455
    await this.createMetadata(request.items, AuthZAction.CREATE, subject);
13✔
456
    let acsResponse: DecisionResponse;
13✔
457
    try {
13✔
458
      ctx ??= {};
13✔
459
      ctx.subject = subject;
13✔
460
      ctx.resources = request?.items?.map((job) => {
13✔
461
        const { data, ...resource } = job;
19✔
462
        return resource;
19✔
463
      });
13✔
464
      acsResponse = await checkAccessRequest(ctx, [{
13✔
465
        resource: 'job',
13✔
466
        id: request.items.map(item => item.id)
13✔
467
      }], AuthZAction.CREATE, Operation.isAllowed);
13✔
468
    } catch (err: any) {
13!
469
      return this.catchOperationStatus(err, 'Error requesting access-control-srv for create meta data');
×
470
    }
×
471
    if (acsResponse.decision != Response_Decision.PERMIT) {
13✔
472
      return {
1✔
473
        items: [],
1✔
474
        total_count: 0,
1✔
475
        operation_status: acsResponse.operation_status
1✔
476
      };
1✔
477
    }
1✔
478

479
    const jobs: NewJob[] = [];
12✔
480
    for (const job of request?.items || []) {
13!
481
      try {
18✔
482
        jobs.push(this._validateJob(job as any));
18✔
483
      } catch (err: any) {
18!
484
        this.logger?.error('Error validating job', job, decomposeError(err));
×
485
        jobListResponse.items.push({
×
486
          status: {
×
487
            id: job.id,
×
488
            code: 400,
×
489
            message: err.message
×
490
          }
×
491
        });
×
492
      }
×
493
    }
18✔
494

495
    const result: BullJob[] = [];
12✔
496
    // Scheduling jobs
497
    for (let i = 0; i < jobs.length; i += 1) {
13✔
498
      const job = jobs[i];
18✔
499
      // if not jobID is specified generate a UUID
500
      if (!job.id) {
18!
501
        job.id = this.idGen();
×
502
      }
×
503

504
      // map the id to jobId as needed in JobOpts for bull
505
      if (job?.id) {
18✔
506
        // check if jobID already exists then map it as already exists error
507
        const existingJobId = await this.getRedisValue(job.id);
18✔
508
        if (existingJobId) {
18!
509
          // read job to check if data exists
510
          const jobData = await this.read(JobReadRequest.fromPartial({
×
511
            filter: {
×
512
              job_ids: [job.id]
×
513
            },
×
514
            subject
×
515
          }), ctx);
×
516
          if ((jobData?.items as any)[0]?.payload) {
×
517
            jobListResponse.items.push({
×
518
              status: {
×
519
                id: job.id,
×
520
                code: 403,
×
521
                message: `Job with ID ${job.id} already exists`
×
522
              }
×
523
            });
×
524
            continue;
×
525
          }
×
526
        }
×
527
        if (!job?.options) {
18!
528
          job.options = { jobId: job.id };
×
529
        } else {
18✔
530
          job.options.jobId = job.id;
18✔
531
        }
18✔
532
        if (job?.options?.repeat) {
18✔
533
          (job as any).options.repeat.jobId = job.id;
3✔
534
        }
3✔
535
      }
18✔
536

537
      if (!job.data.meta) {
18!
538
        const now = new Date();
×
539
        const metaObj: Meta = {
×
540
          created: now,
×
541
          modified: now,
×
542
          modified_by: '',
×
543
          owners: []
×
544
        };
×
545
        Object.assign(job.data, { meta: metaObj });
×
546
      }
×
547
      // if only owners are specified in meta
548
      if (job.data.meta && (!job.data.meta.created || !job.data.meta.modified)) {
18✔
549
        job.data.meta.created = new Date();
15✔
550
        job.data.meta.modified = new Date();
15✔
551
      }
15✔
552
      if (job?.data?.meta) {
18✔
553
        job.data.meta.created_by = subject?.id;
18✔
554
        job.data.meta.modified_by = subject?.id;
18✔
555
      }
18✔
556

557
      if (job?.data?.payload?.value) {
18✔
558
        job.data.payload.value = job.data.payload.value.toString() as any;
18✔
559
      }
18✔
560

561
      // convert enum priority back to number as it's expected by bull
562
      if (job?.options?.priority) {
18✔
563
        job.options.priority = typeof job.options.priority === 'number' ? job.options.priority : Priority[job.options.priority] as unknown as number;
18!
564
      }
18✔
565

566
      // if its a repeat job and tz is empty delete the key (else cron parser throws an error)
567
      if (job?.options?.repeat?.tz === '') {
18!
568
        delete job.options.repeat.tz;
×
569
      }
×
570

571
      const bullOptions = {
18✔
572
        ...job.options
18✔
573
      };
18✔
574

575
      if ((bullOptions as any).timeout === 1) {
18✔
576
        delete (bullOptions as any).timeout;
2✔
577
      }
2✔
578

579
      // Match the Job Type with the Queue Name and add the Job to this Queue.
580
      // If there is no match, add the Job to the Default Queue
581
      let queue = _.find(this.queuesList, { name: job?.queue_name });
18✔
582
      if (_.isEmpty(queue)) {
18!
583
        queue = _.find(this.queuesList, { name: this.defaultQueueName });
×
584
      }
×
585
      const submittedJob = await queue.add(job.type, job.data, bullOptions);
18✔
586
      if (submittedJob?.id?.startsWith('repeat:')) {
18✔
587
        const repeatJobId = submittedJob?.id?.split(':')[1];
3✔
588
        await this.storeRepeatKey(repeatJobId, job.id, job.options);
3✔
589
      } else if (submittedJob?.id) {
18✔
590
        // future job with when
591
        await this.storeRepeatKey(submittedJob.id, job.id, job.options);
15✔
592
      }
15✔
593
      this.logger?.verbose(`job@${job.type} created`, job);
18✔
594
      result.push(submittedJob);
18✔
595
    }
18✔
596

597
    for (const job of result) {
13✔
598
      const jobId = job.id as string;
18✔
599
      if (jobId.startsWith('repeat:')) {
18✔
600
        const repeatKey = jobId.split(':')[1];
3✔
601
        job.id = await this.getRedisValue(repeatKey);
3✔
602
      }
3✔
603
    }
18✔
604

605
    for (const job of result) {
13✔
606
      const when = job?.opts?.delay ? new Date(job?.opts?.delay).toString() : '';
18✔
607
      jobListResponse.items.push({
18✔
608
        payload: {
18✔
609
          id: job.id as string,
18✔
610
          type: job.name,
18✔
611
          queue_name: job?.queueName,
18✔
612
          data: _filterJobData(job.data, true, this.logger),
18✔
613
          options: _filterJobOptions(job.opts) as any,
18✔
614
          when
18✔
615
        },
18✔
616
        status: {
18✔
617
          id: (job?.id)?.toString(),
18✔
618
          code: 200,
18✔
619
          message: 'success'
18✔
620
        }
18✔
621
      });
18✔
622
    }
18✔
623
    const jobList = {
12✔
624
      items: result.map(job => ({
12✔
625
        id: job.id,
18✔
626
        type: job.name,
18✔
627
        data: _filterJobData(job.data, true, this.logger),
18✔
628
        options: _filterJobOptions(job.opts)
18✔
629
      })),
12✔
630
      total_count: result.length
12✔
631
    };
12✔
632

633
    if (this.resourceEventsEnabled) {
12✔
634
      await this.jobEvents.emit('jobsCreated', jobList);
12✔
635
    }
12✔
636

637
    jobListResponse.operation_status = jobListResponse.items?.some(
12✔
638
      item => item.status?.code !== 200
13✔
639
    ) ? { code: 207, message: 'Multi Status!' }
13!
640
      : { code: 200, message: 'success' };
12✔
641
    return jobListResponse;
13✔
642
  }
13✔
643

644
  private filterByOwnerShip(customArgsObj: CustomQueryArgs, result: BullJob[]): BullJob[] {
2✔
645
    // applying filter based on custom arguments (filterByOwnerShip)
646
    const filteredResult = new Array<BullJob>();
38✔
647
    const customArgs = customArgsObj?.custom_arguments;
38✔
648
    if (customArgs?.value) {
38✔
649
      let customArgsFilter;
24✔
650
      try {
24✔
651
        customArgsFilter = JSON.parse(customArgs.value.toString());
24✔
652
      } catch (error: any) {
24!
653
        this.logger?.error('Error parsing custom query arguments', decomposeError(error));
×
654
      }
×
655
      if (customArgsFilter?.length === 0) {
24!
656
        return [];
×
657
      }
×
658
      if (!Array.isArray(customArgsFilter)) {
24!
659
        customArgsFilter = [customArgsFilter];
×
660
      }
×
661
      for (const customArgObj of customArgsFilter) {
24✔
662
        const ownerIndicatorEntity = customArgObj?.entity;
24✔
663
        const ownerValues = customArgObj?.instance;
24✔
664
        const ownerIndictaorEntURN = this.cfg.get('authorization:urns:ownerIndicatoryEntity');
24✔
665
        const ownerInstanceURN = this.cfg.get('authorization:urns:ownerInstance');
24✔
666
        const filteredResp = result.filter(job => {
24✔
667
          if (job?.data?.meta?.owners?.length > 0) {
57✔
668
            for (const owner of job.data.meta.owners) {
44✔
669
              if (owner?.id === ownerIndictaorEntURN && owner?.value === ownerIndicatorEntity && owner?.attributes?.length > 0) {
44✔
670
                for (const ownerInstObj of owner.attributes) {
44✔
671
                  if (ownerInstObj?.id === ownerInstanceURN && ownerInstObj?.value && ownerValues.includes(ownerInstObj.value)) {
44✔
672
                    return job;
44✔
673
                  }
44✔
674
                }
44!
675
              }
×
676
            }
44!
677
          }
×
678
        }) as BullJob[];
24✔
679
        filteredResult.push(...filteredResp);
24✔
680
      }
24✔
681
      return filteredResult;
24✔
682
    } else {
27✔
683
      // no custom filters exist, return complete result set
684
      return result;
14✔
685
    }
14✔
686
  }
38✔
687

688
  async deleteRedisKey(key: string): Promise<any> {
2✔
689
    try {
5✔
690
      await this.repeatJobIdRedisClient.del(key);
5✔
691
      this.logger?.debug('Redis Key deleted successfully used for mapping repeatable jobID', { key });
5✔
692
    } catch (err: any) {
5!
693
      this.logger?.error('Error deleting redis key', { key }, decomposeError(err));
×
694
    }
×
695
  }
5✔
696

697
  async getRedisValue(key: string): Promise<any> {
2✔
698
    let redisValue;
71✔
699
    try {
71✔
700
      if (key) {
71✔
701
        redisValue = await this.repeatJobIdRedisClient.get(key);
71✔
702
      }
71✔
703
      if (redisValue) {
71✔
704
        return JSON.parse(redisValue);
49✔
705
      } else {
71✔
706
        return;
22✔
707
      }
22✔
708
    } catch (err: any) {
71✔
709
      if (err.message?.startsWith('Unexpected token') || err.message?.startsWith('Unexpected number') || err.message?.startsWith('Unexpected non-whitespace character')) {
23✔
710
        return redisValue;
23✔
711
      } else {
23!
712
        this.logger?.error('Error reading redis key', { key }, decomposeError(err));
×
713
      }
×
714
    }
23✔
715
  }
71✔
716

717

718
  /**
719
   * Retrieve jobs.
720
   * @param {any} call RPC call argument
721
   * @param {any} ctx RPC context
722
   */
723
  async read(request: JobReadRequest, ctx?: any): Promise<DeepPartial<JobListResponse>> {
2✔
724
    const jobListResponse: JobListResponse = { items: [], operation_status: { code: 0, message: '' }, total_count: 0 };
42✔
725
    const subject = request.subject;
42✔
726
    let acsResponse: PolicySetRQResponse;
42✔
727
    try {
42✔
728
      ctx ??= {};
42✔
729
      ctx.subject = subject;
42✔
730
      ctx.resources = [];
42✔
731
      acsResponse = await checkAccessRequest(ctx, [{ resource: 'job' }], AuthZAction.READ,
42✔
732
        Operation.whatIsAllowed) as PolicySetRQResponse;
42✔
733
    } catch (err: any) {
42!
734
      return this.catchOperationStatus(err, 'Error requesting access-control-srv for read operation');
×
735
    }
×
736

737
    if (acsResponse.decision !== Response_Decision.PERMIT) {
42✔
738
      return { operation_status: acsResponse.operation_status };
3✔
739
    }
3✔
740

741
    let result: Array<BullJob>;
39✔
742
    if (_.isEmpty(request) || _.isEmpty(request.filter)
39✔
743
      && (!request.filter || !request.filter.job_ids
24!
744
        || _.isEmpty(request.filter.job_ids))
✔
745
      && (!request.filter || !request.filter.type ||
24!
746
        _.isEmpty(request.filter.type))) {
42✔
747
      result = await this._getJobList();
24✔
748
      const custom_arguments = acsResponse.custom_query_args?.[0];
24✔
749
      result = this.filterByOwnerShip(custom_arguments, result);
24✔
750
    } else {
42✔
751
      result = new Array<BullJob>();
15✔
752
      const jobIDs = Array.isArray(request?.filter?.job_ids) ? request.filter.job_ids : request.filter.job_ids ? [request.filter.job_ids] : [];
15!
753
      const typeFilterName = request.filter.type;
15✔
754

755
      // Search in all the queues and retrieve jobs after JobID
756
      // and add them to the jobIDsCopy list.
757
      // If the job is not found in any of the queues, continue looking
758
      // Finally compare the two lists and add an error to status for which
759
      // job could not be found in any of the queues.
760
      if (jobIDs?.length > 0) {
15✔
761
        // jobIDsCopy should contain the jobIDs duplicate values
762
        // after the for loop ends
763
        const jobIDsCopy: string[] = [];
14✔
764
        for (let jobID of jobIDs ?? []) {
14!
765
          const jobIdData = await this.getRedisValue(jobID as string);
14✔
766
          // future jobs scheduled with `when` will have same repeatId as external SCS jobID
767
          if (jobIdData?.repeatId && (jobIdData.repeatId != jobID)) {
14✔
768
            const repeatId = jobIdData.repeatId;
1✔
769
            if (jobIdData?.options?.repeat?.cron && jobIdData?.options?.repeat?.every) {
1!
770
              jobListResponse.items.push({
×
771
                status: {
×
772
                  id: jobID.toString(),
×
773
                  code: 400,
×
774
                  message: 'Both .cron and .every options are defined for this repeatable job'
×
775
                }
×
776
              });
×
777
              continue;
×
778
            }
×
779
            const nextMillis = this.getNextMillis(Date.now(), jobIdData.options.repeat);
1✔
780
            this.logger?.debug('Repeatable job identifier', { id: jobID, repeatId: `repeat:${repeatId}:${nextMillis}` });
1!
781
            // map the repeatKey with nextmilis for bull repeatable jobID
782
            jobID = `repeat:${repeatId}:${nextMillis}`;
1✔
783
          }
1✔
784
          for (const queue of this.queuesList) {
14✔
785
            await new Promise((resolve, reject) => {
39✔
786
              // getJob returns job or null
787
              queue.getJob(jobID).then(job => {
39✔
788
                if (job?.opts?.repeat?.pattern) {
39!
789
                  (job.opts.repeat as any).cron = job.opts.repeat.pattern;
×
790
                  delete job.opts.repeat.pattern;
×
791
                }
×
792
                resolve(job);
39✔
793
                if (!_.isEmpty(job)) {
39✔
794
                  result.push(job);
12✔
795
                  if ((job as any)?.opts?.repeat?.jobId) {
12!
796
                    jobIDsCopy.push((job as any).opts.repeat.jobId);
×
797
                  } else {
12✔
798
                    jobIDsCopy.push(jobID);
12✔
799
                  }
12✔
800
                }
12✔
801
              }).catch(err => {
39✔
802
                jobListResponse.items.push({
×
803
                  status: this.catchStatus(jobID.toString(), err, `Error reading job ${jobID}`)
×
804
                });
×
805
              });
39✔
806
            });
39✔
807
          }
39✔
808
        }
13✔
809
        if (!_.isEqual(jobIDs.sort(), jobIDsCopy.sort())) {
14✔
810
          const jobIDsDiff = _.difference(jobIDs, jobIDsCopy);
1✔
811
          for (const jobId of jobIDsDiff) {
1✔
812
            jobListResponse.items.push({
1✔
813
              status: {
1✔
814
                id: jobId.toString(),
1✔
815
                code: 404,
1✔
816
                message: `Job ID ${jobId} not found in any of the queues`
1✔
817
              }
1✔
818
            });
1✔
819
          }
1✔
820
        }
1✔
821
      } else {
15✔
822
        try {
1✔
823
          let jobsList: any[] = [];
1✔
824
          for (const queue of this.queuesList) {
1✔
825
            const getJobsResult = await queue.getJobs(['active', 'delayed', 'repeat']);
3✔
826
            getJobsResult.forEach((job) => {
3✔
827
              if (job?.opts?.repeat?.pattern) {
6!
828
                (job.opts.repeat as any).cron = job.opts.repeat.pattern;
×
829
                delete job.opts.repeat.pattern;
×
830
              }
×
831
            });
3✔
832
            jobsList = jobsList.concat(getJobsResult);
3✔
833
          }
3✔
834
          result = jobsList;
1✔
835
        } catch (err: any) {
1!
836
          return this.catchOperationStatus(err, 'Error reading jobs');
×
837
        }
×
838
      }
1✔
839

840
      if (typeFilterName) {
15✔
841
        result = result.filter(job => job?.name === typeFilterName);
2✔
842
      }
2✔
843
      const custom_arguments = acsResponse.custom_query_args?.[0]?.custom_arguments;
14✔
844
      result = this.filterByOwnerShip(custom_arguments, result);
15✔
845
    }
15✔
846

847
    result = result.filter(valid => !!valid);
38✔
848

849
    if (!_.isEmpty(request) && !_.isEmpty(request.sort)
38✔
850
      && _.includes(['ASCENDING', 'DESCENDING'], request.sort)) {
42✔
851
      let sort: boolean | "desc" | "asc";
2✔
852
      switch (request.sort) {
2✔
853
        case JobReadRequest_SortOrder.DESCENDING:
2✔
854
          sort = 'desc';
1✔
855
          break;
1✔
856
        case JobReadRequest_SortOrder.ASCENDING:
2✔
857
          sort = 'asc';
1✔
858
          break;
1✔
859
        default:
2!
860
          this.logger?.error(`Unknown sort option ${request.sort}`);
×
861
      }
2✔
862
      result = _.orderBy(result, ['id'], [sort]);
2✔
863
    }
2✔
864

865
    for (const job of result) {
42✔
866
      const jobId = job.id as string;
61✔
867
      if (jobId.startsWith('repeat:')) {
61✔
868
        const repeatKey = jobId.split(':')[1];
20✔
869
        // it could be possible the redis repeat key is deleted on index 8 and old completed
870
        // jobs exist in data_store if delete on complete was not set to true for repeatable jobs
871
        const jobRedisId = await this.getRedisValue(repeatKey);
20✔
872
        if (jobRedisId) {
20✔
873
          job.id = jobRedisId;
20✔
874
        }
20✔
875
      }
20✔
876
    }
61✔
877

878
    for (const job of result) {
42✔
879
      const when = job?.opts?.delay ? new Date(job?.opts?.delay).toString() : '';
61!
880
      jobListResponse.items.push({
61✔
881
        payload: {
61✔
882
          id: job.id as string,
61✔
883
          type: job.name,
61✔
884
          queue_name: job.queueName,
61✔
885
          data: _filterJobData(job.data, true, this.logger),
61✔
886
          options: _filterJobOptions(job.opts) as any,
61✔
887
          when
61✔
888
        },
61✔
889
        status: {
61✔
890
          id: job.id.toString(),
61✔
891
          code: 200,
61✔
892
          message: 'success'
61✔
893
        }
61✔
894
      });
61✔
895
    }
61✔
896
    jobListResponse.total_count = jobListResponse?.items?.length;
38✔
897
    jobListResponse.operation_status = {
42✔
898
      code: 200,
42✔
899
      message: 'success'
42✔
900
    };
42✔
901
    return jobListResponse;
42✔
902
  }
42✔
903

904
  async _getJobList(): Promise<BullJob[]> {
2✔
905
    let jobsList: any[] = [];
24✔
906
    for (const queue of this.queuesList) {
24✔
907
      const getJobsResult = await queue.getJobs(['active', 'delayed', 'repeat']);
72✔
908
      getJobsResult.forEach((job) => {
72✔
909
        if (job?.opts?.repeat?.pattern) {
57!
910
          (job.opts.repeat as any).cron = job.opts.repeat.pattern;
×
911
          delete job.opts.repeat.pattern;
×
912
        }
×
913
      });
72✔
914
      jobsList = jobsList.concat(getJobsResult);
72✔
915
    }
72✔
916
    return jobsList;
24✔
917
  }
24✔
918

919
  // delete a job by its job instance after processing 'jobDone' / 'jobFailed'
920
  async _deleteJobInstance(jobId: string, queue: Queue): Promise<void> {
2✔
921
    return this._removeBullJob(jobId, queue);
×
922
  }
×
923

924
  /**
925
   * Delete Job from queue.
926
   */
927
  async delete(request: DeleteRequest, ctx: any): Promise<DeleteResponse> {
2✔
928
    const deleteResponse: DeleteResponse = { status: [], operation_status: { code: 0, message: '' } };
9✔
929
    if (_.isEmpty(request)) {
9!
930
      return {
×
931
        operation_status: {
×
932
          code: 400,
×
933
          message: 'No arguments provided for delete operation'
×
934
        }
×
935
      };
×
936
    }
×
937

938
    try {
9✔
939
      const subject = request?.subject;
9✔
940
      const jobIDs = request?.ids;
9✔
941
      let resources = new Array<DeepPartial<CtxResource>>();
9✔
942
      let action;
9✔
943
      if (jobIDs) {
9✔
944
        action = AuthZAction.DELETE;
4✔
945
        if (_.isArray(jobIDs)) {
4✔
946
          for (const id of jobIDs) {
4✔
947
            resources.push({ id });
4✔
948
          }
4✔
949
        } else {
4!
950
          resources = [{ id: jobIDs }];
×
951
        }
×
952
        await this.createMetadata(resources, action, subject);
4✔
953
      }
4✔
954
      if (request.collection) {
9✔
955
        action = AuthZAction.DROP;
5✔
956
        resources = [{ collection: request.collection }];
5✔
957
      }
5✔
958
      let acsResponse: DecisionResponse;
9✔
959
      try {
9✔
960
        if (!ctx) { ctx = {}; };
9!
961
        ctx.subject = subject;
9✔
962
        ctx.resources = resources;
9✔
963
        acsResponse = await checkAccessRequest(
9✔
964
          ctx, [{ resource: 'job', id: jobIDs as string[] }],
9✔
965
          action,
9✔
966
          Operation.isAllowed
9✔
967
        );
9✔
968
      } catch (err: any) {
9!
969
        return this.catchOperationStatus(err, 'Error requesting access-control-srv for delete operation');
×
970
      }
×
971
      if (acsResponse.decision != Response_Decision.PERMIT) {
9✔
972
        return {
1✔
973
          status: [],
1✔
974
          operation_status: acsResponse.operation_status
1✔
975
        };
1✔
976
      }
1✔
977
      const dispatch = [];
8✔
978
      this.logger?.info('Received delete request');
9✔
979
      if ('collection' in request && request.collection) {
9✔
980
        this.logger?.info('Deleting all jobs');
4✔
981
        for (const queue of this.queuesList || []) {
4!
982
          const jobs = await queue.getJobs(['paused', 'repeat', 'wait', 'active', 'delayed',
12✔
983
            'prioritized', 'waiting', 'waiting-children', 'completed', 'failed']);
12✔
984
          for (const job of jobs || []) {
12!
985
            if (!job) {
22✔
986
              continue;
2✔
987
            }
2✔
988
            let deleted;
20✔
989
            if (job?.repeatJobKey) {
22✔
990
              deleted = await queue.removeJobScheduler(job?.repeatJobKey);
8✔
991
            } else {
22✔
992
              const id = job.key ? job.key : job.id;
12!
993
              deleted = await queue.removeJobScheduler(id);
12✔
994
            }
12✔
995
            this.logger?.info('Job deleted with key', { key: job.key, name: job.name });
22✔
996
            const jobIdentifier = job.id ? job.id : job.name;
22!
997
            if (this.resourceEventsEnabled) {
22✔
998
              dispatch.push(this.jobEvents.emit('jobsDeleted', { id: jobIdentifier }));
20✔
999
            }
20✔
1000
            deleteResponse.status.push({
20✔
1001
              id: jobIdentifier,
20✔
1002
              code: 200,
20✔
1003
              message: 'success'
20✔
1004
            });
20✔
1005
          }
20✔
1006
        }
12✔
1007
        // FLUSH redis DB index 8 used for mapping of repeat jobIds (since req is for dropping job collection)
1008
        const delResp = await this.repeatJobIdRedisClient.flushDb();
4✔
1009
        if (delResp) {
4✔
1010
          this.logger?.info('Mapped keys for repeatable jobs deleted successfully');
4✔
1011
        } else {
4!
1012
          this.logger?.info('Could not delete repeatable job keys');
×
1013
        }
×
1014
        await this.clear();
4✔
1015
      } else if ('ids' in request) {
4✔
1016
        this.logger?.info('Deleting jobs by their IDs', { id: request.ids });
4✔
1017

1018
        for (const queue of this.queuesList) {
4✔
1019
          for (const jobDataKey of request.ids) {
12✔
1020
            let callback: Promise<boolean>;
12✔
1021
            const jobIdData = await this.getRedisValue(jobDataKey as string);
12✔
1022
            // future jobs scheduled with `when` will have same repeatId as external SCS jobID
1023
            if (jobIdData?.repeatId && (jobIdData.repeatId != jobDataKey)) {
12✔
1024
              const jobs = await queue.getJobSchedulers();
2✔
1025
              for (const job of jobs) {
2✔
1026
                if (job?.key === jobIdData.repeatId) {
1✔
1027
                  this.logger?.info('Removing Repeatable job by key', { key: job.key, name: job.name, id: jobDataKey });
1✔
1028
                  callback = queue.removeJobScheduler(job.key);
1✔
1029
                  deleteResponse.status.push({
1✔
1030
                    id: jobDataKey,
1✔
1031
                    code: 200,
1✔
1032
                    message: 'success'
1✔
1033
                  });
1✔
1034
                  await this.deleteRedisKey(jobDataKey as string);
1✔
1035
                  await this.deleteRedisKey(jobIdData.repeatId);
1✔
1036
                  break;
1✔
1037
                }
1✔
1038
              }
1✔
1039
            } else {
12✔
1040
              callback = queue.getJob(jobDataKey).then(async (jobData) => {
10✔
1041
                if (jobData) {
10✔
1042
                  try {
3✔
1043
                    await this._removeBullJob(jobData.id, queue);
3✔
1044
                    await this.deleteRedisKey(jobData.id);
3✔
1045
                    deleteResponse.status.push({
3✔
1046
                      id: jobData.id.toString(),
3✔
1047
                      code: 200,
3✔
1048
                      message: 'success'
3✔
1049
                    });
3✔
1050
                  } catch (err: any) {
3!
1051
                    deleteResponse.status.push(
×
1052
                      this.catchStatus(jobData.id.toString(), err)
×
1053
                    );
×
1054
                    return false;
×
1055
                  }
×
1056
                  return true;
3✔
1057
                }
3✔
1058
                return false;
7✔
1059
              });
10✔
1060
            }
10✔
1061

1062
            // since no CB is returned for removeRepeatableByKey by bull
1063
            if (!callback) {
12✔
1064
              if (this.resourceEventsEnabled) {
1✔
1065
                dispatch.push(this.jobEvents.emit(
1✔
1066
                  'jobsDeleted', { id: jobDataKey })
1✔
1067
                );
1✔
1068
              }
1✔
1069
            } else {
12✔
1070
              callback.then(() => {
11✔
1071
                if (this.resourceEventsEnabled) {
11✔
1072
                  dispatch.push(this.jobEvents.emit(
11✔
1073
                    'jobsDeleted', { id: jobDataKey })
11✔
1074
                  );
11✔
1075
                }
11✔
1076
              }).catch(err => {
11✔
1077
                deleteResponse.status.push(
×
1078
                  this.catchStatus(jobDataKey.toString(), err, 'Error deleting job')
×
1079
                );
×
1080
              });
11✔
1081
            }
11✔
1082
          }
12✔
1083
        }
12✔
1084
      }
4✔
1085

1086
      await Promise.all(dispatch);
8✔
1087
      deleteResponse.operation_status = { code: 200, message: 'success' };
8✔
1088
    } catch (err: any) {
9!
1089
      this.catchOperationStatus(err);
×
1090
    }
✔
1091
    return deleteResponse;
8✔
1092
  }
9✔
1093

1094
  /**
1095
   * Clean up queues - removes complted and failed jobs from queue
1096
   * @param {any} job clean up job
1097
   */
1098
  async cleanupJobs(ttlAfterFinished: number, maxJobsToCleanLimit: number) {
2✔
1099
    for (const queue of this.queuesList) {
×
1100
      try {
×
1101
        await queue.clean(ttlAfterFinished, maxJobsToCleanLimit, COMPLETED_JOB_STATE);
×
1102
        await queue.clean(ttlAfterFinished, maxJobsToCleanLimit, FAILED_JOB_STATE);
×
1103
      } catch (err) {
×
1104
        this.logger?.error('Error cleaning up jobs', decomposeError(err));
×
1105
      }
×
1106
    }
×
1107
    this.logger?.info('Jobs cleaned up successfully');
×
1108
    const lastExecutedInterval = { lastExecutedInterval: (new Date()).toString() };
×
1109
    await this.repeatJobIdRedisClient.set(QUEUE_CLEANUP, JSON.stringify(lastExecutedInterval));
×
1110
  }
×
1111

1112
  async setupCleanInterval(cleanInterval: number, ttlAfterFinished: number, maxJobsToCleanLimit: number) {
2✔
1113
    if (!ttlAfterFinished) {
2!
1114
      ttlAfterFinished = DEFAULT_CLEANUP_COMPLETED_JOBS;
×
1115
    }
×
1116
    const intervalData = await this.getRedisValue(QUEUE_CLEANUP);
2✔
1117
    let timeInMs, delta;
2✔
1118
    const now = new Date().getTime();
2✔
1119
    if (intervalData?.lastExecutedInterval && typeof (intervalData.lastExecutedInterval) === 'string') {
2!
1120
      timeInMs = new Date(intervalData.lastExecutedInterval).getTime();
×
1121
      this.logger?.debug('Previous execution interval', intervalData);
×
1122
      delta = now - timeInMs;
×
1123
      this.logger?.debug('Clean interval and previous difference', { cleanInterval, difference: delta });
×
1124
    }
×
1125

1126
    if (delta && (delta < cleanInterval)) {
2!
1127
      // use setTimeout and then create interval on setTimeout
1128
      this.logger?.info('Restoring previous execution interval with set timeout', { time: cleanInterval - delta });
×
1129
      setTimeout(async () => {
×
1130
        await this.cleanupJobs(ttlAfterFinished, maxJobsToCleanLimit);
×
1131
        setInterval(this.cleanupJobs.bind(this), cleanInterval, ttlAfterFinished, maxJobsToCleanLimit);
×
1132
      }, cleanInterval - delta);
×
1133
    } else {
2✔
1134
      setInterval(this.cleanupJobs.bind(this), cleanInterval, ttlAfterFinished, maxJobsToCleanLimit);
2✔
1135
      this.logger?.info('Clean up job interval set successfully');
2✔
1136
    }
2✔
1137
  }
2✔
1138

1139
  /**
1140
   * Reschedules a job - deletes it and recreates it with a new generated ID.
1141
   */
1142
  async update(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
2✔
1143
    const subject = request.subject;
4✔
1144
    // update meta data for owners information
1145
    await this.createMetadata(request.items, AuthZAction.MODIFY, subject);
4✔
1146
    let acsResponse: DecisionResponse;
4✔
1147
    try {
4✔
1148
      if (!ctx) { ctx = {}; };
4!
1149
      ctx.subject = subject;
4✔
1150
      ctx.resources = request?.items?.map((job) => {
4✔
1151
        const { data, ...resource } = job;
4✔
1152
        return resource;
4✔
1153
      });
4✔
1154
      acsResponse = await checkAccessRequest(ctx,
4✔
1155
        [{ resource: 'job', id: request.items.map(item => item.id) }],
4✔
1156
        AuthZAction.MODIFY, Operation.isAllowed);
4✔
1157
    } catch (err: any) {
4!
1158
      return this.catchOperationStatus('Error requesting access-control-srv for update operation', err);
×
1159
    }
×
1160
    if (acsResponse.decision != Response_Decision.PERMIT) {
4✔
1161
      return { operation_status: acsResponse.operation_status };
1✔
1162
    }
1✔
1163
    if (_.isNil(request) || _.isNil(request.items)) {
4!
1164
      return {
×
1165
        operation_status: {
×
1166
          code: 400,
×
1167
          message: 'Missing items in update request'
×
1168
        }
×
1169
      };
×
1170
    }
✔
1171

1172
    const mappedJobs = request?.items?.reduce((obj, job) => {
4✔
1173
      obj[job.id] = job;
3✔
1174
      return obj;
3✔
1175
    }, {} as Record<string, Job>);
4✔
1176

1177
    const jobData = await this.read(JobReadRequest.fromPartial(
4✔
1178
      {
4✔
1179
        filter: {
4✔
1180
          job_ids: Object.keys(mappedJobs)
4✔
1181
        },
4✔
1182
        subject
4✔
1183
      }
4✔
1184
    ), ctx);
4✔
1185

1186
    await this.delete(DeleteRequest.fromPartial({
3✔
1187
      ids: Object.keys(mappedJobs),
3✔
1188
      subject
3✔
1189
    }), {});
3✔
1190

1191
    const result = new Array<Job>();
3✔
1192

1193
    jobData?.items?.forEach(async (job) => {
4✔
1194
      const mappedJob = mappedJobs[job?.payload?.id];
3✔
1195
      const endJob = {
3✔
1196
        id: mappedJob.id,
3✔
1197
        type: mappedJob.type,
3✔
1198
        queue_name: job?.payload?.queue_name,
3✔
1199
        options: {
3✔
1200
          ...job.payload.options,
3✔
1201
          ...(mappedJob.options ? mappedJob.options : {})
3!
1202
        },
3✔
1203
        data: mappedJob.data || job.payload.data,
3!
1204
        when: mappedJob.when,
3✔
1205
      };
3✔
1206

1207
      if (endJob.when && endJob.options) {
3✔
1208
        delete (endJob.options as any).delay;
3✔
1209
      }
3✔
1210

1211
      result.push(endJob);
3✔
1212
    });
4✔
1213

1214
    return this.create(JobList.fromPartial({
4✔
1215
      items: result,
4✔
1216
      subject
4✔
1217
    }), ctx);
4✔
1218
  }
4✔
1219

1220
  /**
1221
   * Upserts a job - creates a new job if it does not exist or update the
1222
   * existing one if it already exists.
1223
   */
1224
  async upsert(request: JobList, ctx: any): Promise<DeepPartial<JobListResponse>> {
2✔
1225
    const subject = request.subject;
2✔
1226
    await this.createMetadata(request.items, AuthZAction.MODIFY, subject);
2✔
1227
    let acsResponse: DecisionResponse;
2✔
1228
    try {
2✔
1229
      if (!ctx) { ctx = {}; };
2!
1230
      ctx.subject = subject;
2✔
1231
      ctx.resources = request?.items?.map((job) => {
2✔
1232
        const { data, ...resource } = job;
2✔
1233
        return resource;
2✔
1234
      });
2✔
1235
      acsResponse = await checkAccessRequest(ctx,
2✔
1236
        [{ resource: 'job', id: request.items.map(item => item.id) }],
2✔
1237
        AuthZAction.MODIFY, Operation.isAllowed);
2✔
1238
    } catch (err: any) {
2!
1239
      return this.catchOperationStatus(err, 'Error requesting access-control-srv for upsert operation')
×
1240
    }
×
1241

1242
    if (acsResponse.decision != Response_Decision.PERMIT) {
2✔
1243
      return { operation_status: acsResponse.operation_status };
1✔
1244
    }
1✔
1245
    if (_.isNil(request) || _.isNil(request.items)) {
2!
1246
      return { operation_status: { code: 400, message: 'Missing items in upsert request' } };
×
1247
    }
✔
1248

1249
    const result = new Array<JobResponse>;
1✔
1250
    for (const eachJob of request.items) {
1✔
1251
      let jobExists = false;
1✔
1252
      const origJobId = _.cloneDeep(eachJob.id);
1✔
1253
      for (const queue of this.queuesList) {
1✔
1254
        const jobIdData = await this.getRedisValue(eachJob.id as string);
2✔
1255
        // future jobs scheduled with `when` will have same repeatId as external SCS jobID
1256
        if (jobIdData?.repeatId && (jobIdData.repeatId != origJobId)) {
2!
1257
          const repeatId = jobIdData.repeatId;
×
1258
          if (jobIdData?.options?.repeat?.cron && jobIdData?.options?.repeat?.every) {
×
1259
            result.push({
×
1260
              status: {
×
1261
                id: origJobId,
×
1262
                code: 400,
×
1263
                message: 'Both .cron and .every options are defined for this repeatable job'
×
1264
              }
×
1265
            });
×
1266
            continue;
×
1267
          }
×
1268
          const nextMillis = this.getNextMillis(Date.now(), jobIdData.options.repeat);
×
1269
          this.logger?.debug('Repeatable job identifier', { id: eachJob.id, repeatId: `repeat:${repeatId}:${nextMillis}` });
×
1270
          // map the repeatKey with nextmilis for bull repeatable jobID
1271
          eachJob.id = `repeat:${repeatId}:${nextMillis}`;
×
1272
        }
×
1273
        const jobInst = await queue.getJob(eachJob.id);
2✔
1274
        if (jobInst) {
2✔
1275
          // existing job update it with the given job identifier
1276
          if (eachJob.id.startsWith('repeat:')) {
1!
1277
            eachJob.id = origJobId;
×
1278
          }
×
1279
          result.push(
1✔
1280
            ...((await this.update(JobList.fromPartial({ items: [eachJob], subject }), ctx))?.items ?? [])
1!
1281
          );
1✔
1282
          jobExists = true;
1✔
1283
          break;
1✔
1284
        }
1✔
1285
      }
2✔
1286
      if (!jobExists) {
1!
1287
        // new job create it
1288
        result.push(
×
1289
          ...((await this.create(JobList.fromPartial({ items: [eachJob], subject }), ctx))?.items ?? [])
×
1290
        );
×
1291
      }
×
1292
    }
1✔
1293

1294
    return {
1✔
1295
      items: result,
1✔
1296
      total_count: result.length,
1✔
1297
      operation_status: {
1✔
1298
        code: 200,
1✔
1299
        message: 'success'
1✔
1300
      }
1✔
1301
    };
1✔
1302
  }
2✔
1303

1304
  /**
1305
   * Clear all job data.
1306
   */
1307
  async clear(): Promise<any> {
2✔
1308
    let allJobs: any[] = [];
6✔
1309
    for (const queue of this.queuesList) {
6✔
1310
      allJobs = allJobs.concat(await queue.getJobs(['paused', 'repeat', 'wait', 'active', 'delayed',
18✔
1311
        'prioritized', 'waiting', 'waiting-children', 'completed', 'failed']));
18✔
1312
    }
18✔
1313
    return Promise.all(allJobs.map(async (job) => job?.remove())).catch(err => {
6✔
1314
      this.logger?.error(`Error clearing jobs`, decomposeError(err));
×
1315
      throw err;
×
1316
    });
6✔
1317
  }
6✔
1318

1319
  async _removeBullJob(jobInstID: string, queue: Queue): Promise<void> {
2✔
1320
    return queue.getJob(jobInstID).then(job => {
3✔
1321
      if (job) {
3✔
1322
        return job.remove();
3✔
1323
      }
3✔
1324
    }).then(() => {
3✔
1325
      this.logger?.info(`Job#${jobInstID} removed`);
3✔
1326
    }).catch(err => {
3✔
1327
      this.logger?.error(`Error removing job ${jobInstID}`, decomposeError(err));
×
1328
      throw err;
×
1329
    });
3✔
1330
  }
3✔
1331

1332
  /**
1333
   * reads meta data from DB and updates owners information in resource if action is UPDATE / DELETE
1334
   * @param resources list of resources
1335
   * @param action resource action
1336
   * @param subject subject name
1337
   */
1338
  async createMetadata(resources: any, action: string, subject: Subject): Promise<any> {
2✔
1339
    const orgOwnerAttributes: Attribute[] = [];
23✔
1340
    if (resources && !_.isArray(resources)) {
23!
1341
      resources = [resources];
×
1342
    }
×
1343
    const urns = this.cfg.get('authorization:urns');
23✔
1344
    if (subject?.scope && (action === AuthZAction.CREATE || action === AuthZAction.MODIFY)) {
23✔
1345
      // add subject scope as default owners
1346
      orgOwnerAttributes.push(
19✔
1347
        {
19✔
1348
          id: urns?.ownerIndicatoryEntity,
19✔
1349
          value: urns?.organization,
19✔
1350
          attributes: [{
19✔
1351
            id: urns?.ownerInstance,
19✔
1352
            value: subject?.scope,
19✔
1353
            attributes: []
19✔
1354
          }]
19✔
1355
        });
19✔
1356
    }
19✔
1357

1358
    if (resources?.length > 0) {
23✔
1359
      for (const resource of resources) {
23✔
1360
        if (!resource.data) {
29✔
1361
          resource.data = { meta: {} };
4✔
1362
        } else if (!resource.data.meta) {
29✔
1363
          resource.data.meta = {};
18✔
1364
        }
18✔
1365
        if (resource?.id && (action === AuthZAction.MODIFY || action === AuthZAction.DELETE)) {
29✔
1366
          let result;
10✔
1367
          try {
10✔
1368
            result = await this.read(JobReadRequest.fromPartial({
10✔
1369
              filter: {
10✔
1370
                job_ids: [resource.id]
10✔
1371
              },
10✔
1372
              subject
10✔
1373
            }), {});
10✔
1374
          } catch (error: any) {
10✔
1375
            if (error.message?.startsWith('Error! Jobs not found in any of the queues') && action != AuthZAction.DELETE) {
1!
1376
              this.logger?.debug('New job should be created', { jobId: resource.id });
×
1377
              result = { items: [] };
×
1378
            } else {
1✔
1379
              this.logger?.error(`Error reading job with resource ID ${resource.id}`, decomposeError(error));
1✔
1380
            }
1✔
1381
          }
1✔
1382
          // update owners info
1383
          if (result?.items?.length === 1 && result?.items[0]?.payload) {
10✔
1384
            const item = result.items[0].payload;
7✔
1385
            resource.data.meta.owners = item.data.meta.owners;
7✔
1386
            // adding meta to resource root (needed by access-contorl-srv for owners information check)
1387
            // meta is inside data of resource since the data is persisted in redis using bull
1388
            resource.meta = { owners: item.data.meta.owners };
7✔
1389
          } else if ((!result || !result.items || !result.items[0] || !result.items[0].payload) && action === AuthZAction.MODIFY) {
10!
1390
            // job does not exist - create new job (ex: Upsert with action modify)
1391
            const ownerAttributes = _.cloneDeep(orgOwnerAttributes);
2✔
1392
            // add user as default owners
1393
            ownerAttributes.push(
2✔
1394
              {
2✔
1395
                id: urns?.ownerIndicatoryEntity,
2✔
1396
                value: urns?.user,
2✔
1397
                attributes: [{
2✔
1398
                  id: urns?.ownerInstance,
2✔
1399
                  value: subject?.id,
2✔
1400
                  attributes: []
2✔
1401
                }]
2✔
1402
              });
2✔
1403
            resource.data.meta.owners = ownerAttributes;
2✔
1404
            resource.meta = { owners: ownerAttributes };
2✔
1405
          }
2✔
1406
        } else if ((action === AuthZAction.CREATE || !resource.id) && !resource.data.meta.owners) {
29!
1407
          const ownerAttributes = _.cloneDeep(orgOwnerAttributes);
16✔
1408
          if (!resource.id) {
16✔
1409
            resource.id = uuid.v4().replace(/-/g, '');
14✔
1410
          }
14✔
1411
          // add user as default owners
1412
          if (resource.id) {
16✔
1413
            ownerAttributes.push(
16✔
1414
              {
16✔
1415
                id: urns.ownerIndicatoryEntity,
16✔
1416
                value: urns.user,
16✔
1417
                attributes: [{
16✔
1418
                  id: urns.ownerInstance,
16✔
1419
                  value: subject?.id,
16✔
1420
                  attributes: []
16✔
1421
                }]
16✔
1422
              });
16✔
1423
          }
16✔
1424
          resource.data.meta.owners = ownerAttributes;
16✔
1425
          resource.meta = { owners: ownerAttributes };
16✔
1426
        } else if (action === AuthZAction.CREATE && resource?.data?.meta?.owners) {
19✔
1427
          resource.meta = { owners: resource.data.meta.owners };
3✔
1428
        }
3✔
1429
      }
29✔
1430
    }
23✔
1431
    return resources;
23✔
1432
  }
23✔
1433
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc