• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VaclavObornik / mongodash / 21093867550

17 Jan 2026 11:56AM UTC coverage: 91.559% (-0.2%) from 91.769%
21093867550

push

github

VaclavObornik
2.5.0

1328 of 1554 branches covered (85.46%)

Branch coverage included in aggregate %.

2143 of 2237 relevant lines covered (95.8%)

364.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.29
/src/ConcurrentRunner.ts
1
import * as _debug from 'debug';
246✔
2
import { defaultOnError, OnError } from './OnError';
246✔
3

4
const debug = _debug('mongodash:ConcurrentRunner');
246✔
5

6
/**
 * Construction-time settings for {@link ConcurrentRunner}.
 */
export interface ConcurrentRunnerOptions {
    /** Number of worker loops `ConcurrentRunner.start()` launches. */
    concurrency: number;
}
9

10
/**
 * Polling configuration of a single source registered with `ConcurrentRunner`.
 * All values are in milliseconds.
 */
export interface SourceOptions {
    /** Initial backoff, and the value the backoff is reset to by `speedUp()`. */
    minPollMs: number;
    /** Upper bound the doubling backoff is capped at. */
    maxPollMs: number;
    /** Upper bound of the random delay added on top of the backoff for each scheduled run. */
    jitterMs: number;
}
15

16
/**
 * Mutable scheduling record kept per registered source.
 */
interface SourceState {
    /** Name the source was registered under; also the key in the runner's source map. */
    name: string;
    /** Polling configuration currently in effect for this source. */
    options: SourceOptions;
    /** `Date.now()`-based timestamp (ms) from which the source is due to be polled. */
    nextRunAt: number;
    /** Delay (ms) used for the next scheduling; doubles after each poll up to `options.maxPollMs`. */
    currentBackoff: number;
}
22

23
/**
 * Callback supplied to `ConcurrentRunner.start()`. A worker awaits it with the name of the
 * source that is currently due; a rejection is passed to the runner's `onError` handler.
 *
 * According to the comment in `runWorker`, the callback is expected to call
 * `ConcurrentRunner.speedUp()` for the source when it actually found a task, which resets
 * that source's backoff.
 */
export type TryRunATaskCallback = (sourceName: string) => Promise<void>;
24

25
/**
 * Polls a set of named sources using a fixed number of concurrent worker loops.
 *
 * Every source carries its own schedule (`nextRunAt`) and an exponential backoff that doubles
 * after each poll up to `maxPollMs`. `speedUp()` resets the backoff of a source and makes it due
 * immediately. Idle workers sleep until the nearest scheduled run and can be woken up early.
 */
export class ConcurrentRunner {
    /** How long (ms) an idle worker sleeps when no source is registered at all. */
    private static readonly IDLE_WAIT_MS = 1000;

    /**
     * Largest delay (ms) passed to `setTimeout`. Node.js treats delays above 2^31-1 as 1 ms,
     * which would turn a very long wait into a busy loop.
     */
    private static readonly MAX_TIMER_DELAY_MS = 0x7fffffff;

    private options: ConcurrentRunnerOptions;
    private sources: Map<string, SourceState> = new Map();
    private isRunning = false;
    private workers: Promise<void>[] = [];
    /** Wake-up functions of the workers that are currently sleeping, oldest sleeper first. */
    private wakeUpSignals: (() => void)[] = [];
    private tryRunATask: TryRunATaskCallback | null = null;
    private _activeWorkerCount = 0;

    /**
     * @param options - runner settings; `concurrency` is the number of workers `start()` launches.
     * @param onError - receives errors thrown or rejected by the task callback.
     */
    constructor(
        options: ConcurrentRunnerOptions,
        private onError: OnError = defaultOnError,
    ) {
        this.options = options;
    }

    /**
     * Registers a new source that is due immediately and wakes up one sleeping worker.
     *
     * @param name - unique name of the source.
     * @param options - polling configuration of the source.
     * @throws Error when a source with the same name is already registered.
     */
    public registerSource(name: string, options: SourceOptions): void {
        if (this.sources.has(name)) {
            throw new Error(`Source ${name} is already registered.`);
        }
        this.sources.set(name, {
            name,
            options,
            nextRunAt: Date.now(),
            currentBackoff: options.minPollMs,
        });
        this.wakeUpOneWorker();
    }

    /**
     * @param name - source name to look up.
     * @returns `true` when a source of that name is registered.
     */
    public hasSource(name: string): boolean {
        return this.sources.has(name);
    }

    /**
     * Starts `options.concurrency` worker loops. Does nothing when the runner is already running.
     *
     * @param tryRunATask - callback the workers await with the name of the due source.
     */
    public start(tryRunATask: TryRunATaskCallback): void {
        if (this.isRunning) return;
        this.isRunning = true;
        this.tryRunATask = tryRunATask;

        for (let i = 0; i < this.options.concurrency; i++) {
            this.workers.push(this.runWorker());
        }
        debug(`Started with ${this.options.concurrency} workers`);
    }

    /**
     * Stops the runner: wakes up all sleeping workers and resolves once every worker loop has
     * finished (including a task callback that is in flight). Does nothing when not running.
     */
    public async stop(): Promise<void> {
        if (!this.isRunning) return;
        this.isRunning = false;
        this.wakeUpAllWorkers();
        await Promise.all(this.workers);
        this.workers = [];
        debug('Stopped');
    }

    /**
     * Resets the backoff of the given source to its `minPollMs`, makes the source due
     * immediately and wakes up one sleeping worker. Unknown source names are ignored.
     *
     * @param sourceName - name of a registered source.
     */
    public speedUp(sourceName: string): void {
        const state = this.sources.get(sourceName);
        if (state) {
            // Reset backoff and schedule immediately
            state.currentBackoff = state.options.minPollMs;
            state.nextRunAt = Date.now();
            this.wakeUpOneWorker();
            debug(`SpeedUp called for ${sourceName}`);
        }
    }

    /**
     * Applies the given option overrides to every registered source and wakes up all workers.
     *
     * - Keys that are missing or explicitly `undefined` leave the current value untouched.
     * - A new `minPollMs` becomes the current backoff (lowering as well as raising it).
     * - A new `maxPollMs` caps the current backoff.
     * - A run scheduled later than the new settings allow (`now + backoff + jitterMs`) is pulled
     *   forward, so shortened intervals take effect right away instead of after the old wait.
     *
     * @param options - subset of source options to override.
     */
    public updateAllSources(options: Partial<SourceOptions>): void {
        // An object spread copies explicit `undefined` values too, which would wipe a valid
        // option; collect only the keys that really carry a value.
        const overrides: Partial<SourceOptions> = {};
        if (options.minPollMs !== undefined) overrides.minPollMs = options.minPollMs;
        if (options.maxPollMs !== undefined) overrides.maxPollMs = options.maxPollMs;
        if (options.jitterMs !== undefined) overrides.jitterMs = options.jitterMs;

        const now = Date.now();
        for (const state of this.sources.values()) {
            state.options = { ...state.options, ...overrides };

            if (overrides.minPollMs !== undefined) {
                state.currentBackoff = overrides.minPollMs;
            }
            if (overrides.maxPollMs !== undefined && state.currentBackoff > overrides.maxPollMs) {
                state.currentBackoff = overrides.maxPollMs;
            }

            // Never keep a source parked beyond what the current settings could have scheduled.
            const latestAllowedRun = now + state.currentBackoff + state.options.jitterMs;
            if (state.nextRunAt > latestAllowedRun) {
                state.nextRunAt = latestAllowedRun;
            }
        }
        // Wake up everyone to pick up new schedule/backoff
        this.wakeUpAllWorkers();
    }

    /**
     * Body of a single worker: repeatedly picks the source with the earliest `nextRunAt`, runs
     * the task callback for it when it is due, and otherwise sleeps until the nearest scheduled
     * run (or until woken up). The loop ends once the runner is stopped.
     */
    private async runWorker(): Promise<void> {
        // Workers are only launched by start(), which stores the callback first.
        const tryRunATask = this.tryRunATask;
        if (!tryRunATask) return;

        while (this.isRunning) {
            const now = Date.now();
            let bestSource: SourceState | null = null;
            let minNextRunAt = Infinity;

            // Find the source that needs to run soonest
            for (const state of this.sources.values()) {
                if (state.nextRunAt < minNextRunAt) {
                    minNextRunAt = state.nextRunAt;
                    bestSource = state;
                }
            }

            if (bestSource && minNextRunAt <= now) {
                const state = bestSource;

                // We always prolong the next run and schedule the next search before the current
                // search. If a task is found, tryRunATask is supposed to call the speedUp method,
                // which will reset the backoff.
                this.prolongNextRun(state.name);

                try {
                    this._activeWorkerCount++;
                    await tryRunATask(state.name);
                } catch (e) {
                    this.onError(e as Error);
                } finally {
                    this._activeWorkerCount--;
                }
            } else {
                // No source is ready to run. Sleep until the nearest scheduled time, or for the
                // default idle period when there are no sources at all.
                const timeToWait =
                    minNextRunAt === Infinity ? ConcurrentRunner.IDLE_WAIT_MS : Math.max(0, minNextRunAt - now);
                await this.sleep(timeToWait);
            }
        }
    }

    /**
     * Schedules the next run of a source at `now + currentBackoff + random jitter` and doubles
     * its backoff (capped at `maxPollMs`) for the time after that.
     *
     * @param sourceName - name of a registered source; unknown names are ignored.
     */
    private prolongNextRun(sourceName: string): void {
        const state = this.sources.get(sourceName);
        if (!state) return;

        const sleepTime = state.currentBackoff + Math.random() * state.options.jitterMs;
        state.nextRunAt = Date.now() + sleepTime;
        // Increase backoff for next time
        state.currentBackoff = Math.min(state.currentBackoff * 2, state.options.maxPollMs);
    }

    /**
     * Resolves after `ms` milliseconds, or earlier when the sleeper is woken up through
     * `wakeUpOneWorker()` / `wakeUpAllWorkers()`. Resolves immediately for `ms <= 0`.
     *
     * @param ms - maximum time to sleep in milliseconds.
     */
    private sleep(ms: number): Promise<void> {
        return new Promise<void>((resolve) => {
            if (ms <= 0) return resolve();

            // Waking up too early is harmless: the worker loop recomputes the remaining wait.
            const delay = Math.min(ms, ConcurrentRunner.MAX_TIMER_DELAY_MS);

            const wakeUp = (): void => {
                clearTimeout(timer);
                // Remove this wakeUp from the list if it is still there (when triggered by the
                // timer); an explicit wake-up has already taken it out.
                const index = this.wakeUpSignals.indexOf(wakeUp);
                if (index !== -1) {
                    this.wakeUpSignals.splice(index, 1);
                }
                resolve();
            };

            const timer: ReturnType<typeof setTimeout> = setTimeout(wakeUp, delay);
            this.wakeUpSignals.push(wakeUp);
        });
    }

    /** Wakes up the worker that has been sleeping the longest, if any worker is sleeping. */
    private wakeUpOneWorker(): void {
        const wakeUp = this.wakeUpSignals.shift();
        if (wakeUp) {
            wakeUp();
        }
    }

    /** Wakes up every worker that is currently sleeping. */
    private wakeUpAllWorkers(): void {
        // Detach the whole list first; each wakeUp then finds nothing left to remove.
        const sleepers = this.wakeUpSignals.splice(0);
        for (const wakeUp of sleepers) {
            wakeUp();
        }
    }

    /** Number of workers that are currently awaiting the task callback. */
    public get activeWorkers(): number {
        return this._activeWorkerCount;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc