• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 25524504346

07 May 2026 06:23PM UTC coverage: 75.009% (-0.04%) from 75.053%
25524504346

push

github

daycry
chore: apply Rector fixes so analyze workflow passes

The Static Analysis workflow runs rector process --dry-run, which exits
non-zero on PHP 8.2/8.3 when changes are pending. Applied the 11
recommended refactors (NullToStrictStringFuncCallArgRector,
RemoveReflectionSetAccessibleCallsRector,
AssertEmptyNullableObjectToAssertInstanceofRector,
ClassPropertyAssignToConstructorPromotionRector,
ReadOnlyClassRector, RecastingRemovalRector, RepeatedOrEqualToInArrayRector,
FlipTypeControlToUseExclusiveTypeRector, AddInstanceofAssertForNullableArgumentRector)
across V2 value objects, the new V1_1 tests, and the multi-phase v1.2 files
so dry-run reports "Rector is done!" again.

531 tests pass; PHPStan and Psalm remain clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

3 of 4 new or added lines in 4 files covered. (75.0%)

2 existing lines in 1 file now uncovered.

2137 of 2849 relevant lines covered (75.01%)

12.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.61
/src/Execution/JobLifecycleCoordinator.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Execution;
15

16
use Closure;
17
use Daycry\Jobs\Config\Jobs;
18
use Daycry\Jobs\Exceptions\JobException;
19
use Daycry\Jobs\Interfaces\JobInterface;
20
use Daycry\Jobs\Job;
21
use Daycry\Jobs\Loggers\JobLogger;
22
use Daycry\Jobs\Metrics\Metrics;
23
use RuntimeException;
24
use Throwable;
25

26
/**
27
 * Orchestrates full job lifecycle including retries, notifications, and timeout protection.
28
 * Uses configured RetryPolicy to drive control flow.
29
 * Ensures single-instance jobs acquire and release a runtime flag to avoid concurrent runs.
30
 */
31
class JobLifecycleCoordinator
32
{
33
    private const MAX_CALLBACK_DEPTH = 5;
34

35
    private $sleeper;
36

37
    /**
     * @param (callable(int):void)|null $sleeper Called with the number of seconds to wait
     *                                           between retry attempts. When null, a default
     *                                           is installed that calls sleep() for positive
     *                                           values and does nothing otherwise.
     */
    public function __construct(
        ?callable $sleeper = null,
    ) {
        // The parameter is type-checked here so a non-callable value is rejected at
        // construction time instead of failing later when a retry delay is applied.
        $this->sleeper = $sleeper ?? static function (int $seconds): void {
            if ($seconds > 0) {
                sleep($seconds);
            }
        };
    }
47

48
    /**
     * Runs a job until it succeeds or its retries are used up, then dispatches the
     * job's callback (if any) and returns a summary of what happened.
     *
     * For single-instance jobs the running flag is saved before the loop, saved again
     * before every attempt, and cleared in a finally block.
     *
     * @param Job    $job    Job to execute.
     * @param string $source Not read by this method.
     *
     * @throws RuntimeException When a single-instance job is already flagged as running.
     */
    public function run(Job $job, string $source = 'cron'): LifecycleOutcome
    {
        $attemptLog = [];

        // config() is read directly here so configuration changed by tests is respected.
        $config  = config(Jobs::class) ?? new Jobs();
        $backoff = new RetryPolicyFixed(
            base: $config->retryBackoffBase,
            strategy: $config->retryBackoffStrategy,
            multiplier: $config->retryBackoffMultiplier,
            max: $config->retryBackoffMax,
            jitter: $config->retryBackoffJitter,
        );

        $retryLimit    = max(0, $job->getMaxRetries() ?? 0);
        $failedForGood = false;
        $lastResult    = null;

        // Continue counting from the attempt number stored on the job itself.
        $attemptNo = $job->getAttempt();

        // TTL applied to the single-instance running flag each time it is saved.
        $flagTtl = $this->computeHeartbeatTtl($job, $config);

        if ($job->isSingleInstance()) {
            if ($job->isRunning()) {
                throw new RuntimeException('Job already running: ' . $job->getName());
            }
            $job->saveRunningFlag($flagTtl);
        }

        try {
            while (true) {
                $attemptNo++;

                // Effective timeout: the job's own value wins, then the configured
                // default; with neither set the attempt runs without a timeout (0).
                $ownTimeout      = $job->getTimeout();
                $fallbackTimeout = $config->defaultTimeout ?? null;
                $timeout         = match (true) {
                    $ownTimeout !== null      => $ownTimeout,
                    $fallbackTimeout !== null => (int) $fallbackTimeout,
                    default                   => 0,
                };

                // Re-save the running flag before each attempt so the TTL is counted
                // from the start of this attempt rather than from the first one.
                if ($job->isSingleInstance()) {
                    $job->saveRunningFlag($flagTtl);
                }

                if ($timeout > 0) {
                    $exec = $this->safeExecuteWithTimeout($job, $timeout);
                } else {
                    $exec = $this->safeExecute($job);
                }

                $attemptLog[] = [
                    'attempt'  => $attemptNo,
                    'success'  => $exec->success,
                    'error'    => $exec->error,
                    'duration' => $exec->durationSeconds(),
                ];

                // Notify with the ExecutionResult when the job asked for it on this outcome.
                $wantsNotification = $exec->success
                    ? $job->shouldNotifyOnSuccess()
                    : $job->shouldNotifyOnFailure();
                if ($wantsNotification) {
                    $job->notify($exec);
                }

                // Stop on success, or on a failure once the persistent attempt counter
                // has passed the retry limit. Completion itself is handled elsewhere.
                $retriesExhausted = ! $exec->success && $attemptNo > $retryLimit;
                if ($exec->success || $retriesExhausted) {
                    $lastResult    = $exec;
                    $failedForGood = $retriesExhausted;
                    break;
                }

                // Wait before the next attempt; the delay is computed for that attempt's number.
                $wait = $backoff->computeDelay($attemptNo + 1);
                if ($wait > 0) {
                    ($this->sleeper)($wait);
                }
            }
        } finally {
            if ($job->isSingleInstance()) {
                $job->clearRunningFlag();
            }
        }

        // Dispatch the callback job, if one is defined.
        if ($job->hasCallbackJob()) {
            $this->dispatchCallbackJob($job, $lastResult);
        }

        return new LifecycleOutcome(
            finalResult: $lastResult,
            attempts: $attemptNo,
            finalFailure: $failedForGood,
            requeued: false, // this method never requeues
            attemptsMeta: $attemptLog,
        );
    }
164

165
    /**
     * Executes the job once and never lets a Throwable escape: anything thrown by
     * executeJobInternal() is turned into a failed ExecutionResult whose error is the
     * exception message.
     */
    private function safeExecute(Job $job): ExecutionResult
    {
        try {
            $outcome = $this->executeJobInternal($job);
        } catch (Throwable $failure) {
            // The same timestamp is used for both start and end of the failed result.
            $now     = microtime(true);
            $outcome = new ExecutionResult(false, null, $failure->getMessage(), $now, $now);
        }

        return $outcome;
    }
175

176
    /**
     * Compute the TTL (in seconds) used for the singleInstance running flag.
     *
     * The result is max(120, timeout + 60), where timeout is the job's own timeout,
     * falling back to the configured defaultTimeout and finally to 0.
     *
     * @param Job  $job Job whose timeout is consulted first.
     * @param Jobs $cfg Configuration supplying the fallback defaultTimeout.
     *
     * @return int TTL in seconds, never less than 120.
     */
    private function computeHeartbeatTtl(Job $job, Jobs $cfg): int
    {
        // run() casts defaultTimeout to int before using it; do the same here so a
        // non-int config value cannot flow through max() and violate the declared
        // int return type under strict_types.
        // NOTE(review): the declared type of Jobs::$defaultTimeout is not visible here.
        $timeout = (int) ($job->getTimeout() ?? $cfg->defaultTimeout ?? 0);

        return max(120, $timeout + 60);
    }
187

188
    /**
     * Resolves the handler class for the job, runs it through the job's middleware
     * pipeline while capturing echoed output, and returns the resulting ExecutionResult.
     *
     * Any Throwable raised inside the try block (invalid job mapping, beforeRun,
     * middleware, handler, afterRun, logging) produces a failed ExecutionResult
     * carrying the exception message. When performance logging is enabled in the
     * configuration, both outcomes are written through a JobLogger.
     */
    private function executeJobInternal(Job $job): ExecutionResult
    {
        $startedAt = microtime(true);
        $config    = config(Jobs::class) ?? new Jobs();

        $perfLogger = $config->logPerformance ? new JobLogger() : null;
        $perfLogger?->start(date('Y-m-d H:i:s'));

        // True only while the output buffer opened below is still ours to close.
        $capturing = false;

        try {
            $registry     = $config->jobs;
            $handlerClass = $registry[$job->getJob()] ?? null;
            if (! ($handlerClass && is_subclass_of($handlerClass, Job::class))) {
                throw JobException::forInvalidJob($job->getJob());
            }

            /** @var JobInterface $handler */
            $handler = new $handlerClass();
            $job     = $handler->beforeRun($job);

            ob_start();
            $capturing = true;

            // Innermost step: the handler receives the job payload. Each middleware is
            // wrapped around it so the first middleware in the stack runs outermost.
            $stack    = $job->getMiddleware();
            $terminal = static fn (Job $j) => $handler->handle($j->getPayload());
            $wrap     = static fn (callable $next, callable $mw) => static fn (Job $j) => $mw($j, $next);
            $pipeline = array_reduce(array_reverse($stack), $wrap, $terminal);

            $data = $pipeline($job);

            $captured  = ob_get_clean();
            $capturing = false;

            // Combine echoed output with the returned value: it replaces a null return
            // and is appended (newline-separated) to a string return; other return
            // types are left untouched.
            if (is_string($captured) && $captured !== '') {
                if ($data === null) {
                    $data = $captured;
                } elseif (is_string($data)) {
                    $data .= (str_starts_with($captured, "\n") ? '' : "\n") . $captured;
                }
            }

            $job        = $handler->afterRun($job);
            $finishedAt = microtime(true);

            // Reaching this point without an exception counts as success.
            $completed = new ExecutionResult(
                success: true,
                output: $this->normalizeOutput($data),
                error: null,
                startedAt: $startedAt,
                endedAt: $finishedAt,
                handlerClass: $handlerClass,
            );

            $perfLogger?->end(date('Y-m-d H:i:s'));
            $perfLogger?->log($job, $completed);

            return $completed;
        } catch (Throwable $e) {
            // Discard our output buffer if the failure happened while it was open.
            if ($capturing && ob_get_level() > 0) {
                try {
                    ob_end_clean();
                } catch (Throwable) {
                    // Best effort: a buffer cleanup failure must not mask the original error.
                }
            }

            $failedAt = microtime(true);
            $failed   = new ExecutionResult(false, null, $e->getMessage(), $startedAt, $failedAt);

            $perfLogger?->end(date('Y-m-d H:i:s'));
            $perfLogger?->log($job, $failed);

            return $failed;
        }
    }
276

277
    /**
     * Converts handler data to the string form stored as execution output.
     *
     * null stays null, scalars are cast to string, and every other value is
     * JSON-encoded (null when encoding fails).
     */
    private function normalizeOutput(mixed $data): ?string
    {
        if ($data !== null && ! is_scalar($data)) {
            $json = json_encode($data);

            return $json === false ? null : $json;
        }

        return $data === null ? null : (string) $data;
    }
290

291
    /**
     * Execute a job with timeout detection.
     *
     * With pcntl available, a SIGALRM is scheduled for $timeout seconds. The handler
     * installed here only records that the alarm fired; it does not itself interrupt
     * the job. Once safeExecute() returns, a fired alarm is reported by incrementing
     * the jobs_timed_out metric and throwing a timeout JobException. Without pcntl the
     * elapsed wall-clock time is checked afterwards and an overrun is only counted and
     * logged.
     *
     * @param int $timeout Timeout in seconds; values <= 0 run the job without a timeout.
     */
    private function safeExecuteWithTimeout(Job $job, int $timeout): ExecutionResult
    {
        if ($timeout <= 0) {
            return $this->safeExecute($job); // No timeout
        }

        $startedAt = time();

        if (! function_exists('pcntl_alarm')) {
            // Fallback: simple elapsed-time check after the fact (less accurate, no kill).
            $result = $this->safeExecute($job);

            if (time() - $startedAt > $timeout) {
                Metrics::get()?->increment('jobs_timed_out', 1, ['job' => $job->getName(), 'queue' => $job->getQueue() ?? 'default']);
                log_message('warning', "Job {$job->getName()} exceeded timeout of {$timeout}s (no pcntl available for hard kill)");
            }

            return $result;
        }

        // Turn on async signal dispatch where the function exists, remembering the
        // previous setting so it can be restored afterwards.
        $hasAsyncSignals = function_exists('pcntl_async_signals');
        $previousAsync   = $hasAsyncSignals ? pcntl_async_signals(true) : false;

        // Remember whatever SIGALRM handler was installed before ours.
        $previousHandler = function_exists('pcntl_signal_get_handler')
            ? pcntl_signal_get_handler(SIGALRM)
            : SIG_DFL;

        $alarmFired = false;
        pcntl_signal(SIGALRM, static function () use (&$alarmFired): void {
            $alarmFired = true;
        });
        pcntl_alarm($timeout);

        try {
            $result = $this->safeExecute($job);
        } finally {
            // Cancel any pending alarm, then restore the previous handler and
            // async-signal state so later jobs in the same worker process do not
            // inherit this timeout handler.
            pcntl_alarm(0);
            pcntl_signal(SIGALRM, $previousHandler);
            if ($hasAsyncSignals) {
                pcntl_async_signals($previousAsync);
            }
        }

        if ($alarmFired) {
            Metrics::get()?->increment('jobs_timed_out', 1, ['job' => $job->getName(), 'queue' => $job->getQueue() ?? 'default']);

            throw JobException::forJobTimeout($job->getName(), $timeout);
        }

        return $result;
    }
357

358
    /**
     * Builds the callback job described by the parent's callback descriptor and either
     * enqueues it (child has a queue) or executes it inline (child has no queue).
     *
     * Failures while building, enqueuing or executing the callback are logged and never
     * propagate, so the parent's flow is not broken by its callback.
     *
     * @param Job             $parent Job whose callback descriptor is dispatched.
     * @param ExecutionResult $result Result of the parent; drives the success/failure
     *                                filter and the inherited meta values.
     * @param int             $depth  Current position in an inline callback chain;
     *                                dispatch stops at MAX_CALLBACK_DEPTH.
     */
    private function dispatchCallbackJob(Job $parent, ExecutionResult $result, int $depth = 0): void
    {
        if ($depth >= self::MAX_CALLBACK_DEPTH) {
            log_message('warning', 'JobLifecycleCoordinator: max callback chain depth (' . self::MAX_CALLBACK_DEPTH . ') reached for job: ' . $parent->getName());

            return;
        }

        $descriptor = $parent->getCallbackDescriptor();
        if (! $descriptor) {
            return;
        }

        // Filter values handled here: always|success|failure
        $filter = $descriptor->filter ?? 'always';
        if ($filter === 'success' && ! $result->success) {
            return;
        }
        if ($filter === 'failure' && $result->success) {
            return;
        }

        // Build child job via user builder
        try {
            $builder = $descriptor->builder;
            $child   = $builder($parent);
        } catch (Throwable $e) {
            log_message('error', 'JobLifecycleCoordinator: callback builder failed for job ' . $parent->getName() . ' — ' . $e->getMessage());

            return; // Fail gracefully to not break parent flow
        }
        if (! $child instanceof Job) {
            // Previously skipped silently; log it so a misbehaving builder is visible.
            log_message('warning', 'JobLifecycleCoordinator: callback builder for job ' . $parent->getName() . ' did not return a Job; callback skipped');

            return;
        }

        // Collect the parent values the descriptor asked the child to inherit.
        $inherit = $descriptor->inherit ?? [];
        $meta    = [
            'parentStatus' => $result->success,
        ];
        if (in_array('output', $inherit, true)) {
            $meta['parentOutput'] = $result->output;
        }
        if (in_array('error', $inherit, true)) {
            $meta['parentError'] = $result->error;
        }
        if (in_array('attempts', $inherit, true)) {
            $meta['parentAttempts'] = $parent->getAttempt();
        }
        if (in_array('name', $inherit, true)) {
            $meta['parentName'] = $parent->getName();
        }
        if (in_array('source', $inherit, true)) {
            $meta['parentSource'] = $parent->getSource();
        }

        // Merge meta into the child payload according to the payload's type.
        $payload = $child->getPayload();
        if ($payload instanceof Closure) {
            // Cannot inject meta into a closure payload; it is passed through unchanged.
        } elseif (is_array($payload)) {
            // Keys already present in the payload's own meta take precedence.
            $payload['meta'] = isset($payload['meta']) && is_array($payload['meta'])
                ? $payload['meta'] + $meta
                : $meta;
        } elseif (is_object($payload)) {
            foreach ($meta as $k => $v) {
                try {
                    $payload->{$k} = $v;
                } catch (Throwable) {
                    // Best effort: an object that rejects the assignment keeps its state.
                }
            }
        } else {
            // Wrap any remaining (non-array, non-object) payload into an array structure
            $payload = [
                'data' => $payload,
                'meta' => $meta,
            ];
        }

        // Replace modified payload directly (child preserves queue & configuration)
        try {
            $child->setPayload($payload);
        } catch (Throwable $e) {
            // Still best effort, but no longer silent: the child proceeds with its original payload.
            log_message('warning', 'JobLifecycleCoordinator: could not set callback payload for job ' . $parent->getName() . ' — ' . $e->getMessage());
        }

        $allowChain = (bool) ($descriptor->allowChain ?? false);

        // Mark origin
        $child->source('callback');
        $child->markAsCallbackChild($allowChain);

        if ($child->getQueue() !== null) {
            // Enqueue: the child's own callback chain is not processed here.
            try {
                $child->push();
            } catch (Throwable $e) {
                log_message('error', 'JobLifecycleCoordinator: failed to enqueue callback job — ' . $e->getMessage());
            }
        } else {
            // Inline execution (we can cascade if allowed)
            try {
                $childResult = $this->executeJobInternal($child);
                if ($allowChain && $child->hasCallbackJob()) {
                    // recursive dispatch for child with depth tracking
                    $this->dispatchCallbackJob($child, $childResult, $depth + 1);
                }
            } catch (Throwable $e) {
                log_message('error', 'JobLifecycleCoordinator: inline callback execution failed — ' . $e->getMessage());
            }
        }
    }
472
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc