• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 21442227581

28 Jan 2026 02:30PM UTC coverage: 56.301% (-3.1%) from 59.413%
21442227581

push

github

daycry
Improve queue handling and job execution logic

Refactor queue worker to support background execution and improve job fetching logic. Update JobLifecycleCoordinator to prioritize job-specific and default timeouts without a global cap. Replace custom UUID generation with service-based UUID v7 in JobLogger. Ensure queue scheduling uses application timezone for consistency.

13 of 24 new or added lines in 5 files covered. (54.17%)

63 existing lines in 4 files now uncovered.

1175 of 2087 relevant lines covered (56.3%)

4.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.27
/src/Execution/JobLifecycleCoordinator.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Execution;
15

16
use Closure;
17
use Daycry\Jobs\Exceptions\JobException;
18
use Daycry\Jobs\Job;
19
use RuntimeException;
20
use Throwable;
21

22
/**
23
 * Orchestrates full job lifecycle including retries, notifications, and timeout protection.
24
 * Uses configured RetryPolicy to drive control flow.
25
 * Ensures single-instance jobs acquire and release a runtime flag to avoid concurrent runs.
26
 */
27
class JobLifecycleCoordinator
28
{
29
    public function __construct(
30
        private ?JobExecutor $executor = null,
31
        /**
32
         * @var callable(int):void
33
         */
34
        private $sleeper = null,
35
    ) {
36
        $this->executor ??= new JobExecutor();
18✔
37
        $this->sleeper ??= static function (int $seconds): void { if ($seconds > 0) { sleep($seconds); } };
18✔
38
    }
39

40
    public function run(Job $job, string $source = 'cron'): LifecycleOutcome
41
    {
42
        $attemptsMeta = [];
18✔
43

44
        // Read retry config inline - use direct config() in coordinator to respect test changes
45
        $cfg    = config('Jobs');
18✔
46
        $policy = new RetryPolicyFixed(
18✔
47
            base: $cfg->retryBackoffBase,
18✔
48
            strategy: $cfg->retryBackoffStrategy,
18✔
49
            multiplier: $cfg->retryBackoffMultiplier,
18✔
50
            max: $cfg->retryBackoffMax,
18✔
51
            jitter: $cfg->retryBackoffJitter,
18✔
52
        );
18✔
53

54
        $maxRetries   = max(0, $job->getMaxRetries() ?? 0);
18✔
55
        $finalFailure = false;
18✔
56
        $requeued     = false;
18✔
57
        $finalResult  = null;
18✔
58

59
        // Use persistent attempt counter from job instead of local counter
60
        $persistentAttempt = $job->getAttempt();
18✔
61

62
        // Lock single instance (si aplica)
63
        if ($job->isSingleInstance()) {
18✔
64
            if ($job->isRunning()) {
×
65
                throw new RuntimeException('Job already running: ' . $job->getName());
×
66
            }
67
            $job->saveRunningFlag();
×
68
        }
69

70
        try {
71
            while (true) {
18✔
72
                $persistentAttempt++;
18✔
73

74
                // Determine effective timeout for this job.
75
                // Priority: job-specific timeout (Job::getTimeout()), then config defaultTimeout.
76
                // If neither is set, treat as unlimited (0). Do NOT apply a global "cap" here.
77
                $jobSpecific    = method_exists($job, 'getTimeout') ? $job->getTimeout() : null;
18✔
78
                $defaultTimeout = $cfg->defaultTimeout ?? null;
18✔
79

80
                if ($jobSpecific !== null) {
18✔
NEW
81
                    $timeout = (int) $jobSpecific;
×
82
                } elseif ($defaultTimeout !== null) {
18✔
NEW
83
                    $timeout = (int) $defaultTimeout;
×
84
                } else {
85
                    // Unlimited by default if nothing specified
86
                    $timeout = 0;
18✔
87
                }
88

89
                $exec = ($timeout > 0)
18✔
90
                    ? $this->safeExecuteWithTimeout($job, $timeout)
×
91
                    : $this->safeExecute($job);
18✔
92

93
                $attemptsMeta[] = [
18✔
94
                    'attempt'  => $persistentAttempt,
18✔
95
                    'success'  => $exec->success,
18✔
96
                    'error'    => $exec->error,
18✔
97
                    'duration' => $exec->durationSeconds(),
18✔
98
                ];
18✔
99

100
                // Notificaciones directas con ExecutionResult
101
                if ($exec->success && $job->shouldNotifyOnSuccess()) {
18✔
102
                    $job->notify($exec);
×
103
                } elseif (! $exec->success && $job->shouldNotifyOnFailure()) {
18✔
104
                    $job->notify($exec);
×
105
                }
106

107
                if ($exec->success) {
18✔
108
                    // Success: no extra action needed (completion handled by RequeueHelper)
109
                    $finalResult = $exec;
11✔
110
                    break;
11✔
111
                }
112

113
                // Fallo - check against persistent counter
114
                if ($persistentAttempt > $maxRetries) {
9✔
115
                    // Final failure: no extra action needed (completion handled by RequeueHelper)
116
                    $finalResult  = $exec;
7✔
117
                    $finalFailure = true;
7✔
118
                    break;
7✔
119
                }
120

121
                $delay = $policy->computeDelay($persistentAttempt + 1); // Próximo intento
6✔
122
                if ($delay > 0) {
6✔
123
                    ($this->sleeper)($delay);
5✔
124
                }
125
            }
126
        } finally {
127
            if ($job->isSingleInstance()) {
18✔
128
                $job->clearRunningFlag();
18✔
129
            }
130
        }
131

132
        if (! $finalResult) {
18✔
133
            // fallback improbable
134
            $finalResult = new ExecutionResult(false, null, 'Unknown execution state', microtime(true), microtime(true));
×
135
        }
136

137
        // Dispatch callback job if defined
138
        if (method_exists($job, 'hasCallbackJob') && $job->hasCallbackJob()) {
18✔
139
            $this->dispatchCallbackJob($job, $finalResult);
9✔
140
        }
141

142
        return new LifecycleOutcome(
18✔
143
            finalResult: $finalResult,
18✔
144
            attempts: $persistentAttempt,
18✔
145
            finalFailure: $finalFailure,
18✔
146
            requeued: $requeued,
18✔
147
            attemptsMeta: $attemptsMeta,
18✔
148
        );
18✔
149
    }
150

151
    private function safeExecute(Job $job): ExecutionResult
152
    {
153
        try {
154
            $start = microtime(true);
18✔
155

156
            return $this->executor->execute($job); // Ya retorna ExecutionResult directo
18✔
157
        } catch (Throwable $e) {
×
158
            $t = microtime(true);
×
159

160
            return new ExecutionResult(false, null, $e->getMessage(), $t, $t, null);
×
161
        }
162
    }
163

164
    /**
165
     * Execute job with timeout protection.
166
     */
167
    private function safeExecuteWithTimeout(Job $job, int $timeout): ExecutionResult
168
    {
UNCOV
169
        if ($timeout <= 0) {
×
170
            return $this->safeExecute($job); // No timeout
×
171
        }
172

UNCOV
173
        $startTime = time();
×
UNCOV
174
        $result    = null;
×
UNCOV
175
        $timedOut  = false;
×
176

177
        // Fork execution check using pcntl if available
UNCOV
178
        if (function_exists('pcntl_alarm')) {
×
179
            // Register alarm handler
UNCOV
180
            pcntl_signal(SIGALRM, static function () use (&$timedOut): void {
×
181
                $timedOut = true;
×
UNCOV
182
            });
×
UNCOV
183
            pcntl_alarm($timeout);
×
184

185
            try {
UNCOV
186
                $result = $this->safeExecute($job);
×
UNCOV
187
                pcntl_alarm(0); // Cancel alarm
×
188
            } catch (Throwable $e) {
×
189
                pcntl_alarm(0);
×
190

191
                throw $e;
×
192
            }
193

UNCOV
194
            if ($timedOut) {
×
195
                throw JobException::forJobTimeout($job->getName(), $timeout);
×
196
            }
197
        } else {
198
            // Fallback: simple time check (less accurate, no kill)
199
            $result = $this->safeExecute($job);
×
200

201
            if (time() - $startTime > $timeout) {
×
202
                log_message('warning', "Job {$job->getName()} exceeded timeout of {$timeout}s (no pcntl available for hard kill)");
×
203
            }
204
        }
205

UNCOV
206
        return $result;
×
207
    }
208

209
    /**
210
     * Builds and executes or enqueues the callback job based on descriptor.
211
     */
212
    private function dispatchCallbackJob(Job $parent, ExecutionResult $result): void
213
    {
214
        $descriptor = $parent->getCallbackDescriptor();
9✔
215
        if (! $descriptor) {
9✔
216
            return;
×
217
        }
218
        // Filter already normalized to: always|success|failure
219
        $filter = $descriptor->filter ?? 'always';
9✔
220
        if ($filter === 'success' && ! $result->success) {
9✔
221
            return;
×
222
        }
223
        if ($filter === 'failure' && $result->success) {
9✔
224
            return;
1✔
225
        }
226

227
        // Build child job via user builder
228
        try {
229
            $builder = $descriptor->builder;
8✔
230
            $child   = $builder($parent);
8✔
231
        } catch (Throwable $e) {
×
232
            return; // Fail silently to not break parent flow
×
233
        }
234
        if (! $child instanceof Job) {
8✔
235
            return; // Invalid builder return
×
236
        }
237

238
        // Inherit meta into payload if requested
239
        $inherit = $descriptor->inherit ?? [];
8✔
240
        $meta    = [
8✔
241
            'parentStatus' => $result->success,
8✔
242
        ];
8✔
243
        if (in_array('output', $inherit, true)) {
8✔
244
            $meta['parentOutput'] = $result->output;
7✔
245
        }
246
        if (in_array('error', $inherit, true)) {
8✔
247
            $meta['parentError'] = $result->error;
7✔
248
        }
249
        if (in_array('attempts', $inherit, true)) {
8✔
250
            $meta['parentAttempts'] = $parent->getAttempt();
×
251
        }
252
        if (in_array('name', $inherit, true)) {
8✔
253
            $meta['parentName'] = $parent->getName();
1✔
254
        }
255
        if (in_array('source', $inherit, true)) {
8✔
256
            $meta['parentSource'] = $parent->getSource();
×
257
        }
258

259
        // Attempt to merge meta into child payload (if array/object)
260
        $payload = $child->getPayload();
8✔
261
        if ($payload instanceof Closure) {
8✔
262
            // Cannot inject meta into a closure payload; skip meta merge.
263
        } elseif (is_array($payload)) {
×
264
            $payload['meta'] = isset($payload['meta']) && is_array($payload['meta'])
×
265
                ? $payload['meta'] + $meta
×
266
                : $meta;
×
267
        } elseif (is_object($payload) && ! ($payload instanceof Closure)) {
×
268
            foreach ($meta as $k => $v) {
×
269
                try {
270
                    $payload->{$k} = $v;
×
271
                } catch (Throwable) { // ignore
×
272
                }
273
            }
274
        } else {
275
            // Wrap scalar/callable/closure into array structure
276
            $payload = [
×
277
                'data' => $payload,
×
278
                'meta' => $meta,
×
279
            ];
×
280
        }
281

282
        // Replace modified payload directly (child preserves queue & configuration)
283
        try {
284
            $child->setPayload($payload);
8✔
285
        } catch (Throwable) {
×
286
            // ignore
287
        }
288

289
        // Mark origin
290
        if (method_exists($child, 'source')) {
8✔
291
            $child->source('callback');
8✔
292
        }
293
        if (method_exists($child, 'markAsCallbackChild')) {
8✔
294
            $child->markAsCallbackChild((bool) ($descriptor->allowChain ?? false));
8✔
295
        }
296

297
        $allowChain = (bool) ($descriptor->allowChain ?? false);
8✔
298

299
        if ($child->getQueue() !== null) {
8✔
300
            // Enqueue: we cannot process child's callback chain now (will happen when worker executes it)
301
            try {
302
                $child->push();
1✔
303
            } catch (Throwable) { // silent
1✔
304
            }
305
        } else {
306
            // Inline execution (we can cascade if allowed)
307
            try {
308
                $childResult = $this->executor->execute($child);
7✔
309
                if ($allowChain && method_exists($child, 'hasCallbackJob') && $child->hasCallbackJob()) {
7✔
310
                    // recursive dispatch for child
311
                    $this->dispatchCallbackJob($child, $childResult);
7✔
312
                }
313
            } catch (Throwable) { // ignore
×
314
            }
315
        }
316
    }
317
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc