• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Zajno / common-utils / 23100588080

15 Mar 2026 01:25AM UTC coverage: 70.225% (+0.2%) from 69.991%
23100588080

Pull #135

github

web-flow
Merge 39fc297ea into 21618a6a1
Pull Request #135: Task queue improvements

767 of 1041 branches covered (73.68%)

Branch coverage included in aggregate %.

45 of 46 new or added lines in 2 files covered. (97.83%)

1 existing line in 1 file now uncovered.

3358 of 4833 relevant lines covered (69.48%)

25.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.12
/packages/common/src/structures/queue/tasks.ts
1
import { createManualPromise, type ManualPromise } from '../../async/manualPromise.js';
2
import type { ILogger } from '../../logger/index.js';
3

4
type Factory<T> = () => Promise<T>;
5

6
type QueueItem<T> = {
7
    factory: Factory<T>;
8
    promise: ManualPromise<T | undefined>;
9
    name?: string;
10
};
11

12
export type DelayedTask<T> = {
13
    /** Promise that resolves with the task result, or rejects if the task throws or is cancelled. */
14
    readonly promise: Promise<T | undefined>;
15
    /** Cancel the delayed task. If the task hasn't started yet, the promise rejects with a 'Cancelled' error. */
16
    readonly cancel: () => void;
17
};
18

19
export type TasksQueueOptions = {
20
    /** Delay in ms to wait after each task before starting the next one. Default: 0 */
21
    delayBetweenTasks?: number;
22
    /** Called when a task throws. If provided, the error is NOT propagated to the enqueue() promise. */
23
    onTaskError?: (error: unknown, name?: string) => void;
24
};
25

26
export class TasksQueue<T> {
27

28
    private readonly _items: QueueItem<T>[] = [];
29✔
29
    private _running = 0;
29✔
30
    private _logger: ILogger | null = null;
29✔
31
    private readonly _delayBetweenTasks: number;
32
    private readonly _onTaskError: ((error: unknown, name?: string) => void) | undefined;
33
    private readonly _delayedTasks = new Set<DelayedTask<T>>();
29✔
34

35
    constructor(readonly limit: number, options?: TasksQueueOptions) {
29✔
36
        if (!limit || limit < 0) {
29✔
37
            throw new Error('TasksQueue: limit should be a positive number');
2✔
38
        }
39
        this._delayBetweenTasks = options?.delayBetweenTasks ?? 0;
27✔
40
        this._onTaskError = options?.onTaskError;
27✔
41
    }
42

43
    public get isFull() { return this._running >= this.limit; }
47✔
44
    public get running() { return this._running; }
10✔
45

46
    /** Number of tasks waiting in the queue (not including currently running) */
47
    public get pending() { return this._items.length; }
9✔
48

49
    addLogger(logger: ILogger) {
50
        this._logger = logger;
×
51
        return this;
×
52
    }
53

54
    /**
55
     * Enqueue a task factory to run when a slot is available.
56
     *
57
     * **Note:** When `onTaskError` is provided in the constructor options and the factory throws,
58
     * the error is passed to `onTaskError` and the returned promise resolves with `undefined`.
59
     */
60
    public enqueue(factory: Factory<T>, name?: string): Promise<T | undefined> {
61
        if (typeof factory !== 'function') {
34✔
62
            throw new Error('Invalid arg: factory not a function');
1✔
63
        }
64

65
        if (this.isFull) {
33✔
66

67
            const item: QueueItem<T> = {
13✔
68
                factory,
69
                promise: createManualPromise<T | undefined>(),
70
                name,
71
            };
72

73
            this._items.push(item);
13✔
74
            return item.promise.promise;
13✔
75
        }
76

77
        return this._runFactory(factory, name)
20✔
78
            .then(res => res.ok === true ? res.result : undefined);
17✔
79
    }
80

81
    /** Enqueue a task to run after `delay` ms. Returns a promise and a cancel function. */
82
    public enqueueDelayed(factory: Factory<T>, delay: number, name?: string): DelayedTask<T> {
83
        const mp = createManualPromise<T | undefined>();
10✔
84
        let started = false;
10✔
85

86
        const timer = setTimeout(() => {
10✔
87
            started = true;
7✔
88
            this._delayedTasks.delete(task);
7✔
89
            this.enqueue(factory, name).then(
7✔
90
                result => mp.resolve(result),
5✔
91
                err => mp.reject(err as Error),
2✔
92
            );
93
        }, delay);
94

95
        const cancel = () => {
10✔
96
            if (!started) {
4✔
97
                clearTimeout(timer);
3✔
98
                this._delayedTasks.delete(task);
3✔
99
                mp.reject(new Error('Cancelled'));
3✔
100
            }
101
        };
102

103
        const task: DelayedTask<T> = { promise: mp.promise, cancel };
10✔
104
        this._delayedTasks.add(task);
10✔
105

106
        return task;
10✔
107
    }
108

109
    /** Remove all pending tasks from the queue. Does not cancel running tasks. Also cancels all delayed enqueues.
110
     *  Pending enqueue() promises are rejected with a 'TasksQueue cleared' error so callers don't hang indefinitely.
111
     *  Delayed tasks are cancelled and their promises are rejected with a 'Cancelled' error. */
112
    public clear(): void {
113
        const pending = this._items.splice(0);
3✔
114
        if (pending.length > 0) {
3✔
115
            const clearError = new Error('TasksQueue cleared');
1✔
116
            for (const item of pending) {
1✔
117
                item.promise.reject(clearError);
2✔
118
            }
119
        }
120
        const delayed = [...this._delayedTasks];
3✔
121
        for (const task of delayed) {
3✔
122
            task.cancel();
2✔
123
        }
124
        this._delayedTasks.clear();
3✔
125
    }
126

127
    private _runFactory = async (factory: Factory<T>, name?: string) => {
29✔
128
        this._running++;
31✔
129
        try {
31✔
130
            const result = await factory();
31✔
131
            return { ok: true as const, result };
22✔
132
        } catch (err) {
133
            if (this._onTaskError) {
9✔
134
                this._onTaskError(err, name || factory.name || undefined);
6✔
135
                return { ok: false as const };
6✔
136
            }
137
            this._logger?.warn(`Factory "${name || factory.name || '<unknown>'}" thrown. Rethrowing...`);
3!
138
            throw err;
3✔
139
        } finally {
140
            this._running--;
31✔
141
            if (this._delayBetweenTasks > 0 && this._items.length > 0) {
31✔
142
                await new Promise<void>(resolve => setTimeout(resolve, this._delayBetweenTasks));
3✔
143
            }
144
            this._tryRunNext();
31✔
145
        }
146
    };
147

148
    private _tryRunNext = async () => {
29✔
149
        if (!this._items.length || this.isFull) {
31✔
150
            return;
20✔
151
        }
152

153
        const next = this._items.shift();
11✔
154
        if (!next || !next.factory || !next.promise) {
11✔
155
            return;
×
156
        }
157

158
        try {
11✔
159
            const res = await this._runFactory(next.factory, next.name);
11✔
160
            next.promise.resolve(res.ok === true ? res.result : undefined);
11✔
161
        } catch (err) {
NEW
162
            next.promise.reject(err as Error);
×
163
        }
164
    };
165

166
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc