• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

panates / power-tasks / 23439056630

23 Mar 2026 01:12PM UTC coverage: 90.311% (+0.02%) from 90.292%
23439056630

push

github

erayhanoglu
1.14.0

255 of 295 branches covered (86.44%)

Branch coverage included in aggregate %.

1199 of 1315 relevant lines covered (91.18%)

44.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.55
/src/task-queue.ts
1
import DoublyLinked from "doublylinked";
1✔
2
import { AsyncEventEmitter } from "node-events-async";
1✔
3
import { TaskQueueEvents } from "./interfaces/task-queue-events.js";
1✔
4
import { TaskQueueOptions } from "./interfaces/taskqueue-options.js";
1✔
5
import type { TaskLike } from "./interfaces/types.js";
1✔
6
import { Task } from "./task.js";
1✔
7

1✔
8
/**
1✔
9
 * A `TaskQueue` manages the execution of tasks with concurrency control.
1✔
10
 * It allows limiting the number of simultaneous tasks and provides methods
1✔
11
 * to pause, resume, and manage the task lifecycle.
1✔
12
 *
1✔
13
 * @extends AsyncEventEmitter
1✔
14
 */
1✔
15
export class TaskQueue extends AsyncEventEmitter<TaskQueueEvents> {
1✔
16
  /**
1✔
17
   * The maximum number of tasks allowed in the queue.
1✔
18
   */
1✔
19
  maxQueue?: number;
1✔
20
  /**
16✔
21
   * The maximum number of tasks to run concurrently.
16✔
22
   */
16✔
23
  concurrency?: number;
16✔
24
  protected _paused: boolean;
16✔
25
  protected _queue = new DoublyLinked<Task>();
16✔
26
  protected _running = new Set<Task>();
16✔
27
  protected _idMap = new Map<string, Task>();
16✔
28

1✔
29
  /**
1✔
30
   * Constructs a new TaskQueue.
1✔
31
   *
1✔
32
   * @param options - Configuration options for the queue.
1✔
33
   */
1✔
34
  constructor(options?: TaskQueueOptions) {
1✔
35
    super();
16✔
36
    this.maxQueue = options?.maxQueue;
16✔
37
    this.concurrency = options?.concurrency;
16✔
38
    this._paused = !!options?.paused;
16✔
39
  }
16✔
40

1✔
41
  /**
1✔
42
   * Gets the total number of tasks in the queue (both queued and running).
1✔
43
   */
1✔
44
  get size() {
1✔
45
    return this._queue.length + this._running.size;
7✔
46
  }
7✔
47

1✔
48
  /**
1✔
49
   * Gets the number of tasks currently running.
1✔
50
   */
1✔
51
  get running() {
1✔
52
    return this._running.size;
3✔
53
  }
3✔
54

1✔
55
  /**
1✔
56
   * Gets the number of tasks currently waiting in the queue.
1✔
57
   */
1✔
58
  get queued() {
1✔
59
    return this._queue.length;
3✔
60
  }
3✔
61

1✔
62
  /**
1✔
63
   * Whether the queue is currently paused.
1✔
64
   */
1✔
65
  get paused(): boolean {
1✔
66
    return this._paused;
29✔
67
  }
29✔
68

1✔
69
  /**
1✔
70
   * Pauses the queue execution. No new tasks will be started.
1✔
71
   */
1✔
72
  pause(): void {
1✔
73
    this._paused = true;
1✔
74
  }
1✔
75

1✔
76
  /**
1✔
77
   * Resumes the queue execution and starts any queued tasks if concurrency allows.
1✔
78
   */
1✔
79
  resume(): void {
1✔
80
    this._paused = false;
1✔
81
    setImmediate(() => this._pulse());
1✔
82
  }
1✔
83

1✔
84
  /**
1✔
85
   * Clears all tasks from the queue and aborts them.
1✔
86
   */
1✔
87
  clearQueue() {
1✔
88
    this._queue.forEach((task) => {
2✔
89
      task.abort();
3✔
90
      this._idMap.delete(task.id);
3✔
91
    });
3✔
92
    this._queue = new DoublyLinked();
2✔
93
  }
2✔
94

1✔
95
  /**
1✔
96
   * Aborts all running tasks and clears the queue.
1✔
97
   */
1✔
98
  abortAll(): void {
1✔
99
    if (!this.size) return;
1!
100
    this.clearQueue();
1✔
101
    this._running.forEach((task) => task.abort());
1✔
102
  }
1✔
103

1✔
104
  /**
1✔
105
   * Returns a promise that resolves when all tasks have finished and the queue is empty.
1✔
106
   *
1✔
107
   * @returns A promise that resolves when the queue finishes.
1✔
108
   */
1✔
109
  async wait(): Promise<void> {
1✔
110
    if (!this.size) return Promise.resolve();
4!
111
    return new Promise((resolve) => {
4✔
112
      this.once("finish", resolve);
4✔
113
    });
4✔
114
  }
4✔
115

1✔
116
  /**
1✔
117
   * Adds a task to the beginning of the queue.
1✔
118
   *
1✔
119
   * @template T - The type of the result produced by the task.
1✔
120
   * @param task - The task or task function to enqueue.
1✔
121
   * @returns The {@link Task} instance.
1✔
122
   */
1✔
123
  enqueuePrepend<T = any>(task: TaskLike<T>): Task<T> {
1✔
124
    return this._enqueue(task, true);
1✔
125
  }
1✔
126

1✔
127
  /**
1✔
128
   * Adds a task to the end of the queue.
1✔
129
   *
1✔
130
   * @template T - The type of the result produced by the task.
1✔
131
   * @param task - The task or task function to enqueue.
1✔
132
   * @returns The {@link Task} instance.
1✔
133
   */
1✔
134
  enqueue<T = any>(task: TaskLike<T>): Task<T> {
1✔
135
    return this._enqueue(task, false);
22✔
136
  }
22✔
137

1✔
138
  /**
1✔
139
   * Retrieves a task by its ID.
1✔
140
   *
1✔
141
   * @param id - The ID of the task to retrieve.
1✔
142
   * @returns The {@link Task} instance if found, or `undefined` otherwise.
1✔
143
   */
1✔
144
  getTask(id: string): Task | undefined {
1✔
145
    return this._idMap.get(id);
×
146
  }
×
147

1✔
148
  /**
1✔
149
   * Internal method to enqueue a task.
1✔
150
   *
1✔
151
   * @template T - The type of the task result.
1✔
152
   * @param task - The task-like object to enqueue.
1✔
153
   * @param prepend - Whether to add the task to the beginning of the queue.
1✔
154
   * @returns The {@link Task} instance.
1✔
155
   * @protected
1✔
156
   */
1✔
157
  protected _enqueue<T = any>(task: TaskLike, prepend: boolean): Task<T> {
1✔
158
    if (this.maxQueue && this.size >= this.maxQueue)
23✔
159
      throw new Error(`Queue limit (${this.maxQueue}) exceeded`);
23✔
160
    const taskInstance = task instanceof Task ? task : new Task(task);
23✔
161
    Object.defineProperty(taskInstance, "_isManaged", {
23✔
162
      configurable: false,
23✔
163
      writable: false,
23✔
164
      enumerable: false,
23✔
165
      value: true,
23✔
166
    });
23✔
167
    taskInstance.once("error", (...args: any[]) =>
23✔
168
      this.emitAsync("error", ...args),
2✔
169
    );
23✔
170
    this.emit("enqueue", taskInstance);
23✔
171
    if (prepend) this._queue.unshift(taskInstance);
23✔
172
    else this._queue.push(taskInstance);
21✔
173
    if (taskInstance.id) this._idMap.set(taskInstance.id, taskInstance);
23!
174
    this._pulse();
22✔
175
    return taskInstance;
22✔
176
  }
22✔
177

1✔
178
  /**
1✔
179
   * Internal method to process the queue and start tasks.
1✔
180
   * @protected
1✔
181
   */
1✔
182
  protected _pulse() {
1✔
183
    if (this.paused) return;
28✔
184
    while (!this.concurrency || this._running.size < this.concurrency) {
28✔
185
      const task = this._queue.shift();
32✔
186
      if (!task) return;
32✔
187
      this._running.add(task);
19✔
188
      const id = task.id;
19✔
189
      task.prependOnceListener("finish", () => {
19✔
190
        this._running.delete(task);
19✔
191
        if (id) this._idMap.delete(id);
19!
192
        if (!(this._running.size || this._queue.length))
19✔
193
          return this.emit("finish");
19✔
194
        this._pulse();
5✔
195
      });
5✔
196
      task.start();
19✔
197
    }
19✔
198
  }
28✔
199
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc