• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taskforcesh / bullmq / 10173777761

31 Jul 2024 03:54AM CUT coverage: 89.166%. Remained the same
10173777761

push

github

web-flow
test(clean): fix flaky test (#2680)

1157 of 1392 branches covered (83.12%)

Branch coverage included in aggregate %.

2234 of 2411 relevant lines covered (92.66%)

2934.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.78
/src/classes/queue-getters.ts
1
/*eslint-env node */
2
'use strict';
3

4
import { QueueBase } from './queue-base';
1✔
5
import { Job } from './job';
1✔
6
import { clientCommandMessageReg, QUEUE_EVENT_SUFFIX } from '../utils';
1✔
7
import { JobState, JobType } from '../types';
8
import { JobJsonRaw, Metrics } from '../interfaces';
9

10
/**
11
 *
12
 * @class QueueGetters
13
 * @extends QueueBase
14
 *
15
 * @description Provides different getters for different aspects of a queue.
16
 */
17
export class QueueGetters<
1✔
18
  DataType,
19
  ResultType,
20
  NameType extends string,
21
> extends QueueBase {
22
  /**
   * Loads a single job of this queue by its identifier.
   *
   * @param jobId - identifier of the job to load.
   * @returns the result of `Job.fromId` for this queue, typed as a job of
   * this queue (or `undefined`).
   */
  getJob(
    jobId: string,
  ): Promise<Job<DataType, ResultType, NameType> | undefined> {
    type QueueJob = Job<DataType, ResultType, NameType>;

    const lookup = this.Job.fromId(this, jobId);
    return lookup as Promise<QueueJob>;
  }
29

30
  /**
   * Invokes `callback` once per recognised job type with the queue key of
   * that type and the name of the command family that applies to it.
   *
   * @param types - job types to visit; 'waiting' is treated as an alias of 'wait'.
   * @param count - when true the counting command ('zcard' / 'llen') is
   * reported, otherwise the range command ('zrange' / 'lrange').
   * @param callback - receives the key and the command name.
   * @returns one entry per input type holding the callback's return value;
   * types that are not recognised yield `undefined` and never reach the callback.
   */
  private commandByType(
    types: JobType[],
    count: boolean,
    callback: (key: string, dataType: string) => void,
  ) {
    // Types addressed with the z* commands.
    const sortedSetTypes = new Set<string>([
      'completed',
      'failed',
      'delayed',
      'prioritized',
      'repeat',
      'waiting-children',
    ]);
    // Types addressed with the l* commands.
    const listTypes = new Set<string>(['active', 'wait', 'paused']);

    const sortedSetCommand = count ? 'zcard' : 'zrange';
    const listCommand = count ? 'llen' : 'lrange';

    return types.map((requested: string) => {
      const type = requested === 'waiting' ? 'wait' : requested; // alias

      // The key is resolved for every type, recognised or not.
      const key = this.toKey(type);

      if (sortedSetTypes.has(type)) {
        return callback(key, sortedSetCommand);
      }
      if (listTypes.has(type)) {
        return callback(key, listCommand);
      }
      return undefined;
    });
  }
55

56
  /**
   * Job class used by the getters of this queue.
   * Exposed as a protected accessor so that subclasses can easily swap in
   * an extended Job class.
   */
  protected get Job(): typeof Job {
    const jobClass: typeof Job = Job;
    return jobClass;
  }
62

63
  /**
   * Normalises a job type selection into a de-duplicated list.
   *
   * - A single type is wrapped into a list.
   * - Asking for 'waiting' also selects 'paused'.
   * - A missing or empty selection falls back to a fixed default list.
   *
   * @param types - one type, a list of types, or nothing.
   * @returns the list of types to query, without duplicates and in first-seen order.
   */
  private sanitizeJobTypes(types: JobType[] | JobType | undefined): JobType[] {
    const requested = typeof types === 'string' ? [types] : types;

    if (!Array.isArray(requested) || requested.length === 0) {
      return [
        'active',
        'completed',
        'delayed',
        'failed',
        'paused',
        'prioritized',
        'waiting',
        'waiting-children',
      ];
    }

    // A Set keeps insertion order, so 'paused' lands at the end unless the
    // caller already listed it earlier.
    const unique = new Set<JobType>(requested);
    if (unique.has('waiting')) {
      unique.add('paused');
    }

    return [...unique];
  }
87

88
  /**
   * Returns the number of jobs waiting to be processed. This is the sum of
   * the jobs that are "waiting", "paused", "delayed", "prioritized" or
   * "waiting-children".
   */
  async count(): Promise<number> {
    const pendingTypes: JobType[] = [
      'waiting',
      'paused',
      'delayed',
      'prioritized',
      'waiting-children',
    ];

    return await this.getJobCountByTypes(...pendingTypes);
  }
103

104
  /**
   * Returns the time to live for a rate limited key in milliseconds.
   * @param maxJobs - max jobs to be considered in rate limit state. If not passed
   * it will return the remaining ttl without considering if max jobs is exceeded.
   * @returns -2 if the key does not exist.
   * -1 if the key exists but has no associated expire.
   * @see {@link https://redis.io/commands/pttl/}
   */
  async getRateLimitTtl(maxJobs?: number): Promise<number> {
    const ttl = await this.scripts.getRateLimitTtl(maxJobs);
    return ttl;
  }
115

116
  /**
   * Total number of jobs across the given types.
   *
   * Queue#getJobCountByTypes('completed') => completed count
   * Queue#getJobCountByTypes('completed', 'failed') => completed + failed count
   * Queue#getJobCountByTypes('completed', 'waiting', 'failed') => completed + waiting + failed count
   *
   * @param types - the job types whose counts are added together.
   * @returns the sum of every per-type count reported by `getJobCounts`.
   */
  async getJobCountByTypes(...types: JobType[]): Promise<number> {
    const countsByType = await this.getJobCounts(...types);

    let total = 0;
    for (const type of Object.keys(countsByType)) {
      total += countsByType[type];
    }
    return total;
  }
128

129
  /**
   * Returns the job counts for each type specified or every list/set in the queue by default.
   *
   * @param types - job types to count; normalised through `sanitizeJobTypes` first.
   * @returns An object, key (type) and value (count). A falsy count is reported as 0.
   */
  async getJobCounts(...types: JobType[]): Promise<{
    [index: string]: number;
  }> {
    const currentTypes = this.sanitizeJobTypes(types);
    const responses = await this.scripts.getCounts(currentTypes);

    // Responses are matched to the requested types by position.
    const tally: { [index: string]: number } = {};
    responses.forEach((value, position) => {
      const type = currentTypes[position];
      tally[type] = value || 0;
    });

    return tally;
  }
148

149
  /**
   * Get current job state.
   *
   * @param jobId - job identifier.
   * @returns the state reported by `scripts.getState`: a job state such as
   * 'completed', 'failed', 'delayed', 'active', 'waiting' or
   * 'waiting-children', or 'unknown'.
   */
  getJobState(jobId: string): Promise<JobState | 'unknown'> {
    const state = this.scripts.getState(jobId);
    return state;
  }
159

160
  /**
   * Counts the jobs currently in the "completed" status.
   */
  getCompletedCount(): Promise<number> {
    const type: JobType = 'completed';
    return this.getJobCountByTypes(type);
  }
166

167
  /**
   * Counts the jobs currently in the "failed" status.
   */
  getFailedCount(): Promise<number> {
    const type: JobType = 'failed';
    return this.getJobCountByTypes(type);
  }
173

174
  /**
   * Counts the jobs currently in the "delayed" status.
   */
  getDelayedCount(): Promise<number> {
    const type: JobType = 'delayed';
    return this.getJobCountByTypes(type);
  }
180

181
  /**
   * Counts the jobs currently in the "active" status.
   */
  getActiveCount(): Promise<number> {
    const type: JobType = 'active';
    return this.getJobCountByTypes(type);
  }
187

188
  /**
   * Counts the jobs currently in the "prioritized" status.
   */
  getPrioritizedCount(): Promise<number> {
    const type: JobType = 'prioritized';
    return this.getJobCountByTypes(type);
  }
194

195
  /**
   * Returns the number of jobs per priority.
   *
   * @param priorities - priorities to count; duplicates are queried only once.
   * @returns An object keyed by the priority (as a string) with its job
   * count as value. A falsy count is reported as 0.
   */
  async getCountsPerPriority(priorities: number[]): Promise<{
    [index: string]: number;
  }> {
    const uniquePriorities = [...new Set(priorities)];
    const responses = await this.scripts.getCountsPerPriority(uniquePriorities);

    // Responses are matched to the de-duplicated priorities by position.
    const tally: { [index: string]: number } = {};
    responses.forEach((value, position) => {
      const priority = String(uniquePriorities[position]);
      tally[priority] = value || 0;
    });

    return tally;
  }
211

212
  /**
   * Counts the jobs currently in the "waiting" or "paused" statuses.
   */
  getWaitingCount(): Promise<number> {
    const type: JobType = 'waiting';
    return this.getJobCountByTypes(type);
  }
218

219
  /**
   * Counts the jobs currently in the "waiting-children" status.
   */
  getWaitingChildrenCount(): Promise<number> {
    const type: JobType = 'waiting-children';
    return this.getJobCountByTypes(type);
  }
225

226
  /**
   * Fetches the jobs that are in the "waiting" status, in ascending order.
   * @param start - zero based index from where to start returning jobs.
   * @param end - zero based index where to stop returning jobs.
   */
  getWaiting(
    start = 0,
    end = -1,
  ): Promise<Job<DataType, ResultType, NameType>[]> {
    const ascending = true;
    return this.getJobs(['waiting'], start, end, ascending);
  }
237

238
  /**
   * Fetches the jobs that are in the "waiting-children" status, in ascending order.
   * I.E. parent jobs that have at least one child that has not completed yet.
   * @param start - zero based index from where to start returning jobs.
   * @param end - zero based index where to stop returning jobs.
   */
  getWaitingChildren(
    start = 0,
    end = -1,
  ): Promise<Job<DataType, ResultType, NameType>[]> {
    const ascending = true;
    return this.getJobs(['waiting-children'], start, end, ascending);
  }
250

251
  /**
   * Fetches the jobs that are in the "active" status, in ascending order.
   * @param start - zero based index from where to start returning jobs.
   * @param end - zero based index where to stop returning jobs.
   */
  getActive(
    start = 0,
    end = -1,
  ): Promise<Job<DataType, ResultType, NameType>[]> {
    const ascending = true;
    return this.getJobs(['active'], start, end, ascending);
  }
262

263
  /**
   * Fetches the jobs that are in the "delayed" status, in ascending order.
   * @param start - zero based index from where to start returning jobs.
   * @param end - zero based index where to stop returning jobs.
   */
  getDelayed(
    start = 0,
    end = -1,
  ): Promise<Job<DataType, ResultType, NameType>[]> {
    const ascending = true;
    return this.getJobs(['delayed'], start, end, ascending);
  }
274

275
  /**
   * Fetches the jobs that are in the "prioritized" status, in ascending order.
   * @param start - zero based index from where to start returning jobs.
   * @param end - zero based index where to stop returning jobs.
   */
  getPrioritized(
    start = 0,
    end = -1,
  ): Promise<Job<DataType, ResultType, NameType>[]> {
    const ascending = true;
    return this.getJobs(['prioritized'], start, end, ascending);
  }
286

287
  /**
   * Fetches the jobs that are in the "completed" status, in descending order.
   * @param start - zero based index from where to start returning jobs.
   * @param end - zero based index where to stop returning jobs.
   */
  getCompleted(
    start = 0,
    end = -1,
  ): Promise<Job<DataType, ResultType, NameType>[]> {
    const ascending = false;
    return this.getJobs(['completed'], start, end, ascending);
  }
298

299
  /**
   * Fetches the jobs that are in the "failed" status, in descending order.
   * @param start - zero based index from where to start returning jobs.
   * @param end - zero based index where to stop returning jobs.
   */
  getFailed(
    start = 0,
    end = -1,
  ): Promise<Job<DataType, ResultType, NameType>[]> {
    const ascending = false;
    return this.getJobs(['failed'], start, end, ascending);
  }
310

311
  /**
   * Returns the qualified job ids and the raw job data (if available) of the
   * children jobs of the given parent job.
   * It is possible to get either the already processed children, in this case
   * an array of qualified job ids and their result values will be returned,
   * or the pending children, in this case an array of qualified job ids will
   * be returned.
   * A qualified job id is a string representing the job id in a given queue,
   * for example: "bull:myqueue:jobid".
   *
   * @param parentId - The id of the parent job.
   * @param type - "processed" reads the `<parentId>:processed` key, "pending"
   * reads the `<parentId>:dependencies` key.
   * @param start - start of the page, forwarded to `scripts.paginate`.
   * @param end - end of the page, forwarded to `scripts.paginate`.
   *
   * @returns  { items: { id: string, v?: any, err?: string } [], jobs: JobJsonRaw[], total: number}
   */
  async getDependencies(
    parentId: string,
    type: 'processed' | 'pending',
    start: number,
    end: number,
  ): Promise<{
    items: { id: string; v?: any; err?: string }[];
    jobs: JobJsonRaw[];
    total: number;
  }> {
    const suffix = type === 'processed' ? 'processed' : 'dependencies';
    const key = this.toKey(`${parentId}:${suffix}`);

    const { items, total, jobs } = await this.scripts.paginate(key, {
      start,
      end,
      fetchJobs: true,
    });

    return {
      items,
      jobs,
      total,
    };
  }
353

354
  /**
   * Returns the de-duplicated job ids found in the given types.
   *
   * The ids come from `scripts.getRanges`, one response per type. When `asc`
   * is true, the responses that belong to list-backed types ('lrange') are
   * reversed before being merged.
   *
   * NOTE(review): `end` defaults to 1 here whereas `getJobs` defaults to -1;
   * confirm this is intended before relying on the default.
   *
   * @param types - job types to read.
   * @param start - zero based index from where to start returning ids.
   * @param end - zero based index where to stop returning ids.
   * @param asc - forwarded to `scripts.getRanges`; also triggers the reversal
   * of list-backed responses described above.
   * @returns the merged ids, without duplicates, in first-seen order.
   */
  async getRanges(
    types: JobType[],
    start = 0,
    end = 1,
    asc = false,
  ): Promise<string[]> {
    // Range command of each recognised type, in request order.
    const rangeCommands: string[] = [];
    this.commandByType(types, false, (_key, command) => {
      if (command === 'lrange' || command === 'zrange') {
        rangeCommands.push(command);
      }
    });

    const responses = await this.scripts.getRanges(types, start, end, asc);

    const uniqueIds = new Set<string>();
    responses.forEach((response: string[], index: number) => {
      const ids = response || [];
      const reverseList = asc && rangeCommands[index] === 'lrange';
      const ordered = reverseList ? ids.reverse() : ids;

      for (const id of ordered) {
        uniqueIds.add(id);
      }
    });

    return [...uniqueIds];
  }
389

390
  /**
   * Fetches the jobs that are on the given statuses (note that JobType is synonym for job status)
   * @param types - the statuses of the jobs to return; normalised through
   * `sanitizeJobTypes`, so omitting it selects the default list of statuses.
   * @param start - zero based index from where to start returning jobs.
   * @param end - zero based index where to stop returning jobs.
   * @param asc - if true, the jobs will be returned in ascending order.
   */
  async getJobs(
    types?: JobType[] | JobType,
    start = 0,
    end = -1,
    asc = false,
  ): Promise<Job<DataType, ResultType, NameType>[]> {
    type QueueJob = Job<DataType, ResultType, NameType>;

    const currentTypes = this.sanitizeJobTypes(types);
    const jobIds = await this.getRanges(currentTypes, start, end, asc);

    // Every job is loaded concurrently.
    const pendingJobs = jobIds.map(
      id => this.Job.fromId(this, id) as Promise<QueueJob>,
    );

    return Promise.all(pendingJobs);
  }
416

417
  /**
   * Returns the logs for a given Job.
   * @param jobId - the id of the job to get the logs for.
   * @param start - zero based index from where to start returning log entries.
   * @param end - zero based index where to stop returning log entries.
   * @param asc - if true, the log entries will be returned in ascending order.
   * @returns the selected log entries plus `count`, the length of the whole
   * log list.
   * @throws the first error reported by Redis for either command of the
   * transaction, instead of returning a partially undefined result.
   */
  async getJobLogs(
    jobId: string,
    start = 0,
    end = -1,
    asc = true,
  ): Promise<{ logs: string[]; count: number }> {
    const client = await this.client;
    const multi = client.multi();

    const logsKey = this.toKey(jobId + ':logs');
    if (asc) {
      multi.lrange(logsKey, start, end);
    } else {
      // Descending order: mirror the window so that it is taken from the
      // tail of the list, then reverse the entries below.
      multi.lrange(logsKey, -(end + 1), -(start + 1));
    }
    multi.llen(logsKey);

    const [[rangeErr, logs], [lenErr, count]] = (await multi.exec()) as [
      [Error | null, string[]],
      [Error | null, number],
    ];
    if (rangeErr || lenErr) {
      throw rangeErr || lenErr;
    }

    if (!asc) {
      logs.reverse();
    }

    return {
      logs,
      count,
    };
  }
449

450
  /**
   * Lists the Redis clients whose name is accepted by `matcher`.
   *
   * @param matcher - predicate applied to each client's name.
   * @returns the parsed entries of the matching clients. When the failure
   * message matches `clientCommandMessageReg`, a single placeholder entry is
   * returned instead.
   * @throws any other error raised while listing or parsing the clients.
   */
  private async baseGetClients(matcher: (name: string) => boolean): Promise<
    {
      [index: string]: string;
    }[]
  > {
    const client = await this.client;
    try {
      const rawList = (await client.client('LIST')) as string;
      return this.parseClientList(rawList, matcher);
    } catch (err) {
      if (clientCommandMessageReg.test((err as Error).message)) {
        return [{ name: 'GCP does not support client list' }];
      }

      throw err;
    }
  }
468

469
  /**
   * Get the worker list related to the queue. i.e. all the known
   * workers that are available to process jobs for this queue.
   * Note: GCP does not support SETNAME, so this call will not work
   *
   * A client counts as a worker when its name is exactly this queue's client
   * name, or starts with that name followed by ":w:".
   *
   * @returns - Returns an array with workers info.
   */
  getWorkers(): Promise<
    {
      [index: string]: string;
    }[]
  > {
    const unnamedWorkerClientName = `${this.clientName()}`;
    const namedWorkerPrefix = `${this.clientName()}:w:`;

    // A client without a name never matches.
    const isWorker = (name: string) =>
      name &&
      (name.startsWith(namedWorkerPrefix) || name === unnamedWorkerClientName);

    return this.baseGetClients(isWorker);
  }
491

492
  /**
   * Returns the current count of workers for the queue, i.e. the number of
   * entries returned by `getWorkers`.
   */
  async getWorkersCount(): Promise<number> {
    const { length } = await this.getWorkers();
    return length;
  }
502

503
  /**
   * Get queue events list related to the queue.
   * Note: GCP does not support SETNAME, so this call will not work
   *
   * A client is selected when its name is exactly this queue's client name
   * followed by `QUEUE_EVENT_SUFFIX`.
   *
   * @deprecated do not use this method, it will be removed in the future.
   *
   * @returns - Returns an array with queue events info.
   */
  async getQueueEvents(): Promise<
    {
      [index: string]: string;
    }[]
  > {
    const queueEventsClientName = `${this.clientName()}${QUEUE_EVENT_SUFFIX}`;
    const isQueueEvents = (name: string) => name === queueEventsClientName;

    return this.baseGetClients(isQueueEvents);
  }
519

520
  /**
   * Get queue metrics related to the queue.
   *
   * This method returns the gathered metrics for the queue.
   * The metrics are represented as an array of job counts
   * per unit of time (1 minute).
   *
   * @param type - which metrics to read: 'completed' or 'failed'.
   * @param start - Start point of the metrics, where 0
   * is the newest point to be returned.
   * @param end - End point of the metrics, where -1 is the
   * oldest point to be returned.
   *
   * @returns - Returns an object with queue metrics: `meta` holds the parsed
   * 'count', 'prevTS' and 'prevCount' fields (0 when a field is missing),
   * `data` the requested points and `count` the total number of points.
   * @throws the first error reported by Redis for any of the three commands
   * of the transaction.
   */
  async getMetrics(
    type: 'completed' | 'failed',
    start = 0,
    end = -1,
  ): Promise<Metrics> {
    const client = await this.client;
    const metricsKey = this.toKey(`metrics:${type}`);
    const dataKey = `${metricsKey}:data`;

    const multi = client.multi();
    multi.hmget(metricsKey, 'count', 'prevTS', 'prevCount');
    multi.lrange(dataKey, start, end);
    multi.llen(dataKey);

    const [hmget, range, len] = (await multi.exec()) as [
      [Error, [string, string, string]],
      [Error, []],
      [Error, number],
    ];
    const [err, [count, prevTS, prevCount]] = hmget;
    const [err2, data] = range;
    const [err3, numPoints] = len;
    // All three commands are checked, so that a failing LLEN is reported
    // instead of yielding an undefined `count`.
    if (err || err2 || err3) {
      throw err || err2 || err3;
    }

    return {
      meta: {
        count: parseInt(count || '0', 10),
        prevTS: parseInt(prevTS || '0', 10),
        prevCount: parseInt(prevCount || '0', 10),
      },
      data,
      count: numPoints,
    };
  }
570

571
  /**
   * Parses a client list given as text: one client per line, each line made
   * of space separated `key=value` tokens.
   *
   * @param list - the raw text to parse.
   * @param matcher - predicate applied to the `name` field of each client.
   * @returns the accepted clients. On each of them `name` is replaced by the
   * queue name and the original value is kept under `rawname`.
   */
  private parseClientList(list: string, matcher: (name: string) => boolean) {
    const clients: { [index: string]: string }[] = [];

    for (const line of list.split('\n')) {
      const client: { [index: string]: string } = {};

      for (const keyValue of line.split(' ')) {
        // A token without '=' ends up under the empty key.
        const separator = keyValue.indexOf('=');
        const key = keyValue.substring(0, separator);
        client[key] = keyValue.substring(separator + 1);
      }

      const rawName = client['name'];
      if (matcher(rawName)) {
        client['name'] = this.name;
        client['rawname'] = rawName;
        clients.push(client);
      }
    }

    return clients;
  }
593
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc