• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 21832579818

09 Feb 2026 04:06PM UTC coverage: 56.107% (+0.3%) from 55.783%
21832579818

push

github

daycry
Merge JobExecutor into JobLifecycleCoordinator

Collapse the standalone JobExecutor into JobLifecycleCoordinator by moving safe handler execution logic (executeJobInternal, output normalization, buffer capture and logging) into the coordinator. Remove src/Execution/JobExecutor.php and adapt constructor to accept a sleeper callable. Introduce InteractsWithCurrentJob trait (provides $this->currentJob via beforeRun/afterRun) and apply it to built-in Job types (ClosureJob, CommandJob, EventJob, ShellJob, UrlJob). Update SyncQueue docs and tests to use JobLifecycleCoordinator (and its run(...)->finalResult) instead of JobExecutor, and update documentation to reflect the new execution flow and context access. Tests and logging behavior updated accordingly.

63 of 66 new or added lines in 2 files covered. (95.45%)

1 existing line in 1 file now uncovered.

1176 of 2096 relevant lines covered (56.11%)

4.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.02
/src/Execution/JobLifecycleCoordinator.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Execution;
15

16
use Daycry\Jobs\Exceptions\JobException;
17
use Daycry\Jobs\Interfaces\JobInterface;
18
use Daycry\Jobs\Job;
19
use Daycry\Jobs\Loggers\JobLogger;
20
use RuntimeException;
21
use Throwable;
22

23
/**
24
 * Orchestrates full job lifecycle including retries, notifications, and timeout protection.
25
 * Uses configured RetryPolicy to drive control flow.
26
 * Ensures single-instance jobs acquire and release a runtime flag to avoid concurrent runs.
27
 */
28
class JobLifecycleCoordinator
29
{
30
    private $sleeper;
31

32
    public function __construct(
33
        /**
34
         * @var callable(int):void
35
         */
36
        $sleeper = null,
37
    ) {
38
        $this->sleeper = $sleeper ?? static function (int $seconds): void {
28✔
39
            if ($seconds > 0) {
1✔
40
                sleep($seconds);
1✔
41
            }
42
        };
22✔
43
    }
44

45
    public function run(Job $job, string $source = 'cron'): LifecycleOutcome
46
    {
47
        $attemptsMeta = [];
28✔
48

49
        // Read retry config inline - use direct config() in coordinator to respect test changes
50
        $cfg    = config('Jobs');
28✔
51
        $policy = new RetryPolicyFixed(
28✔
52
            base: $cfg->retryBackoffBase,
28✔
53
            strategy: $cfg->retryBackoffStrategy,
28✔
54
            multiplier: $cfg->retryBackoffMultiplier,
28✔
55
            max: $cfg->retryBackoffMax,
28✔
56
            jitter: $cfg->retryBackoffJitter,
28✔
57
        );
28✔
58

59
        $maxRetries   = max(0, $job->getMaxRetries() ?? 0);
28✔
60
        $finalFailure = false;
28✔
61
        $requeued     = false;
28✔
62
        $finalResult  = null;
28✔
63

64
        // Use persistent attempt counter from job instead of local counter
65
        $persistentAttempt = $job->getAttempt();
28✔
66

67
        // Lock single instance (si aplica)
68
        if ($job->isSingleInstance()) {
28✔
69
            if ($job->isRunning()) {
×
70
                throw new RuntimeException('Job already running: ' . $job->getName());
×
71
            }
72
            $job->saveRunningFlag();
×
73
        }
74

75
        try {
76
            while (true) {
28✔
77
                $persistentAttempt++;
28✔
78

79
                // Determine effective timeout for this job.
80
                // Priority: job-specific timeout (Job::getTimeout()), then config defaultTimeout.
81
                // If neither is set, treat as unlimited (0). Do NOT apply a global "cap" here.
82
                $jobSpecific    = method_exists($job, 'getTimeout') ? $job->getTimeout() : null;
28✔
83
                $defaultTimeout = $cfg->defaultTimeout ?? null;
28✔
84

85
                if ($jobSpecific !== null) {
28✔
86
                    $timeout = (int) $jobSpecific;
×
87
                } elseif ($defaultTimeout !== null) {
28✔
88
                    $timeout = (int) $defaultTimeout;
×
89
                } else {
90
                    // Unlimited by default if nothing specified
91
                    $timeout = 0;
28✔
92
                }
93

94
                $exec = ($timeout > 0)
28✔
95
                    ? $this->safeExecuteWithTimeout($job, $timeout)
×
96
                    : $this->safeExecute($job);
28✔
97

98
                $attemptsMeta[] = [
28✔
99
                    'attempt'  => $persistentAttempt,
28✔
100
                    'success'  => $exec->success,
28✔
101
                    'error'    => $exec->error,
28✔
102
                    'duration' => $exec->durationSeconds(),
28✔
103
                ];
28✔
104

105
                // Notificaciones directas con ExecutionResult
106
                if ($exec->success && $job->shouldNotifyOnSuccess()) {
28✔
107
                    $job->notify($exec);
×
108
                } elseif (! $exec->success && $job->shouldNotifyOnFailure()) {
28✔
109
                    $job->notify($exec);
×
110
                }
111

112
                if ($exec->success) {
28✔
113
                    // Success: no extra action needed (completion handled by RequeueHelper)
114
                    $finalResult = $exec;
19✔
115
                    break;
19✔
116
                }
117

118
                // Fallo - check against persistent counter
119
                if ($persistentAttempt > $maxRetries) {
11✔
120
                    // Final failure: no extra action needed (completion handled by RequeueHelper)
121
                    $finalResult  = $exec;
9✔
122
                    $finalFailure = true;
9✔
123
                    break;
9✔
124
                }
125

126
                $delay = $policy->computeDelay($persistentAttempt + 1); // Próximo intento
6✔
127
                if ($delay > 0) {
6✔
128
                    ($this->sleeper)($delay);
5✔
129
                }
130
            }
131
        } finally {
132
            if ($job->isSingleInstance()) {
28✔
133
                $job->clearRunningFlag();
28✔
134
            }
135
        }
136

137
        if (! $finalResult) {
28✔
138
            // fallback improbable
139
            $finalResult = new ExecutionResult(false, null, 'Unknown execution state', microtime(true), microtime(true));
×
140
        }
141

142
        // Dispatch callback job if defined
143
        if (method_exists($job, 'hasCallbackJob') && $job->hasCallbackJob()) {
28✔
144
            $this->dispatchCallbackJob($job, $finalResult);
9✔
145
        }
146

147
        return new LifecycleOutcome(
28✔
148
            finalResult: $finalResult,
28✔
149
            attempts: $persistentAttempt,
28✔
150
            finalFailure: $finalFailure,
28✔
151
            requeued: $requeued,
28✔
152
            attemptsMeta: $attemptsMeta,
28✔
153
        );
28✔
154
    }
155

156
    private function safeExecute(Job $job): ExecutionResult
157
    {
158
        try {
159
            return $this->executeJobInternal($job);
28✔
UNCOV
160
        } catch (Throwable $e) {
×
161
            $t = microtime(true);
×
162

163
            return new ExecutionResult(false, null, $e->getMessage(), $t, $t, null);
×
164
        }
165
    }
166

167
    private function executeJobInternal(Job $job): ExecutionResult
168
    {
169
        $start  = microtime(true);
28✔
170
        $logger = null;
28✔
171
        if (config('Jobs')->logPerformance) {
28✔
172
            $logger = new JobLogger();
4✔
173
            $logger->start(date('Y-m-d H:i:s'));
4✔
174
        }
175
        $bufferActive = false;
28✔
176

177
        try {
178
            $mapping = config('Jobs')->jobs;
28✔
179
            $class   = $mapping[$job->getJob()] ?? null;
28✔
180
            if (! $class || ! is_subclass_of($class, Job::class)) {
28✔
NEW
181
                throw JobException::forInvalidJob($job->getJob());
×
182
            }
183
            /** @var JobInterface $handler */
184
            $handler = new $class();
28✔
185

186
            $job = $handler->beforeRun($job);
28✔
187
            ob_start();
28✔
188
            $bufferActive = true;
28✔
189
            $returned     = $handler->handle($job->getPayload()); // Handler arbitrary return (scalar|array|object|null)
28✔
190
            $buffer       = ob_get_clean();
21✔
191
            $bufferActive = false;
21✔
192

193
            if ($buffer === '' || $buffer === false) {
21✔
194
                $buffer = null;
18✔
195
            }
196

197
            // Interpret return as success unless an exception was thrown.
198
            $success = true;
21✔
199
            $data    = $returned;
21✔
200

201
            // Merge captured buffer with returned data when meaningful
202
            if ($buffer !== null) {
21✔
203
                if ($data === null) {
3✔
204
                    $data = $buffer;
1✔
205
                } elseif (is_string($data) && $buffer !== '') {
2✔
206
                    $separator = str_starts_with($buffer, "\n") ? '' : "\n";
2✔
207
                    $data .= $separator . $buffer;
2✔
208
                }
209
            }
210

211
            $job             = $handler->afterRun($job);
21✔
212
            $end             = microtime(true);
21✔
213
            $executionResult = new ExecutionResult(
21✔
214
                success: $success,
21✔
215
                output: $success ? $this->normalizeOutput($data) : null,
21✔
216
                error: $success ? null : (is_scalar($data) ? (string) $data : json_encode($data)),
21✔
217
                startedAt: $start,
21✔
218
                endedAt: $end,
21✔
219
                handlerClass: $class,
21✔
220
            );
21✔
221
            if ($logger) {
21✔
222
                $logger->end(date('Y-m-d H:i:s'));
3✔
223
                $logger->log($job, $executionResult, null);
3✔
224
            }
225

226
            return $executionResult;
21✔
227
        } catch (Throwable $e) {
11✔
228
            if ($bufferActive && ob_get_level() > 0) {
11✔
229
                try {
230
                    ob_end_clean();
11✔
NEW
231
                } catch (Throwable) {
×
232
                }
233
            }
234
            $t               = microtime(true);
11✔
235
            $executionResult = new ExecutionResult(false, null, $e->getMessage(), $start, $t, null);
11✔
236
            if ($logger) {
11✔
237
                $logger->end(date('Y-m-d H:i:s'));
1✔
238
                $logger->log($job, $executionResult, null);
1✔
239
            }
240

241
            return $executionResult;
11✔
242
        }
243
    }
244

245
    private function normalizeOutput(mixed $data): ?string
246
    {
247
        if ($data === null) {
21✔
NEW
248
            return null;
×
249
        }
250
        if (is_scalar($data)) {
21✔
251
            return (string) $data;
20✔
252
        }
253

254
        return json_encode($data);
1✔
255
    }
256

257
    /**
258
     * Execute job with timeout protection.
259
     */
260
    private function safeExecuteWithTimeout(Job $job, int $timeout): ExecutionResult
261
    {
262
        if ($timeout <= 0) {
×
263
            return $this->safeExecute($job); // No timeout
×
264
        }
265

266
        $startTime = time();
×
267
        $result    = null;
×
268
        $timedOut  = false;
×
269

270
        // Fork execution check using pcntl if available
271
        if (function_exists('pcntl_alarm')) {
×
272
            // Register alarm handler
273
            pcntl_signal(SIGALRM, static function () use (&$timedOut): void {
×
274
                $timedOut = true;
×
275
            });
×
276
            pcntl_alarm($timeout);
×
277

278
            try {
279
                $result = $this->safeExecute($job);
×
280
                pcntl_alarm(0); // Cancel alarm
×
281
            } catch (Throwable $e) {
×
282
                pcntl_alarm(0);
×
283

284
                throw $e;
×
285
            }
286

287
            if ($timedOut) {
×
288
                throw JobException::forJobTimeout($job->getName(), $timeout);
×
289
            }
290
        } else {
291
            // Fallback: simple time check (less accurate, no kill)
292
            $result = $this->safeExecute($job);
×
293

294
            if (time() - $startTime > $timeout) {
×
295
                log_message('warning', "Job {$job->getName()} exceeded timeout of {$timeout}s (no pcntl available for hard kill)");
×
296
            }
297
        }
298

299
        return $result;
×
300
    }
301

302
    /**
303
     * Builds and executes or enqueues the callback job based on descriptor.
304
     */
305
    private function dispatchCallbackJob(Job $parent, ExecutionResult $result): void
306
    {
307
        $descriptor = $parent->getCallbackDescriptor();
9✔
308
        if (! $descriptor) {
9✔
309
            return;
×
310
        }
311
        // Filter already normalized to: always|success|failure
312
        $filter = $descriptor->filter ?? 'always';
9✔
313
        if ($filter === 'success' && ! $result->success) {
9✔
314
            return;
×
315
        }
316
        if ($filter === 'failure' && $result->success) {
9✔
317
            return;
1✔
318
        }
319

320
        // Build child job via user builder
321
        try {
322
            $builder = $descriptor->builder;
8✔
323
            $child   = $builder($parent);
8✔
324
        } catch (Throwable $e) {
×
325
            return; // Fail silently to not break parent flow
×
326
        }
327
        if (! $child instanceof Job) {
8✔
328
            return; // Invalid builder return
×
329
        }
330

331
        // Inherit meta into payload if requested
332
        $inherit = $descriptor->inherit ?? [];
8✔
333
        $meta    = [
8✔
334
            'parentStatus' => $result->success,
8✔
335
        ];
8✔
336
        if (in_array('output', $inherit, true)) {
8✔
337
            $meta['parentOutput'] = $result->output;
7✔
338
        }
339
        if (in_array('error', $inherit, true)) {
8✔
340
            $meta['parentError'] = $result->error;
7✔
341
        }
342
        if (in_array('attempts', $inherit, true)) {
8✔
343
            $meta['parentAttempts'] = $parent->getAttempt();
×
344
        }
345
        if (in_array('name', $inherit, true)) {
8✔
346
            $meta['parentName'] = $parent->getName();
1✔
347
        }
348
        if (in_array('source', $inherit, true)) {
8✔
349
            $meta['parentSource'] = $parent->getSource();
×
350
        }
351

352
        // Attempt to merge meta into child payload (if array/object)
353
        $payload = $child->getPayload();
8✔
354
        if ($payload instanceof Closure) {
8✔
355
            // Cannot inject meta into a closure payload; skip meta merge.
356
        } elseif (is_array($payload)) {
8✔
357
            $payload['meta'] = isset($payload['meta']) && is_array($payload['meta'])
×
358
                ? $payload['meta'] + $meta
×
359
                : $meta;
×
360
        } elseif (is_object($payload) && ! ($payload instanceof Closure)) {
8✔
361
            foreach ($meta as $k => $v) {
8✔
362
                try {
363
                    $payload->{$k} = $v;
8✔
364
                } catch (Throwable) { // ignore
8✔
365
                }
366
            }
367
        } else {
368
            // Wrap scalar/callable/closure into array structure
369
            $payload = [
×
370
                'data' => $payload,
×
371
                'meta' => $meta,
×
372
            ];
×
373
        }
374

375
        // Replace modified payload directly (child preserves queue & configuration)
376
        try {
377
            $child->setPayload($payload);
8✔
378
        } catch (Throwable) {
×
379
            // ignore
380
        }
381

382
        // Mark origin
383
        if (method_exists($child, 'source')) {
8✔
384
            $child->source('callback');
8✔
385
        }
386
        if (method_exists($child, 'markAsCallbackChild')) {
8✔
387
            $child->markAsCallbackChild((bool) ($descriptor->allowChain ?? false));
8✔
388
        }
389

390
        $allowChain = (bool) ($descriptor->allowChain ?? false);
8✔
391

392
        if ($child->getQueue() !== null) {
8✔
393
            // Enqueue: we cannot process child's callback chain now (will happen when worker executes it)
394
            try {
395
                $child->push();
1✔
396
            } catch (Throwable) { // silent
×
397
            }
398
        } else {
399
            // Inline execution (we can cascade if allowed)
400
            try {
401
                $childResult = $this->executeJobInternal($child);
7✔
402
                if ($allowChain && method_exists($child, 'hasCallbackJob') && $child->hasCallbackJob()) {
7✔
403
                    // recursive dispatch for child
404
                    $this->dispatchCallbackJob($child, $childResult);
7✔
405
                }
406
            } catch (Throwable) { // ignore
×
407
            }
408
        }
409
    }
410
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc