• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VaclavObornik / mongodash / 24614995715

18 Apr 2026 10:09PM UTC coverage: 91.226% (+0.3%) from 90.898%
24614995715

Pull #464

github

web-flow
Merge d8e515b4a into d5a9d9486
Pull Request #464: refactor(cron,runner,metrics): unify cron runners + follower metrics fix

1471 of 1726 branches covered (85.23%)

Branch coverage included in aggregate %.

54 of 56 new or added lines in 3 files covered. (96.43%)

1 existing line in 1 file now uncovered.

2293 of 2400 relevant lines covered (95.54%)

341.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.17
/src/cronTasks.ts
1
// import * as _debug from 'debug';
2
import { CronExpressionOptions } from 'cron-parser';
3
import { Collection, Document, Filter } from 'mongodb';
4
import { ConcurrentRunner } from './ConcurrentRunner';
265✔
5
import { createContinuousLock } from './createContinuousLock';
265✔
6
import { getCollection } from './getCollection';
265✔
7
import { initPromise } from './initPromise';
265✔
8
import { CompatibleFindOneAndUpdateOptions, CompatibleModifyResult } from './mongoCompatibility';
9
import { onError } from './OnError';
265✔
10
import { onInfo } from './OnInfo';
265✔
11
import { createIntervalFunction } from './parseInterval';
265✔
12

13
export interface InitOptions {
14
    runCronTasks: boolean;
15
    /**
16
     * Maximum number of cron tasks this instance will execute in parallel.
17
     *
18
     * The default of `1` preserves the historical behaviour: one task is
19
     * processed at a time per instance. Raise it when you have many
20
     * independent cron tasks and want to avoid head-of-line blocking (a
21
     * long-running task delaying unrelated ones).
22
     *
23
     * Tasks with the same id are always serialised via the per-task lock
24
     * (`lockedTill`), so raising this does not cause a single task to run
25
     * twice in parallel.
26
     */
27
    cronTaskConcurrency: number;
28
    cronExpressionParserOptions: CronExpressionOptions;
29
    cronTaskCaller: CronTaskCaller;
30
    cronTaskFilter: CronTaskFilter;
31
}
32

33
export function init(options: InitOptions): void {
265✔
34
    if (state.runner) {
259!
35
        throw new Error('Cron tasks are already running');
×
36
    }
37

38
    state.runCronTasks = options.runCronTasks;
259✔
39
    if (options.cronExpressionParserOptions.endDate) {
259✔
40
        throw new Error("The 'endDate' parameter of the cron-parser package is not supported yet.");
1✔
41
    }
42
    state.cronExpressionParserOptions = options.cronExpressionParserOptions;
258✔
43
    state.cronTaskCaller = options.cronTaskCaller;
258✔
44
    state.cronTaskFilter = options.cronTaskFilter;
258✔
45

46
    // Floor to a safe integer rather than bitwise truncation (which wraps at 32 bits).
47
    const requested = Number(options.cronTaskConcurrency);
258✔
48
    const concurrency = Math.max(1, Number.isFinite(requested) ? Math.floor(requested) : 1);
258!
49
    state.runner = new ConcurrentRunner({ concurrency }, (error) => onError(error));
258✔
50

51
    if (state.runCronTasks) {
258✔
52
        onInfo({ message: 'Cron tasks processing started', code: CODE_CRON_TASK_STARTED });
256✔
53
        if (state.tasks.size) {
256!
54
            ensureStarted();
×
55
        }
56
    }
57
}
58

59
export const CODE_CRON_TASK_STARTED = 'cronTaskStarted';
265✔
60
export const CODE_CRON_TASK_FINISHED = 'cronTaskFinished';
265✔
61
export const CODE_CRON_TASK_SCHEDULED = 'cronTaskScheduled';
265✔
62
export const CODE_CRON_TASK_FAILED = 'cronTaskFailed';
265✔
63

64
// const debug = _debug('mongodash:cronTasks');
65
const debug = (..._args: unknown[]) => {};
265✔
66

67
export type TaskFunction = () => Promise<unknown> | void;
68
export type ScalarInterval = number | string;
69
export type StaticInterval = ScalarInterval | Date;
70
export type IntervalFunction = () => StaticInterval | Promise<StaticInterval>;
71
export type Interval = ScalarInterval | IntervalFunction;
72
export type TaskId = string;
73
export type CronTaskStatus = 'locked' | 'running' | 'idle' | 'failed' | 'scheduled';
74

75
export interface CronTaskRecord {
76
    _id: TaskId;
77
    status: CronTaskStatus;
78
    nextRunAt: Date;
79
    runImmediately: boolean;
80
    lockedTill: Date | null;
81
    lastRun: {
82
        startedAt: Date;
83
        finishedAt: Date | null;
84
        error: string | null;
85
        durationMs?: number;
86
    } | null;
87
    isRegistered: boolean; // True if the task is registered in this instance
88
}
89

90
export interface CronTaskQuery {
91
    filter?: string; // Search by task ID
92
    limit?: number;
93
    skip?: number;
94
    sort?: { field: keyof CronTaskRecord; direction: 1 | -1 };
95
}
96

97
export interface CronPagedResult<T> {
98
    items: T[];
99
    total: number;
100
    limit: number;
101
    offset: number;
102
}
103

104
type Task = { taskId: TaskId; task: TaskFunction; intervalFunction: IntervalFunction };
105

106
type RunLogEntry = {
107
    startedAt: Date;
108
    finishedAt: Date | null;
109
    error: string | null;
110
};
111

112
class TaskDocument implements Document {
113
    public runImmediately = false;
204✔
114

115
    public runLog = <RunLogEntry[]>[];
204✔
116

117
    public lockedTill: Date | null = null;
204✔
118

119
    constructor(
120
        public _id: TaskId,
204✔
121
        public runSince: Date,
204✔
122
    ) {}
123
}
124

125
type EnforcedTask = {
126
    taskId: TaskId;
127
    resolve: () => void;
128
    reject: (reason: Error) => void;
129
};
130

131
const noTaskWaitTime = 5 * 1000;
265✔
132

133
const state = {
265✔
134
    tasks: new Map<string, Task>(),
135

136
    // Runner state. `runner` is created in init() and reused across
137
    // start/stop cycles. `runnerStarted` tracks whether workers are
138
    // currently looping. `runnerStopPromise` is non-null while a previous
139
    // stop is still draining; ensureStarted chains on it so a rapid
140
    // stop + start sequence waits for the previous teardown before firing
141
    // a new start - otherwise ConcurrentRunner.start() is a no-op
142
    // (isRunning still true) and the scheduler would stall with
143
    // runnerStarted=true but no live workers.
144
    runner: <ConcurrentRunner | null>null,
145
    runnerStarted: false,
146
    runnerStopPromise: <Promise<void> | null>null,
147

148
    _collection: <Collection<TaskDocument> | null>null,
149

150
    get collection(): Collection<TaskDocument> {
151
        if (!this._collection) {
1,195✔
152
            this._collection = getCollection<TaskDocument>('cronTasks');
21✔
153
        }
154
        return this._collection;
1,195✔
155
    },
156

157
    enforcedTasks: <Array<EnforcedTask>>[],
158

159
    ensureIndexPromise: <Promise<unknown> | null>null,
160

161
    // Config
162
    runCronTasks: false,
163
    cronExpressionParserOptions: <CronExpressionOptions>{},
164
    cronTaskCaller: <CronTaskCaller | null>null,
165
    cronTaskFilter: <CronTaskFilter | null>null,
166
};
167

168
const CRON_SOURCE_NAME = '_cron_tasks';
265✔
169

170
export interface CronTaskCaller {
171
    <T>(task: () => Promise<T>): Promise<T> | T;
172
}
173

174
export interface CronTaskFilter {
175
    ({ taskId }: { taskId: TaskId }): boolean;
176
}
177

178
// Removed module-level vars that are now in state or imported
179
// let taskCaller: CronTaskCaller;
180
// let taskFilter: CronTaskFilter;
181
// let onError: OnError;
182
// let onInfo: OnInfo;
183

184
function createIntervalFunctionFromScalar(interval: ScalarInterval): () => Date {
185
    return createIntervalFunction(interval, { cronOptions: state.cronExpressionParserOptions });
23✔
186
}
187

188
async function getNextRunDate(intervalFunction: IntervalFunction): Promise<Date> {
189
    const maybeDate: StaticInterval = await intervalFunction();
397✔
190
    if (maybeDate instanceof Date) {
396✔
191
        return maybeDate;
389✔
192
    }
193

194
    return createIntervalFunctionFromScalar(maybeDate)();
7✔
195
}
196

197
export async function runCronTask(taskId: TaskId): Promise<void> {
265✔
198
    if (new Error().stack?.includes('mongoDashRunTaskNotCyclic')) {
11!
199
        throw new Error('It is not possible to call runCronTask inside another running task. Use the scheduleCronTaskImmediately() function instead.');
1✔
200
    }
201

202
    debug(`runCronTask called for ${taskId}`);
10✔
203
    if (!state.tasks.has(taskId)) {
10✔
204
        throw new Error(`Cannot run unknown task '${taskId}'.`);
1✔
205
    }
206
    return new Promise((resolve, reject) => {
9✔
207
        state.enforcedTasks.push({ taskId, resolve, reject });
9✔
208
        ensureStarted();
9✔
209
    });
210
}
211

212
function ensureIndex() {
213
    if (!state.ensureIndexPromise) {
204✔
214
        state.ensureIndexPromise = Promise.all([
14✔
215
            state.collection.createIndex({ runSince: 1, _id: 1, lockedTill: 1 }, { name: 'runSinceIndex' }),
216
            state.collection.createIndex(
217
                { runImmediately: 1, _id: 1, lockedTill: 1 },
218
                { name: 'runImmediatelyIndex', partialFilterExpression: { runImmediately: { $eq: true } } },
219
            ),
220
        ]);
221
    }
222
    return state.ensureIndexPromise;
204✔
223
}
224

225
const lockTime = 5 * 60 * 1000;
265✔
226

227
function getLockDate() {
228
    return new Date(Date.now() + lockTime);
388✔
229
}
230

231
function getUnlockedFilter() {
232
    return { $or: [{ lockedTill: null }, { lockedTill: { $lt: new Date() } }] };
562✔
233
}
234

235
function getTasksToProcessFilter() {
236
    return {
553✔
237
        $and: [{ _id: { $in: Array.from(state.tasks.keys()).filter((taskId) => state.cronTaskFilter!({ taskId })) } }, getUnlockedFilter()],
28,943✔
238
    };
239
}
240

241
async function findATaskToRun(enforcedTask: EnforcedTask | null): Promise<Task | null> {
242
    let filter: Filter<TaskDocument>;
243

244
    if (enforcedTask) {
388✔
245
        filter = { $and: [{ _id: enforcedTask.taskId }, getUnlockedFilter()] };
9✔
246
    } else {
247
        filter = {
379✔
248
            $and: [{ $or: [{ runSince: { $lte: new Date() } }, { runImmediately: true }] }, getTasksToProcessFilter()],
249
        };
250
    }
251

252
    debug('finding a task', JSON.stringify(filter, null, 2));
388✔
253

254
    const result = await state.collection.findOneAndUpdate(
388✔
255
        filter,
256
        {
257
            $set: {
258
                lockedTill: getLockDate(),
259
                runImmediately: false,
260
            },
261
            $push: {
262
                runLog: {
263
                    $each: [{ startedAt: new Date(), finishedAt: null, error: null }],
264
                    $sort: { startedAt: -1 },
265
                    $slice: 5,
266
                },
267
            },
268
        },
269
        {
270
            sort: {
271
                runImmediately: -1, // prefer manual triggering
272
                runSince: 1, // prefer more delayed tasks
273
                'runLog.0.finishedAt': 1, // prefer tasks waiting longer
274
            },
275
            projection: { _id: true, runImmediately: true },
276
            includeResultMetadata: true,
277
        } as CompatibleFindOneAndUpdateOptions,
278
    );
279

280
    // Handle v4/v5+ compatibility
281
    const document = (result as unknown as CompatibleModifyResult).value;
386✔
282

283
    if (!document) {
386✔
284
        if (enforcedTask) {
197✔
285
            enforcedTask.reject(new Error('The task document not found or is locked right now.'));
2✔
286
        }
287
        return null;
197✔
288
    }
289

290
    if (!enforcedTask && !state.runCronTasks) {
189✔
291
        // the stopCronTasks has been called during finding a task, rollback the lock
292
        // we update runImmediately back only if it was truthy before
293
        const runImmediatelyUpdate = document.runImmediately ? { runImmediately: true } : null;
1!
294
        await state.collection.updateOne(
1✔
295
            { _id: document._id },
296
            {
297
                $set: {
298
                    lockedTill: null,
299
                    ...runImmediatelyUpdate,
300
                },
301
                $pop: { runLog: -1 }, // remove last runLog entry (0 index)
302
            },
303
        );
304
        return null;
1✔
305
    }
306

307
    return state.tasks.get(document._id)!;
188✔
308
}
309

310
async function processTask(task: Task, enforcedTask: EnforcedTask | null) {
311
    const stopContinuousLock = createContinuousLock(state.collection, task.taskId, 'lockedTill', lockTime);
188✔
312

313
    const processTheTask = async () => {
188✔
314
        debug(`processing task ${task.taskId} `);
188✔
315
        let taskError: Error | null = null;
188✔
316
        let nextRunDate: Date;
317
        let nextRunScheduled = false;
188✔
318

319
        const start = new Date();
188✔
320
        try {
188✔
321
            function mongoDashRunTaskNotCyclic() {
322
                onInfo({ message: `Cron task '${task.taskId}' started.`, taskId: task.taskId, code: CODE_CRON_TASK_STARTED });
188✔
323
                return task.task();
188✔
324
            }
325
            await mongoDashRunTaskNotCyclic();
188✔
326
            const duration = Date.now() - start.getTime();
184✔
327
            onInfo({ message: `Cron task '${task.taskId}' finished in ${duration} ms.`, taskId: task.taskId, code: CODE_CRON_TASK_FINISHED, duration });
184✔
328
        } catch (err) {
329
            const duration = Date.now() - start.getTime();
4✔
330
            const reason = err instanceof Error ? err.message : `${err} `;
4!
331
            onInfo({ message: `Cron task '${task.taskId}' failed in ${duration} ms.`, taskId: task.taskId, code: CODE_CRON_TASK_FAILED, reason, duration });
4✔
332
            taskError = err as Error;
4✔
333
        }
334

335
        try {
188✔
336
            await stopContinuousLock(); // to avoid possibility of lock after the following document update
188✔
337

338
            nextRunDate = await getNextRunDate(task.intervalFunction);
188✔
339
            debug(`scheduling task ${task.taskId} to run in ${nextRunDate.getTime() - Date.now()} ms`);
187✔
340

341
            await state.collection.updateOne(
187✔
342
                { _id: task.taskId },
343
                {
344
                    $set: {
345
                        runSince: nextRunDate,
346
                        lockedTill: null,
347
                        'runLog.0.error': taskError ? `${taskError} ` : null,
187✔
348
                        'runLog.0.finishedAt': new Date(),
349
                    },
350
                },
351
            );
352

353
            nextRunScheduled = true;
185✔
354
        } finally {
355
            if (enforcedTask) {
188✔
356
                if (taskError) {
6✔
357
                    enforcedTask.reject(taskError);
1✔
358
                } else {
359
                    enforcedTask.resolve();
5✔
360
                }
361
            } else if (taskError) {
182✔
362
                onError(taskError);
3✔
363
            }
364

365
            // we want to inform about the scheduling after the
366
            if (nextRunScheduled) {
188✔
367
                onInfo({
185✔
368
                    message: `Cron task '${task.taskId}' scheduled to ${nextRunDate!.toISOString()}.`,
369
                    taskId: task.taskId,
370
                    code: CODE_CRON_TASK_SCHEDULED,
371
                    nextRunDate: new Date(nextRunDate!.toISOString()),
372
                });
373
            }
374
        }
375
    };
376

377
    try {
188✔
378
        await state.cronTaskCaller!(processTheTask);
188✔
379
    } catch (err) {
380
        // todo revise why we need to do this
381
        // this should fix situations when the _taskCaller has a problem
382
        await stopContinuousLock();
3✔
383
        onError(err as Error);
3✔
384
    }
385
}
386

387
/** Can never throw. */
388
async function getWaitTimeByNextTask(): Promise<number> {
389
    try {
174✔
390
        const nextTask = await state.collection.findOne(getTasksToProcessFilter(), {
174✔
391
            projection: { runSince: 1 },
392
            sort: { runSince: 1 },
393
        });
394

395
        if (!nextTask) {
173✔
396
            return noTaskWaitTime;
55✔
397
        }
398

399
        const timeToNext = nextTask.runSince.getTime() - Date.now();
118✔
400
        return Math.min(Math.max(timeToNext, 0), noTaskWaitTime);
118✔
401
    } catch (error) {
402
        onError(error as Error);
1✔
403
        return noTaskWaitTime;
1✔
404
    }
405
}
406

407
// --- Scheduler ---------------------------------------------------------
408
// One ConcurrentRunner-driven loop for all concurrencies. Each worker
409
// polls the cron collection; `findATaskToRun` uses an atomic
410
// findOneAndUpdate on `lockedTill` so a task is serialised against itself
411
// regardless of how many workers are running. speedUp after a successful
412
// run skips the runner's back-off so a burst of pending tasks drains
413
// quickly; setNextRunAt on an empty poll defers the next tick to the
414
// known runSince of the soonest task.
415

416
async function runATask(): Promise<void> {
417
    await initPromise;
388✔
418
    const enforcedTask = state.enforcedTasks.shift() || null;
388✔
419
    let task: Task | null = null;
388✔
420

421
    try {
388✔
422
        task = await findATaskToRun(enforcedTask);
388✔
423
        if (!task) {
386✔
424
            debug('no pending task found');
198✔
425
            return;
198✔
426
        }
427

428
        await processTask(task, enforcedTask);
188✔
429
        // A task just completed - there may be another ready right now, so
430
        // poll again immediately instead of applying back-off.
431
        state.runner?.speedUp(CRON_SOURCE_NAME);
188!
432
    } catch (error) {
433
        debug(`Catch error ${error}`);
2✔
434
        if (enforcedTask) {
2✔
435
            enforcedTask.reject(error as Error);
1✔
436
        } else {
437
            onError(error as Error);
1✔
438
        }
439
    } finally {
440
        if (!task && state.runner && state.runnerStarted && state.enforcedTasks.length === 0) {
388✔
441
            if (state.runCronTasks) {
174!
442
                const waitMs = await getWaitTimeByNextTask();
174✔
443
                // ConcurrentRunner's setNextRunAt no-ops if a concurrent
444
                // speedUp() already pulled nextRunAt to the past, so our
445
                // potentially-stale waitMs cannot swallow that signal.
446
                state.runner.setNextRunAt(CRON_SOURCE_NAME, Date.now() + waitMs);
174✔
447
            } else {
448
                // Once runCronTasks has been turned off and there is no
449
                // enforced task to run we stop polling entirely. A later
450
                // runCronTask() / startCronTasks() will re-enter
451
                // ensureStarted which restarts the runner.
UNCOV
452
                state.runnerStarted = false;
×
453
                // Track the stop promise here too so a rapid runCronTask()
454
                // call that arrives mid-drain chains on it in ensureStarted
455
                // (see the same race covered by stopCronTasks).
456
                state.runnerStopPromise = state.runner.stop().catch((err) => {
×
457
                    onError(err as Error);
×
458
                });
459
            }
460
        }
461
    }
462
}
463

464
function ensureStarted(): void {
465
    // If a previous stopCronTasks() is still draining the runner, wait
466
    // for it before (re)starting. Without this the inner start() call
467
    // would no-op (isRunning still true) and we would end up with
468
    // runnerStarted=true but no actual workers.
469
    if (state.runnerStopPromise) {
316✔
470
        const pending = state.runnerStopPromise;
51✔
471
        pending
51✔
472
            .then(() => {
473
                if (state.runnerStopPromise === pending) {
51!
474
                    state.runnerStopPromise = null;
51✔
475
                }
476
                // Re-evaluate: cron may have been stopped again in the
477
                // meantime or runCronTasks may have flipped off.
478
                if (state.runCronTasks || state.enforcedTasks.length > 0) {
51!
479
                    ensureStarted();
51✔
480
                }
481
            })
NEW
482
            .catch((err) => onError(err as Error));
×
483
        return;
51✔
484
    }
485

486
    if (!state.runner!.hasSource(CRON_SOURCE_NAME)) {
265✔
487
        state.runner!.registerSource(CRON_SOURCE_NAME, {
13✔
488
            minPollMs: 200,
489
            maxPollMs: noTaskWaitTime,
490
            jitterMs: 100,
491
        });
492
    }
493
    if (!state.runnerStarted) {
265✔
494
        debug('STARTING RUNNER');
64✔
495
        state.runnerStarted = true;
64✔
496
        state.runner!.start(() => runATask());
388✔
497
    } else {
498
        state.runner!.speedUp(CRON_SOURCE_NAME);
201✔
499
    }
500
}
501

502
export function stopCronTasks(): void {
265✔
503
    debug('STOPPING CRON TASKS');
321✔
504
    state.runCronTasks = false;
321✔
505
    if (state.runner && state.runnerStarted) {
321✔
506
        state.runnerStarted = false;
64✔
507
        // Fire and forget: the public API is synchronous (returns void).
508
        // Any in-flight tasks will finish on their own; further polls will
509
        // not happen because runnerStarted is already cleared. Track the
510
        // stop promise so ensureStarted can chain on it if the caller
511
        // restarts cron before teardown completes - see the race Copilot
512
        // flagged on PR #460.
513
        state.runnerStopPromise = state.runner.stop().catch((err) => {
64✔
514
            onError(err as Error);
×
515
        });
516
    }
517
}
518

519
export function startCronTasks(): void {
265✔
520
    state.runCronTasks = true;
75✔
521
    if (state.tasks.size) {
75✔
522
        ensureStarted();
62✔
523
    }
524
}
525

526
export async function scheduleCronTaskImmediately(taskId: TaskId): Promise<void> {
265✔
527
    const { matchedCount } = await state.collection.updateOne({ _id: taskId }, { $set: { runImmediately: true } });
5✔
528
    if (!matchedCount) {
5✔
529
        throw new Error(`No task with id "${taskId}" is registered.`);
2✔
530
    }
531
    if (state.runCronTasks && state.tasks.has(taskId)) {
3✔
532
        ensureStarted();
1✔
533
    }
534
}
535

536
export async function cronTask(taskId: TaskId, interval: Interval, task: TaskFunction): Promise<void> {
265✔
537
    await initPromise;
213✔
538

539
    if (state.tasks.has(taskId)) {
213✔
540
        throw new Error(`The taskId '${taskId}' is already used.`);
1✔
541
    }
542

543
    const intervalFunction = typeof interval === 'function' ? interval : createIntervalFunctionFromScalar(interval);
212✔
544
    const nextRun = await getNextRunDate(intervalFunction);
209✔
545

546
    const document = new TaskDocument(taskId, nextRun);
204✔
547
    const { _id, ...documentWithoutId } = document;
204✔
548
    await state.collection.updateOne({ _id: document._id }, { $setOnInsert: documentWithoutId }, { upsert: true });
204✔
549

550
    state.tasks.set(taskId, {
204✔
551
        taskId,
552
        task,
553
        intervalFunction,
554
    });
555
    debug(`task ${taskId} has been registered`);
204✔
556

557
    await ensureIndex();
204✔
558

559
    // if the cron tasks are logically running, ensure the loop is running
560
    if (state.runCronTasks) {
204✔
561
        ensureStarted();
193✔
562
    }
563
}
564

565
/**
566
 * Lists cron tasks with pagination and sorting.
567
 */
568
export async function getCronTasksList(query: CronTaskQuery = {}): Promise<CronPagedResult<CronTaskRecord>> {
265!
569
    const limit = query.limit ?? 50;
10✔
570
    const skip = query.skip ?? 0;
10✔
571

572
    let sortField = query.sort?.field || 'runSince';
10✔
573
    if (sortField === 'nextRunAt') sortField = 'runSince';
10✔
574
    const sort = { [sortField]: query.sort?.direction ?? 1 } as Record<string, 1 | -1>;
10✔
575

576
    let localTaskIds = Array.from(state.tasks.keys());
10✔
577

578
    if (query.filter) {
10✔
579
        const regex = new RegExp(query.filter, 'i');
1✔
580
        localTaskIds = localTaskIds.filter((id) => regex.test(id));
3✔
581
    }
582

583
    const filter: Filter<TaskDocument> = {
10✔
584
        _id: { $in: localTaskIds },
585
    };
586

587
    const [docs, total] = await Promise.all([
10✔
588
        state.collection.find(filter).sort(sort).skip(skip).limit(limit).toArray(),
589
        state.collection.countDocuments(filter),
590
    ]);
591

592
    const items: CronTaskRecord[] = docs.map((doc) => {
10✔
593
        const lastRun = doc.runLog[0] || null;
9✔
594
        let lastRunData = null;
9✔
595

596
        if (lastRun) {
9✔
597
            lastRunData = {
1✔
598
                startedAt: lastRun.startedAt,
599
                finishedAt: lastRun.finishedAt,
600
                error: lastRun.error,
601
                durationMs: lastRun.finishedAt ? lastRun.finishedAt.getTime() - lastRun.startedAt.getTime() : undefined,
1!
602
            };
603
        }
604

605
        let status: CronTaskStatus = 'idle';
9✔
606
        if (doc.lockedTill && doc.lockedTill > new Date()) {
9✔
607
            status = 'locked';
1✔
608
            // We can assume 'running' if locked, unless just failed/finished and lock not released yet?
609
            // Actually 'lockedTill' is set during processing.
610
            // If we want to distinguish 'running' from just 'locked', it's hard without another field.
611
            // But 'locked' usually means running or zombie.
612
            status = 'running';
1✔
613
        } else if (doc.runImmediately) {
8✔
614
            status = 'scheduled';
1✔
615
        } else if (lastRun?.error) {
7✔
616
            // Only if the LATEST run failed and we are not currently running
617
            status = 'failed';
1✔
618
        }
619

620
        return {
9✔
621
            _id: doc._id,
622
            status,
623
            nextRunAt: doc.runSince,
624
            runImmediately: doc.runImmediately,
625
            lockedTill: doc.lockedTill,
626
            lastRun: lastRunData,
627
            isRegistered: state.tasks.has(doc._id),
628
        };
629
    });
630

631
    return {
10✔
632
        items,
633
        total,
634
        limit,
635
        offset: skip,
636
    };
637
}
638

639
/**
640
 * @deprecated Alias for {@link scheduleCronTaskImmediately}. Prefer that name for
641
 * clarity - it describes exactly what happens (the task is scheduled to run on
642
 * the next polling tick, not necessarily this very millisecond). This alias will
643
 * be removed in a future major version.
644
 */
645
export async function triggerCronTask(taskId: TaskId): Promise<void> {
265✔
646
    return scheduleCronTaskImmediately(taskId);
×
647
}
648
/**
649
 * Returns IDs of all registered cron tasks in this instance.
650
 */
651
export function getRegisteredCronTaskIds(): string[] {
265✔
652
    return Array.from(state.tasks.keys()).sort();
×
653
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc