• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / cronjob / 18288101817

06 Oct 2025 04:45PM UTC coverage: 68.511% (-1.0%) from 69.514%
18288101817

push

github

daycry
Update README.md

483 of 705 relevant lines covered (68.51%)

5.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.46
/src/JobRunner.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Daycry\CronJob;
6

7
use CodeIgniter\CLI\CLI;
8
use CodeIgniter\I18n\Time;
9
use Config\Services;
10
use DateTime;
11
use Daycry\CronJob\Config\CronJob as BaseConfig;
12
use Daycry\CronJob\Exceptions\TaskAlreadyRunningException;
13
use Daycry\CronJob\Exceptions\TimeoutException;
14
use Daycry\CronJob\Interfaces\MetricsExporterInterface;
15
use Daycry\CronJob\Exceptions\CronJobException;
16
use Throwable;
17
use CodeIgniter\Events\Events;
18

19
/**
20
 * Class JobRunner
21
 *
22
 * Handles the execution and management of scheduled jobs.
23
 */
24
class JobRunner
25
{
26
    protected Scheduler $scheduler;
27
    protected ?Time $testTime = null;
28
    protected BaseConfig $config;
29
    protected ?MetricsExporterInterface $metricsExporter = null;
30
    /**
31
     * Internal flag indicating a graceful shutdown request.
32
     */
33
    private bool $stopRequested = false;
34

35
    /**
36
     * @var list<Job>
37
     */
38
    protected array $jobs = [];
39

40
    /**
41
     * @var list<string>
42
     */
43
    protected array $only = [];
44

45
    /**
46
     * JobRunner constructor.
47
     */
48
    public function __construct(?BaseConfig $config = null)
49
    {
50
        $this->config    = $config ?: config('CronJob');
14✔
51
        $this->scheduler = service('scheduler');
14✔
52
        $this->maybeRegisterSignalHandlers();
14✔
53
    }
54

55
    /**
56
     * Optionally inject a metrics exporter implementation.
57
     * Fluent so it can be chained in tests or setup code.
58
     */
59
    public function withMetricsExporter(MetricsExporterInterface $exporter): self
60
    {
61
        $this->metricsExporter = $exporter;
2✔
62
        return $this;
2✔
63
    }
64

65
    /**
66
     * Hook: called before a job is executed.
67
     * Override or extend for custom behavior.
68
     */
69
    protected function beforeJob(Job $job): void
70
    {
71
        // Default empty implementation; subclasses may override.
72
        // (Events are fired inside run() loop with correct attempt number.)
73
    }
10✔
74

75
    /**
76
     * Hook: called after a job is executed.
77
     * Override or extend for custom behavior.
78
     *
79
     * @param mixed $result
80
     */
81
    protected function afterJob(Job $job, $result, ?Throwable $error): void
82
    {
83
        // Default empty implementation; subclasses may override.
84
    }
10✔
85

86
    /**
87
     * Runs all scheduled jobs, respecting dependencies, retries, and hooks.
88
     * Usa topological sort y mide tiempos de ejecución.
89
     */
90
    public function run(): void
91
    {
92
        $this->jobs = [];
14✔
93
        $order      = $this->scheduler->getExecutionOrder();
14✔
94
        $metrics    = [];
14✔
95

96
        foreach ($order as $task) {
14✔
97
            if ($this->stopRequested) {
13✔
98
                $this->cliWrite('Graceful shutdown requested. Stopping further job dispatch.', 'yellow');
1✔
99
                break;
1✔
100
            }
101
            if ($this->shouldSkipTask($task)) {
12✔
102
                $this->fire('cronjob.skipped', [
5✔
103
                    'job'    => $task,
5✔
104
                    'reason' => 'filter_or_schedule',
5✔
105
                ]);
5✔
106
                continue;
5✔
107
            }
108
            $result       = null;
11✔
109
            $error        = null;
11✔
110
            $retries      = $task->getMaxRetries() ?? 0;
11✔
111
            $attempt      = 0;
11✔
112

113
            do {
114
                $attempt++;
11✔
115
                // Call subclass hook first
116
                $this->beforeJob($task);
11✔
117
                $this->fire('cronjob.beforeJob', [
11✔
118
                    'job'     => $task,
11✔
119
                    'attempt' => $attempt,
11✔
120
                ]);
11✔
121
                $attemptStart = microtime(true);
11✔
122
                $duration = null; // initialize
11✔
123
                try {
124
                    $result   = $this->processTask($task);
11✔
125
                    $duration = microtime(true) - $attemptStart;
10✔
126
                    $metrics[$task->getName()][] = $duration;
10✔
127
                    $error    = null;
10✔
128
                } catch (Throwable $e) {
3✔
129
                    $error = $e;
3✔
130
                    $duration = microtime(true) - $attemptStart;
3✔
131
                    // Also record failed attempt duration (helps diagnostics)
132
                    $metrics[$task->getName()][] = $duration;
3✔
133
                    if ($attempt > $retries) {
3✔
134
                        $this->handleTaskError($task, $e);
2✔
135
                        $this->fire('cronjob.failed', [
2✔
136
                            'job'      => $task,
2✔
137
                            'exception'=> $e,
2✔
138
                            'attempts' => $attempt,
2✔
139
                        ]);
2✔
140
                        // Fall-through to afterJob firing below
141
                    } else {
142
                        // Sleep with backoff before retrying if configured
143
                        $delay = $this->computeBackoffDelay($attempt);
3✔
144
                        if ($delay > 0) {
3✔
145
                            $this->cliWrite('Retrying ' . $task->getName() . ' in ' . $delay . 's', 'yellow');
1✔
146
                            $this->fire('cronjob.retryScheduled', [
1✔
147
                                'job'     => $task,
1✔
148
                                'attempt' => $attempt,
1✔
149
                                'delay'   => $delay,
1✔
150
                            ]);
1✔
151
                            sleep($delay);
1✔
152
                        }
153
                        // Fire afterJob for this failed attempt before continuing to next retry
154
                        $this->afterJob($task, $result, $error);
3✔
155
                        $this->fire('cronjob.afterJob', [
3✔
156
                            'job'     => $task,
3✔
157
                            'result'  => $result,
3✔
158
                            'error'   => $error,
3✔
159
                            'attempt' => $attempt,
3✔
160
                            'duration'=> $duration,
3✔
161
                        ]);
3✔
162
                        if ($this->metricsExporter) {
3✔
163
                            $this->metricsExporter->recordAttempt($task->getName(), false, $duration, $attempt, false);
1✔
164
                        }
165
                        continue; // Retry loop
3✔
166
                    }
167
                }
168
                // Call subclass hook first
169
                $this->afterJob($task, $result, $error);
11✔
170
                $this->fire('cronjob.afterJob', [
11✔
171
                    'job'     => $task,
11✔
172
                    'result'  => $result,
11✔
173
                    'error'   => $error,
11✔
174
                    'attempt' => $attempt,
11✔
175
                    'duration'=> $duration,
11✔
176
                ]);
11✔
177
                if ($this->metricsExporter) {
11✔
178
                    $this->metricsExporter->recordAttempt(
2✔
179
                        $task->getName(),
2✔
180
                        $error === null,
2✔
181
                        (float) $duration,
2✔
182
                        $attempt,
2✔
183
                        $error === null || $attempt >= ($task->getMaxRetries() ?? 0) + 1
2✔
184
                    );
2✔
185
                }
186
            } while ($error && $attempt <= $retries);
11✔
187
            // Only mark as executed after attempts complete (success or handled failure)
188
            $this->jobs[] = $task;
11✔
189
        }
190
        $this->reportMetrics($metrics);
14✔
191
        $this->fire('cronjob.metrics.flush', [
14✔
192
            'metrics'    => $metrics,
14✔
193
            'generatedAt'=> new \DateTimeImmutable(),
14✔
194
        ]);
14✔
195
        if ($this->metricsExporter) {
14✔
196
            // Give exporter a chance to publish. We ignore return to keep runner decoupled.
197
            $this->metricsExporter->flush();
2✔
198
        }
199
        if ($this->stopRequested) {
14✔
200
            $this->fire('cronjob.shutdown', [
1✔
201
                'when'     => new \DateTimeImmutable(),
1✔
202
                'executed' => array_map(static fn($j) => $j->getName(), $this->jobs),
1✔
203
            ]);
1✔
204
        }
205
    }
206

207
    /**
208
     * Reporta métricas de ejecución de jobs (puedes personalizar para logs, alertas, etc).
209
     */
210
    protected function reportMetrics(array $metrics): void
211
    {
212
        foreach ($metrics as $job => $runs) {
14✔
213
            $avg = array_sum($runs) / count($runs);
11✔
214
            $this->cliWrite("[METRIC] Job '{$job}' average duration: " . number_format($avg, 4) . 's', 'yellow');
11✔
215
        }
216
    }
217

218
    /**
219
     * Determines if a task should be skipped.
220
     *
221
     * @param Job $task
222
     */
223
    protected function shouldSkipTask($task): bool
224
    {
225
        return (! empty($this->only) && ! in_array($task->getName(), $this->only, true))
12✔
226
               || (! $task->shouldRun($this->testTime) && empty($this->only));
12✔
227
    }
228

229
    /**
230
     * Processes a single task and returns the result.
231
     *
232
     * @param Job $task
233
     *
234
     * @return mixed
235
     */
236
    protected function processTask($task)
237
    {
238
        $error    = null;
11✔
239
        $start    = Time::now();
11✔
240
        $output   = null;
11✔
241
        $timeout  = $task->getTimeout() ?? $this->config->defaultTimeout;
11✔
242
        $t0       = microtime(true);
11✔
243

244
        $this->cliWrite('Processing: ' . ($task->getName() ?: 'Task'), 'green');
11✔
245
        $task->startLog();
11✔
246

247
        try {
248
            $this->validateTask($task);
11✔
249
            $output = $task->run() ?: \ob_get_contents();
11✔
250
            $this->cliWrite('Executed: ' . ($task->getName() ?: 'Task'), 'cyan');
10✔
251
            if ($timeout && (microtime(true) - $t0) > $timeout) {
10✔
252
                $this->fire('cronjob.timeout', [
×
253
                    'job'           => $task,
×
254
                    'timeoutSeconds'=> $timeout,
×
255
                ]);
×
256
                throw TimeoutException::forJob($task, $timeout);
10✔
257
            }
258
        } catch (Throwable $e) {
3✔
259
            $this->handleTaskError($task, $e);
3✔
260
            $error = $e;
3✔
261

262
            throw $e;
3✔
263
        } finally {
264
            $this->finalizeTask($task, $start, $output, $error);
11✔
265
        }
266

267
        return $output;
10✔
268
    }
269

270
    /**
271
     * Compute delay (seconds) before next retry based on config and attempt number.
272
     * Attempt here is 1-based (i.e. first retry attempt number > 1 triggers backoff).
273
     */
274
    protected function computeBackoffDelay(int $attempt): int
275
    {
276
        // No delay before first run; only apply when attempt > 1
277
        if ($attempt <= 1) {
3✔
278
            return 0;
3✔
279
        }
280
        $strategy = $this->config->retryBackoffStrategy;
3✔
281
        if ($strategy === 'none') {
3✔
282
            return 0;
2✔
283
        }
284
        $base = max(1, $this->config->retryBackoffBase);
1✔
285
        $delay = $base;
1✔
286
        if ($strategy === 'fixed') {
1✔
287
            $delay = $base;
1✔
288
        } elseif ($strategy === 'exponential') {
×
289
            $multiplier = $this->config->retryBackoffMultiplier > 0 ? $this->config->retryBackoffMultiplier : 2.0;
×
290
            $delay      = (int) round($base * ($multiplier ** ($attempt - 2))); // attempt 2 => base * multiplier^0
×
291
        }
292
        $delay = min($delay, $this->config->retryBackoffMax);
1✔
293
        if ($this->config->retryBackoffJitter) {
1✔
294
            $jitterRange = (int) max(1, round($delay * 0.15));
×
295
            $delta       = random_int(-$jitterRange, $jitterRange);
×
296
            $delay       = max(1, $delay + $delta);
×
297
        }
298
        return $delay;
1✔
299
    }
300

301
    /**
302
     * Validates a task before execution.
303
     *
304
     * @param Job $task
305
     *
306
     * @throws Exception|TaskAlreadyRunningException
307
     */
308
    protected function validateTask($task): void
309
    {
310
        if (! $task->saveRunningFlag(true) && $task->getRunType() === 'single') {
11✔
311
            throw new TaskAlreadyRunningException($task);
×
312
        }
313
        if (! $task->status()) {
11✔
314
            throw new CronJobException(($task->getName() ?: 'Task') . ' is disabled.', 100);
×
315
        }
316
    }
317

318
    /**
319
     * Handles errors during task execution.
320
     *
321
     * @param Job $task
322
     */
323
    protected function handleTaskError($task, Throwable $e): void
324
    {
325
        $this->cliWrite('Failed: ' . ($task->getName() ?: 'Task'), 'red');
3✔
326
        log_message('error', $e->getMessage(), $e->getTrace());
3✔
327
    }
328

329
    /**
330
     * Finalizes a task after execution.
331
     *
332
     * @param Job $task
333
     */
334
    protected function finalizeTask($task, Time $start, ?string $output, ?Throwable $error): void
335
    {
336
        if ($task->shouldRunInBackground()) {
11✔
337
            return;
×
338
        }
339
        if (! $error instanceof TaskAlreadyRunningException) {
11✔
340
            $task->saveRunningFlag(false);
11✔
341
        }
342
        $task->saveLog($output, $error instanceof \Throwable ? $error->getMessage() : $error);
11✔
343
        $this->sendCronJobFinishesEmailNotification($task, $start, $output, $error);
11✔
344
    }
345

346
    /**
347
     * Sends an email notification when a job finishes.
348
     */
349
    public function sendCronJobFinishesEmailNotification(
350
        Job $task,
351
        Time $startAt,
352
        ?string $output = null,
353
        ?Throwable $error = null,
354
    ): void {
355
        if (! $this->config->notification) {
11✔
356
            return;
11✔
357
        }
358
        $email  = Services::email();
×
359
        $parser = Services::parser();
×
360
        $email->setMailType('html');
×
361
        $email->setFrom($this->config->from, $this->config->fromName);
×
362
        $email->setTo($this->config->to);
×
363
        $email->setSubject($parser->setData(['job' => $task->getName()])->renderString(lang('CronJob.emailSubject')));
×
364
        $email->setMessage($parser->setData([
×
365
            'name'     => $task->getName(),
×
366
            'runStart' => $startAt,
×
367
            'duration' => $task->duration(),
×
368
            'output'   => $output,
×
369
            'error'    => $error,
×
370
        ])->render('Daycry\CronJob\Views\email_notification'));
×
371
        $email->send();
×
372
    }
373

374
    /**
375
     * Restrict execution to only the specified jobs.
376
     *
377
     * @param list<string> $jobs
378
     *
379
     * @return $this
380
     */
381
    public function only(array $jobs = []): self
382
    {
383
        $this->only = $jobs;
1✔
384

385
        return $this;
1✔
386
    }
387

388
    /**
389
     * Get the list of jobs executed in this run.
390
     *
391
     * @return list<Job>
392
     */
393
    public function getJobs(): array
394
    {
395
        return $this->jobs;
3✔
396
    }
397

398
    /**
399
     * Set a test time for job execution (for testing purposes).
400
     *
401
     * @return $this
402
     */
403
    public function withTestTime(string $time): self
404
    {
405
        $this->testTime = Time::createFromInstance(new DateTime($time));
4✔
406

407
        return $this;
4✔
408
    }
409

410
    /**
411
     * Writes output to the CLI if running in CLI mode.
412
     */
413
    protected function cliWrite(string $text, ?string $foreground = null): void
414
    {
415
        if (defined('ENVIRONMENT') && ENVIRONMENT === 'testing') {
12✔
416
            return;
12✔
417
        }
418
        if (! is_cli()) {
×
419
            return;
×
420
        }
421
        CLI::write('[' . date('Y-m-d H:i:s') . '] ' . $text, $foreground);
×
422
    }
423

424
    /**
425
     * Fire internal event if enabled. Exceptions from listeners are swallowed (logged at warning level).
426
     * @param array<string,mixed> $payload
427
     */
428
    private function fire(string $event, array $payload = []): void
429
    {
430
        if (! ($this->config->enableEvents ?? true)) {
14✔
431
            return;
×
432
        }
433
        try {
434
            Events::trigger($event, $payload);
14✔
435
        } catch (\Throwable $e) {
×
436
            log_message('warning', 'CronJob event listener error on ' . $event . ': ' . $e->getMessage());
×
437
        }
438
    }
439

440
    /**
441
     * Public API to request a graceful stop from user-land code (tests or external controller).
442
     */
443
    public function requestStop(): void
444
    {
445
        $this->stopRequested = true;
1✔
446
    }
447

448
    /**
449
     * Register POSIX signal handlers if enabled and environment supports it.
450
     */
451
    private function maybeRegisterSignalHandlers(): void
452
    {
453
        if (! ($this->config->enableSignals ?? false)) {
14✔
454
            return;
×
455
        }
456
        if (! function_exists('pcntl_signal')) {
14✔
457
            return; // extension not available
×
458
        }
459
        if (! is_cli()) {
14✔
460
            return; // only relevant in CLI context
×
461
        }
462
        // Use static to avoid multiple registrations
463
        static $registered = false;
14✔
464
        if ($registered) {
14✔
465
            return;
13✔
466
        }
467
        $registered = true;
1✔
468
        if (function_exists('pcntl_async_signals')) {
1✔
469
            pcntl_async_signals(true);
1✔
470
        }
471
        $handler = function (int $sig): void {
1✔
472
            $this->cliWrite('Received signal ' . $sig . ' -> initiating graceful shutdown', 'yellow');
×
473
            $this->stopRequested = true;
×
474
        };
1✔
475
        // Common termination signals
476
        if (defined('SIGTERM')) { @pcntl_signal(SIGTERM, $handler); }
1✔
477
        if (defined('SIGINT')) { @pcntl_signal(SIGINT, $handler); }
1✔
478
        if (defined('SIGQUIT')) { @pcntl_signal(SIGQUIT, $handler); }
1✔
479
    }
480
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc