• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

GEWIS / sudosos-backend / 26082567712

19 May 2026 07:22AM UTC coverage: 87.549% (-0.2%) from 87.783%
26082567712

Pull #918

github

web-flow
Merge a7f0b4cdb into 9dd74ee61
Pull Request #918: Background task queue (replaces BullMQ) with admin API, WebSocket updates, and failed-task health signal

3969 of 4634 branches covered (85.65%)

Branch coverage included in aggregate %.

498 of 563 new or added lines in 16 files covered. (88.45%)

110 existing lines in 13 files now uncovered.

20607 of 23437 relevant lines covered (87.93%)

840.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.45
/src/workers/task-runner.ts
1
/**
1✔
2
 *  SudoSOS back-end API service.
3
 *  Copyright (C) 2026 Study association GEWIS
4
 *
5
 *  This program is free software: you can redistribute it and/or modify
6
 *  it under the terms of the GNU Affero General Public License as published
7
 *  by the Free Software Foundation, either version 3 of the License, or
8
 *  (at your option) any later version.
9
 *
10
 *  This program is distributed in the hope that it will be useful,
11
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 *  GNU Affero General Public License for more details.
14
 *
15
 *  You should have received a copy of the GNU Affero General Public License
16
 *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17
 *
18
 *  @license
19
 */
1✔
20

21
/**
1✔
22
 * Worker pool that drains the `task` table. Replaces the previous BullMQ
23
 * worker + setInterval poller combo. The atomic claim inside
24
 * `TaskService.processNextEligible` keeps concurrent workers (in this process
25
 * or across replicas) safe without explicit locking.
26
 *
27
 * @module tasks
28
 */
1✔
29

30
import log4js from 'log4js';
31
import TaskService from '../service/task-service';
32
import { applyConfiguredLogLevel } from '../helpers/logging';
33

34
const logger = log4js.getLogger('TaskRunner');
1✔
35
applyConfiguredLogLevel(logger);
1✔
36

37
const DEFAULT_CONCURRENCY = 5;
1✔
38
const DEFAULT_IDLE_MS = 1000;
1✔
39

40
export interface TaskRunner {
41
  /** Number of parallel worker loops currently running. */
42
  readonly concurrency: number;
43
  /** Stop accepting work; resolves once every in-flight loop has exited. */
44
  stop(): Promise<void>;
45
}
46

47
export interface StartTaskRunnerOptions {
48
  /** Number of parallel worker loops (default: 5). */
49
  concurrency?: number;
50
  /** Sleep duration when the queue was empty on the last sweep (ms). */
51
  idleMs?: number;
52
}
53

54
const sleep = (ms: number): Promise<void> => new Promise((resolve) => {
1✔
55
  const t = setTimeout(resolve, ms);
5✔
56
  // Don't hold the event loop open just for a poll-sleep.
5✔
57
  if (typeof t.unref === 'function') t.unref();
5✔
58
});
5✔
59

60
/**
1✔
61
 * Start the worker pool. Returns a handle whose `stop()` resolves cleanly
62
 * when every worker has finished its current task and exited.
63
 */
1✔
64
export const startTaskRunner = (options: StartTaskRunnerOptions = {}): TaskRunner => {
1✔
65
  const concurrency = Math.max(1, options.concurrency ?? DEFAULT_CONCURRENCY);
1✔
66
  const idleMs = Math.max(100, options.idleMs ?? DEFAULT_IDLE_MS);
1✔
67

68
  const state = { stopping: false };
1✔
69
  const loops: Promise<void>[] = [];
1✔
70

71
  const workerLoop = async (): Promise<void> => {
1✔
72
    while (!state.stopping) {
5✔
73
      try {
5✔
74
        const ran = await TaskService.processNextEligible();
5✔
75
        if (!ran) {
5✔
76
          await sleep(idleMs);
5✔
77
        }
5✔
78
      } catch (err) {
5!
NEW
79
        logger.error(`Worker iteration failed: ${(err as Error).message}`);
×
NEW
80
        await sleep(idleMs);
×
NEW
81
      }
×
82
    }
5✔
83
  };
5✔
84

85
  for (let i = 0; i < concurrency; i += 1) {
1✔
86
    loops.push(workerLoop());
5✔
87
  }
5✔
88

89
  logger.info(`TaskRunner running (concurrency ${concurrency}, idle ${idleMs}ms).`);
1✔
90

91
  return {
1✔
92
    concurrency,
1✔
93
    stop: async () => {
1✔
94
      state.stopping = true;
1✔
95
      await Promise.allSettled(loops);
1✔
96
      logger.info('TaskRunner stopped.');
1✔
97
    },
1✔
98
  };
1✔
99
};
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc