• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

panates / power-tasks / 23430198069

23 Mar 2026 09:24AM UTC coverage: 90.292% (-1.8%) from 92.138%
23430198069

push

github

erayhanoglu
1.13.5

255 of 295 branches covered (86.44%)

Branch coverage included in aggregate %.

1196 of 1312 relevant lines covered (91.16%)

44.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.52
/src/task-queue.ts
1
import DoublyLinked from "doublylinked";
1✔
2
import { AsyncEventEmitter } from "node-events-async";
1✔
3
import { TaskQueueEvents } from "./interfaces/task-queue-events.js";
1✔
4
import { TaskQueueOptions } from "./interfaces/taskqueue-options.js";
1✔
5
import type { TaskLike } from "./interfaces/types.js";
1✔
6
import { Task } from "./task.js";
1✔
7

1✔
8
/**
1✔
9
 * A `TaskQueue` manages the execution of tasks with concurrency control.
1✔
10
 * It allows limiting the number of simultaneous tasks and provides methods
1✔
11
 * to pause, resume, and manage the task lifecycle.
1✔
12
 *
1✔
13
 * @extends AsyncEventEmitter
1✔
14
 */
1✔
15
export class TaskQueue extends AsyncEventEmitter<TaskQueueEvents> {
1✔
16
  /**
1✔
17
   * The maximum number of tasks allowed in the queue.
1✔
18
   */
1✔
19
  maxQueue?: number;
1✔
20
  /**
16✔
21
   * The maximum number of tasks to run concurrently.
16✔
22
   */
16✔
23
  concurrency?: number;
16✔
24
  protected _paused: boolean;
16✔
25
  protected _queue = new DoublyLinked<Task>();
16✔
26
  protected _running = new Set<Task>();
16✔
27
  protected _runningIds = new Set<string>();
16✔
28

1✔
29
  /**
1✔
30
   * Constructs a new TaskQueue.
1✔
31
   *
1✔
32
   * @param options - Configuration options for the queue.
1✔
33
   */
1✔
34
  constructor(options?: TaskQueueOptions) {
1✔
35
    super();
16✔
36
    this.maxQueue = options?.maxQueue;
16✔
37
    this.concurrency = options?.concurrency;
16✔
38
    this._paused = !!options?.paused;
16✔
39
  }
16✔
40

1✔
41
  /**
1✔
42
   * Gets the total number of tasks in the queue (both queued and running).
1✔
43
   */
1✔
44
  get size() {
1✔
45
    return this._queue.length + this._running.size;
7✔
46
  }
7✔
47

1✔
48
  /**
1✔
49
   * Gets the number of tasks currently running.
1✔
50
   */
1✔
51
  get running() {
1✔
52
    return this._running.size;
3✔
53
  }
3✔
54

1✔
55
  /**
1✔
56
   * Gets the number of tasks currently waiting in the queue.
1✔
57
   */
1✔
58
  get queued() {
1✔
59
    return this._queue.length;
3✔
60
  }
3✔
61

1✔
62
  /**
1✔
63
   * Whether the queue is currently paused.
1✔
64
   */
1✔
65
  get paused(): boolean {
1✔
66
    return this._paused;
29✔
67
  }
29✔
68

1✔
69
  /**
1✔
70
   * Pauses the queue execution. No new tasks will be started.
1✔
71
   */
1✔
72
  pause(): void {
1✔
73
    this._paused = true;
1✔
74
  }
1✔
75

1✔
76
  /**
1✔
77
   * Resumes the queue execution and starts any queued tasks if concurrency allows.
1✔
78
   */
1✔
79
  resume(): void {
1✔
80
    this._paused = false;
1✔
81
    setImmediate(() => this._pulse());
1✔
82
  }
1✔
83

1✔
84
  /**
1✔
85
   * Clears all tasks from the queue and aborts them.
1✔
86
   */
1✔
87
  clearQueue() {
1✔
88
    this._queue.forEach((task) => task.abort());
2✔
89
    this._queue = new DoublyLinked();
2✔
90
  }
2✔
91

1✔
92
  /**
1✔
93
   * Aborts all running tasks and clears the queue.
1✔
94
   */
1✔
95
  abortAll(): void {
1✔
96
    if (!this.size) return;
1!
97
    this.clearQueue();
1✔
98
    this._running.forEach((task) => task.abort());
1✔
99
  }
1✔
100

1✔
101
  /**
1✔
102
   * Returns a promise that resolves when all tasks have finished and the queue is empty.
1✔
103
   *
1✔
104
   * @returns A promise that resolves when the queue finishes.
1✔
105
   */
1✔
106
  async wait(): Promise<void> {
1✔
107
    if (!this.size) return Promise.resolve();
4!
108
    return new Promise((resolve) => {
4✔
109
      this.once("finish", resolve);
4✔
110
    });
4✔
111
  }
4✔
112

1✔
113
  /**
1✔
114
   * Adds a task to the beginning of the queue.
1✔
115
   *
1✔
116
   * @template T - The type of the result produced by the task.
1✔
117
   * @param task - The task or task function to enqueue.
1✔
118
   * @returns The {@link Task} instance.
1✔
119
   */
1✔
120
  enqueuePrepend<T = any>(task: TaskLike<T>): Task<T> {
1✔
121
    return this._enqueue(task, true);
1✔
122
  }
1✔
123

1✔
124
  /**
1✔
125
   * Adds a task to the end of the queue.
1✔
126
   *
1✔
127
   * @template T - The type of the result produced by the task.
1✔
128
   * @param task - The task or task function to enqueue.
1✔
129
   * @returns The {@link Task} instance.
1✔
130
   */
1✔
131
  enqueue<T = any>(task: TaskLike<T>): Task<T> {
1✔
132
    return this._enqueue(task, false);
22✔
133
  }
22✔
134

1✔
135
  /**
1✔
136
   * Checks if a task with the given ID is currently running.
1✔
137
   *
1✔
138
   * @param id - The ID of the task to check.
1✔
139
   * @returns `true` if a task with the given ID is running, `false` otherwise.
1✔
140
   */
1✔
141
  isRunning(id: string): boolean {
1✔
142
    return this._runningIds.has(id);
×
143
  }
×
144

1✔
145
  /**
1✔
146
   * Internal method to enqueue a task.
1✔
147
   *
1✔
148
   * @template T - The type of the task result.
1✔
149
   * @param task - The task-like object to enqueue.
1✔
150
   * @param prepend - Whether to add the task to the beginning of the queue.
1✔
151
   * @returns The {@link Task} instance.
1✔
152
   * @protected
1✔
153
   */
1✔
154
  protected _enqueue<T = any>(task: TaskLike, prepend: boolean): Task<T> {
1✔
155
    if (this.maxQueue && this.size >= this.maxQueue)
23✔
156
      throw new Error(`Queue limit (${this.maxQueue}) exceeded`);
23✔
157
    const taskInstance = task instanceof Task ? task : new Task(task);
23✔
158
    Object.defineProperty(taskInstance, "_isManaged", {
23✔
159
      configurable: false,
23✔
160
      writable: false,
23✔
161
      enumerable: false,
23✔
162
      value: true,
23✔
163
    });
23✔
164
    taskInstance.once("error", (...args: any[]) =>
23✔
165
      this.emitAsync("error", ...args),
2✔
166
    );
23✔
167
    this.emit("enqueue", taskInstance);
23✔
168
    if (prepend) this._queue.unshift(taskInstance);
23✔
169
    else this._queue.push(taskInstance);
21✔
170
    this._pulse();
22✔
171
    return taskInstance;
22✔
172
  }
22✔
173

1✔
174
  /**
1✔
175
   * Internal method to process the queue and start tasks.
1✔
176
   * @protected
1✔
177
   */
1✔
178
  protected _pulse() {
1✔
179
    if (this.paused) return;
28✔
180
    while (!this.concurrency || this._running.size < this.concurrency) {
28✔
181
      const task = this._queue.shift();
32✔
182
      if (!task) return;
32✔
183
      this._running.add(task);
19✔
184
      const id = task.id;
19✔
185
      if (id) this._runningIds.add(id);
32!
186
      task.prependOnceListener("finish", () => {
19✔
187
        this._running.delete(task);
19✔
188
        if (id) this._runningIds.delete(id);
19!
189
        if (!(this._running.size || this._queue.length))
19✔
190
          return this.emit("finish");
19✔
191
        this._pulse();
5✔
192
      });
5✔
193
      task.start();
19✔
194
    }
19✔
195
  }
28✔
196
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc