• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 24850441053

23 Apr 2026 05:54PM UTC coverage: 52.404% (-1.5%) from 53.938%
24850441053

push

github

daycry
Fixes

104 of 219 new or added lines in 42 files covered. (47.49%)

14 existing lines in 9 files now uncovered.

1210 of 2309 relevant lines covered (52.4%)

4.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.16
/src/Execution/JobLifecycleCoordinator.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Execution;
15

16
use Closure;
17
use Daycry\Jobs\Config\Jobs;
18
use Daycry\Jobs\Exceptions\JobException;
19
use Daycry\Jobs\Interfaces\JobInterface;
20
use Daycry\Jobs\Job;
21
use Daycry\Jobs\Loggers\JobLogger;
22
use RuntimeException;
23
use Throwable;
24

25
/**
26
 * Orchestrates full job lifecycle including retries, notifications, and timeout protection.
27
 * Uses configured RetryPolicy to drive control flow.
28
 * Ensures single-instance jobs acquire and release a runtime flag to avoid concurrent runs.
29
 */
30
class JobLifecycleCoordinator
31
{
32
    private const MAX_CALLBACK_DEPTH = 5;
33

34
    private $sleeper;
35

36
    /**
     * Stores the pause strategy used between retry attempts.
     *
     * @param (callable(int):void)|null $sleeper Called with the number of seconds to wait.
     *                                           When null, a closure that calls sleep() for
     *                                           positive values is installed instead.
     */
    public function __construct($sleeper = null)
    {
        if ($sleeper === null) {
            $sleeper = static function (int $seconds): void {
                if ($seconds <= 0) {
                    return;
                }

                sleep($seconds);
            };
        }

        $this->sleeper = $sleeper;
    }
48

49
    /**
     * Drives a job through its lifecycle: optional single-instance locking, repeated
     * execution attempts with a backoff pause between failures, per-attempt notifications
     * and, at the end, the callback job dispatch.
     *
     * @param Job    $job    Job to execute.
     * @param string $source Origin label of the run; this method does not read it.
     *
     * @return LifecycleOutcome Last execution result, attempt counter, final-failure flag
     *                          and per-attempt metadata.
     *
     * @throws RuntimeException When a single-instance job reports it is already running.
     */
    public function run(Job $job, string $source = 'cron'): LifecycleOutcome
    {
        $history = [];

        // Configuration is looked up on each call so runtime changes (e.g. in tests) are respected.
        $cfg     = config(Jobs::class) ?? new Jobs();
        $backoff = new RetryPolicyFixed(
            base: $cfg->retryBackoffBase,
            strategy: $cfg->retryBackoffStrategy,
            multiplier: $cfg->retryBackoffMultiplier,
            max: $cfg->retryBackoffMax,
            jitter: $cfg->retryBackoffJitter,
        );

        $retryLimit    = max(0, $job->getMaxRetries() ?? 0);
        $lastExecution = null;
        $gaveUp        = false;

        // Continue counting from the attempt number stored on the job instead of starting at zero.
        $attemptNumber = $job->getAttempt();

        // Single-instance lock (when applicable).
        if ($job->isSingleInstance()) {
            if ($job->isRunning()) {
                throw new RuntimeException('Job already running: ' . $job->getName());
            }
            $job->saveRunningFlag();
        }

        try {
            do {
                $attemptNumber++;

                // Effective timeout: the job's own value wins, then the configured default,
                // otherwise 0 which means "no limit".
                $jobTimeout    = $job->getTimeout();
                $configTimeout = $cfg->defaultTimeout ?? null;
                $timeout       = match (true) {
                    $jobTimeout !== null    => $jobTimeout,
                    $configTimeout !== null => (int) $configTimeout,
                    default                 => 0,
                };

                $execution = $timeout > 0
                    ? $this->safeExecuteWithTimeout($job, $timeout)
                    : $this->safeExecute($job);

                $history[] = [
                    'attempt'  => $attemptNumber,
                    'success'  => $execution->success,
                    'error'    => $execution->error,
                    'duration' => $execution->durationSeconds(),
                ];

                // Notify with the raw ExecutionResult, using the flag that matches the outcome.
                $wantsNotification = $execution->success
                    ? $job->shouldNotifyOnSuccess()
                    : $job->shouldNotifyOnFailure();
                if ($wantsNotification) {
                    $job->notify($execution);
                }

                // Stop on success, or on a failure once the persistent counter exceeds the retry limit.
                $gaveUp = ! $execution->success && $attemptNumber > $retryLimit;
                if ($execution->success || $gaveUp) {
                    $lastExecution = $execution;
                    break;
                }

                // Delay is computed for the upcoming attempt.
                $pause = $backoff->computeDelay($attemptNumber + 1);
                if ($pause > 0) {
                    ($this->sleeper)($pause);
                }
            } while (true);
        } finally {
            if ($job->isSingleInstance()) {
                $job->clearRunningFlag();
            }
        }

        // Hand the final result to the callback job, if one is defined.
        if ($job->hasCallbackJob()) {
            $this->dispatchCallbackJob($job, $lastExecution);
        }

        return new LifecycleOutcome(
            finalResult: $lastExecution,
            attempts: $attemptNumber,
            finalFailure: $gaveUp,
            requeued: false,
            attemptsMeta: $history,
        );
    }
154

155
    /**
     * Runs the job and guarantees an ExecutionResult is returned even if the
     * internal execution throws.
     *
     * @param Job $job Job to execute.
     *
     * @return ExecutionResult The internal result, or a failed result carrying the
     *                         exception message with identical start and end timestamps.
     */
    private function safeExecute(Job $job): ExecutionResult
    {
        try {
            $outcome = $this->executeJobInternal($job);
        } catch (Throwable $failure) {
            $now     = microtime(true);
            $outcome = new ExecutionResult(false, null, $failure->getMessage(), $now, $now);
        }

        return $outcome;
    }
165

166
    /**
     * Resolves the handler class mapped to the job, runs it through the job's middleware
     * pipeline while capturing echoed output, and converts the outcome into an ExecutionResult.
     *
     * Any Throwable raised inside the guarded section is turned into a failed
     * ExecutionResult that carries the exception message; it is not rethrown.
     *
     * @param Job $job Job whose handler key is looked up in the configured `jobs` map.
     *
     * @return ExecutionResult Successful result with normalized output, or a failed
     *                         result with the error message.
     */
    private function executeJobInternal(Job $job): ExecutionResult
    {
        $start  = microtime(true);
        $cfg    = config(Jobs::class) ?? new Jobs();
        $logger = null;
        if ($cfg->logPerformance) {
            $logger = new JobLogger();
            $logger->start(date('Y-m-d H:i:s'));
        }

        // Output-buffer depth measured just before this method opens its own buffer.
        // Null whenever no buffer opened here is pending.
        $baseLevel = null;

        try {
            $class = $cfg->jobs[$job->getJob()] ?? null;
            if (! $class || ! is_subclass_of($class, Job::class)) {
                throw JobException::forInvalidJob($job->getJob());
            }
            /** @var JobInterface $handler */
            $handler = new $class();

            $job       = $handler->beforeRun($job);
            $baseLevel = ob_get_level();
            ob_start();

            // Build the middleware pipeline; the innermost callable invokes the handler.
            $middlewareStack = $job->getMiddleware();
            $core            = static fn (Job $j) => $handler->handle($j->getPayload());
            $pipeline        = array_reduce(
                array_reverse($middlewareStack),
                static fn (callable $next, callable $mw) => static fn (Job $j) => $mw($j, $next),
                $core,
            );
            $returned = $pipeline($job);

            // A handler may leave buffers of its own open. Flush them into ours so their
            // content is captured and the buffer stack returns to its previous depth.
            while (ob_get_level() > $baseLevel + 1) {
                if (! ob_end_flush()) {
                    break;
                }
            }
            // Only read a buffer that sits above the recorded depth, so an outer buffer
            // owned by the caller is never consumed if the handler closed ours.
            $buffer    = ob_get_level() > $baseLevel ? ob_get_clean() : false;
            $baseLevel = null;

            if ($buffer === '' || $buffer === false) {
                $buffer = null;
            }

            // Returning normally counts as success; failure is signalled by exceptions only.
            $data = $returned;

            // Merge captured output with the returned value when that is meaningful.
            if ($buffer !== null) {
                if ($data === null) {
                    $data = $buffer;
                } elseif (is_string($data)) {
                    $separator = str_starts_with($buffer, "\n") ? '' : "\n";
                    $data .= $separator . $buffer;
                }
            }

            $job             = $handler->afterRun($job);
            $end             = microtime(true);
            $executionResult = new ExecutionResult(
                success: true,
                output: $this->normalizeOutput($data),
                error: null,
                startedAt: $start,
                endedAt: $end,
                handlerClass: $class,
            );
            if ($logger instanceof JobLogger) {
                $logger->end(date('Y-m-d H:i:s'));
                $logger->log($job, $executionResult);
            }

            return $executionResult;
        } catch (Throwable $e) {
            if ($baseLevel !== null) {
                // Discard our buffer together with anything the handler stacked on top of it.
                while (ob_get_level() > $baseLevel) {
                    try {
                        if (! ob_end_clean()) {
                            break;
                        }
                    } catch (Throwable) {
                        // Best effort: a buffer that cannot be removed must not mask the job error.
                        break;
                    }
                }
            }
            $t               = microtime(true);
            $executionResult = new ExecutionResult(false, null, $e->getMessage(), $start, $t);
            if ($logger instanceof JobLogger) {
                $logger->end(date('Y-m-d H:i:s'));
                $logger->log($job, $executionResult);
            }

            return $executionResult;
        }
    }
254

255
    /**
     * Converts a handler's return value into the string form stored on the result.
     *
     * @param mixed $data Value to convert.
     *
     * @return string|null Scalars are cast to string, other non-null values are JSON-encoded;
     *                     null is returned for null input or when encoding fails.
     */
    private function normalizeOutput(mixed $data): ?string
    {
        if (is_scalar($data)) {
            return (string) $data;
        }

        if ($data === null) {
            return null;
        }

        $json = json_encode($data);

        return $json === false ? null : $json;
    }
268

269
    /**
     * Executes the job while watching a wall-clock time limit.
     *
     * When the required pcntl functions exist, a SIGALRM is scheduled for `$timeout`
     * seconds. The handler only raises a flag; it does not interrupt the running job.
     * Pending signals are dispatched explicitly once the job returns, because PHP does not
     * invoke pcntl handlers by itself unless async signals or ticks are enabled. The
     * SIGALRM handler that was installed before the call is restored afterwards.
     *
     * Without pcntl the elapsed time is measured and an overrun is only logged.
     *
     * @param Job $job     Job to execute.
     * @param int $timeout Limit in seconds; values <= 0 disable the limit.
     *
     * @return ExecutionResult Result produced by safeExecute().
     *
     * @throws JobException When the alarm fired before the job finished (pcntl path only).
     */
    private function safeExecuteWithTimeout(Job $job, int $timeout): ExecutionResult
    {
        if ($timeout <= 0) {
            return $this->safeExecute($job); // No timeout
        }

        // Individual functions can be disabled, so check every one that is used below.
        $hasPcntl = function_exists('pcntl_alarm')
            && function_exists('pcntl_signal')
            && function_exists('pcntl_signal_dispatch')
            && function_exists('pcntl_signal_get_handler');

        if (! $hasPcntl) {
            // Fallback: measure elapsed time with the monotonic clock (no kill possible).
            $startedAt = hrtime(true);
            $result    = $this->safeExecute($job);

            if ((hrtime(true) - $startedAt) / 1_000_000_000 > $timeout) {
                log_message('warning', "Job {$job->getName()} exceeded timeout of {$timeout}s (no pcntl available for hard kill)");
            }

            return $result;
        }

        $timedOut        = false;
        $previousHandler = pcntl_signal_get_handler(SIGALRM);

        pcntl_signal(SIGALRM, static function () use (&$timedOut): void {
            $timedOut = true;
        });
        pcntl_alarm($timeout);

        try {
            $result = $this->safeExecute($job);
        } finally {
            pcntl_alarm(0); // Cancel a still-pending alarm
            // Deliver an alarm that fired while the job was running; without this call
            // the handler above would never run under PHP's default signal settings.
            pcntl_signal_dispatch();
            // Do not leave our handler installed for unrelated code.
            pcntl_signal(SIGALRM, $previousHandler);
        }

        if ($timedOut) {
            throw JobException::forJobTimeout($job->getName(), $timeout);
        }

        return $result;
    }
313

314
    /**
     * Builds the callback (child) job described on the parent and either enqueues it
     * or executes it inline.
     *
     * Steps: stop at the maximum chain depth, apply the descriptor's filter
     * (always|success|failure) against the parent result, call the user builder, merge
     * the requested parent metadata into the child's payload, then push the child to its
     * queue or run it immediately. Inline children may cascade into their own callback
     * when the descriptor allows chaining. Failures here are logged and never propagate
     * to the parent flow.
     *
     * @param Job             $parent Job whose callback descriptor is evaluated.
     * @param ExecutionResult $result Result of the parent execution.
     * @param int             $depth  Current depth in the inline callback chain.
     */
    private function dispatchCallbackJob(Job $parent, ExecutionResult $result, int $depth = 0): void
    {
        if ($depth >= self::MAX_CALLBACK_DEPTH) {
            log_message('warning', 'JobLifecycleCoordinator: max callback chain depth (' . self::MAX_CALLBACK_DEPTH . ') reached for job: ' . $parent->getName());

            return;
        }

        $descriptor = $parent->getCallbackDescriptor();
        if (! $descriptor) {
            return;
        }

        // Filter is already normalized to: always|success|failure
        $filter = $descriptor->filter ?? 'always';
        if ($filter === 'success' && ! $result->success) {
            return;
        }
        if ($filter === 'failure' && $result->success) {
            return;
        }

        // Build the child job via the user-supplied builder.
        try {
            $builder = $descriptor->builder;
            $child   = $builder($parent);
        } catch (Throwable $e) {
            log_message('error', 'JobLifecycleCoordinator: callback builder failed for job ' . $parent->getName() . ' — ' . $e->getMessage());

            return; // Fail gracefully to not break parent flow
        }
        if (! $child instanceof Job) {
            // Previously ignored silently; surface the misconfiguration in the log.
            log_message('warning', 'JobLifecycleCoordinator: callback builder for job ' . $parent->getName() . ' did not return a Job instance; callback skipped');

            return;
        }

        // Collect the parent metadata requested through the descriptor.
        $inherit = $descriptor->inherit ?? [];
        $meta    = [
            'parentStatus' => $result->success,
        ];
        if (in_array('output', $inherit, true)) {
            $meta['parentOutput'] = $result->output;
        }
        if (in_array('error', $inherit, true)) {
            $meta['parentError'] = $result->error;
        }
        if (in_array('attempts', $inherit, true)) {
            $meta['parentAttempts'] = $parent->getAttempt();
        }
        if (in_array('name', $inherit, true)) {
            $meta['parentName'] = $parent->getName();
        }
        if (in_array('source', $inherit, true)) {
            $meta['parentSource'] = $parent->getSource();
        }

        // Merge the metadata into the child payload according to the payload's type.
        $payload = $child->getPayload();
        if ($payload instanceof Closure) {
            // Cannot inject meta into a closure payload; it is passed through unchanged.
        } elseif (is_array($payload)) {
            // Array union: keys already present in the child's own meta take precedence.
            $payload['meta'] = isset($payload['meta']) && is_array($payload['meta'])
                ? $payload['meta'] + $meta
                : $meta;
        } elseif (is_object($payload)) {
            foreach ($meta as $k => $v) {
                try {
                    $payload->{$k} = $v;
                } catch (Throwable) {
                    // Best effort: the object may reject this property; skip the key.
                }
            }
        } else {
            // Wrap scalar or null payloads into an array structure.
            $payload = [
                'data' => $payload,
                'meta' => $meta,
            ];
        }

        // Replace the payload directly (child keeps its queue and configuration).
        try {
            $child->setPayload($payload);
        } catch (Throwable $e) {
            // Keep going with the child's original payload, but leave a trace.
            log_message('warning', 'JobLifecycleCoordinator: could not attach parent meta to callback job — ' . $e->getMessage());
        }

        $allowChain = (bool) ($descriptor->allowChain ?? false);

        // Mark origin
        $child->source('callback');
        $child->markAsCallbackChild($allowChain);

        if ($child->getQueue() !== null) {
            // Enqueue: the child's own callback chain is handled when a worker executes it.
            try {
                $child->push();
            } catch (Throwable $e) {
                log_message('error', 'JobLifecycleCoordinator: failed to enqueue callback job — ' . $e->getMessage());
            }

            return;
        }

        // Inline execution (may cascade if allowed)
        try {
            $childResult = $this->executeJobInternal($child);
            if ($allowChain && $child->hasCallbackJob()) {
                // Recursive dispatch for the child, with depth tracking.
                $this->dispatchCallbackJob($child, $childResult, $depth + 1);
            }
        } catch (Throwable $e) {
            log_message('error', 'JobLifecycleCoordinator: inline callback execution failed — ' . $e->getMessage());
        }
    }
428
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc