• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

GEWIS / sudosos-backend / 26083912867

19 May 2026 07:51AM UTC coverage: 87.549% (-0.2%) from 87.783%
26083912867

Pull #918

github

web-flow
Merge 550518fe0 into 9dd74ee61
Pull Request #918: Background task queue (replaces BullMQ) with admin API, WebSocket updates, and failed-task health signal

3968 of 4633 branches covered (85.65%)

Branch coverage included in aggregate %.

498 of 563 new or added lines in 16 files covered. (88.45%)

110 existing lines in 13 files now uncovered.

20607 of 23437 relevant lines covered (87.93%)

840.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.45
/src/workers/task-runner.ts
1
/**
1✔
2
 *  SudoSOS back-end API service.
3
 *  Copyright (C) 2026 Study association GEWIS
4
 *
5
 *  This program is free software: you can redistribute it and/or modify
6
 *  it under the terms of the GNU Affero General Public License as published
7
 *  by the Free Software Foundation, either version 3 of the License, or
8
 *  (at your option) any later version.
9
 *
10
 *  This program is distributed in the hope that it will be useful,
11
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 *  GNU Affero General Public License for more details.
14
 *
15
 *  You should have received a copy of the GNU Affero General Public License
16
 *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17
 *
18
 *  @license
19
 */
1✔
20

21
/**
1✔
22
 * Worker pool that drains the `task` table. The atomic claim inside
23
 * `TaskService.processNextEligible` keeps concurrent workers (in this
24
 * process or across replicas) safe without explicit locking.
25
 *
26
 * @module tasks
27
 */
1✔
28

29
import log4js from 'log4js';
30
import TaskService from '../service/task-service';
31
import { applyConfiguredLogLevel } from '../helpers/logging';
32

33
// Module-scoped logger for the task runner. The shared logging helper is
// handed the logger right away (presumably to apply the configured level —
// confirm in ../helpers/logging).
const logger = log4js.getLogger('TaskRunner');

applyConfiguredLogLevel(logger);
1✔
35

36
/** Number of worker loops started when the caller does not pass `concurrency`. */
const DEFAULT_CONCURRENCY = 5;

/** Idle sleep, in milliseconds, used when the caller does not pass `idleMs`. */
const DEFAULT_IDLE_MS = 1000;
1✔
38

39
/**
 * Handle for a started worker pool: exposes the pool size and a way to shut
 * the pool down.
 */
export interface TaskRunner {
  /** How many worker loops this runner executes in parallel. */
  readonly concurrency: number;

  /**
   * Stop picking up new work. The returned promise resolves once every
   * in-flight worker loop has exited.
   */
  stop(): Promise<void>;
}
45

46
/** Optional tuning knobs accepted by `startTaskRunner`. */
export interface StartTaskRunnerOptions {
  /** How many worker loops to run in parallel (default: 5, at least 1). */
  concurrency?: number;

  /**
   * How long, in milliseconds, a worker sleeps after a sweep that found the
   * queue empty (default: 1000, at least 100).
   */
  idleMs?: number;
}
52

53
/**
 * Resolve after `ms` milliseconds.
 *
 * The underlying timer is unref'd when the runtime supports it, so a pending
 * poll-sleep never keeps the event loop (and thus the process) alive by itself.
 */
const sleep = (ms: number): Promise<void> => new Promise<void>((wake) => {
  const timer = setTimeout(wake, ms);
  const canUnref = typeof timer.unref === 'function';
  if (canUnref) {
    timer.unref();
  }
});
5✔
58

59
/**
 * Start the worker pool. Returns a handle whose `stop()` resolves cleanly
 * when every worker has finished its current iteration and exited.
 *
 * Option handling:
 * - `concurrency` is floored to an integer and raised to at least 1. A
 *   non-finite value (`NaN`, `Infinity`) falls back to the default, because
 *   `NaN` would start no workers and `Infinity` would never stop spawning.
 * - `idleMs` is kept between 100 ms and the largest delay Node's timers
 *   accept. A non-finite value falls back to the default; out-of-range
 *   delays are otherwise coerced to 1 ms by `setTimeout`, which would turn
 *   the idle back-off into a hot poll.
 *
 * @param options - Optional pool size and idle back-off.
 * @returns A handle exposing the effective concurrency and `stop()`.
 */
export const startTaskRunner = (options: StartTaskRunnerOptions = {}): TaskRunner => {
  // Largest delay accepted by Node's setTimeout (signed 32-bit milliseconds).
  const MAX_TIMER_MS = 2 ** 31 - 1;

  // Use `value` only when it is a real, finite number; otherwise the fallback.
  const finiteOr = (value: number | undefined, fallback: number): number => (
    typeof value === 'number' && Number.isFinite(value) ? value : fallback
  );

  // Render any thrown value for logging; thrown values are not always Errors.
  const describeError = (err: unknown): string => (
    err instanceof Error ? err.message : String(err)
  );

  const concurrency = Math.max(1, Math.floor(finiteOr(options.concurrency, DEFAULT_CONCURRENCY)));
  const idleMs = Math.min(MAX_TIMER_MS, Math.max(100, finiteOr(options.idleMs, DEFAULT_IDLE_MS)));

  // Shared flag: every loop re-checks it between iterations.
  const state = { stopping: false };
  const loops: Promise<void>[] = [];

  const workerLoop = async (): Promise<void> => {
    while (!state.stopping) {
      try {
        // A falsy result is treated as "nothing to do right now": back off.
        const ran = await TaskService.processNextEligible();
        if (!ran) {
          await sleep(idleMs);
        }
      } catch (err: unknown) {
        // Keep the loop alive on failure, but back off so a persistent
        // error does not spin.
        logger.error(`Worker iteration failed: ${describeError(err)}`);
        await sleep(idleMs);
      }
    }
  };

  for (let i = 0; i < concurrency; i += 1) {
    loops.push(workerLoop());
  }

  logger.info(`TaskRunner running (concurrency ${concurrency}, idle ${idleMs}ms).`);

  return {
    concurrency,
    stop: async () => {
      state.stopping = true;
      // allSettled: one rejected loop must not prevent waiting for the rest.
      await Promise.allSettled(loops);
      logger.info('TaskRunner stopped.');
    },
  };
};
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc