• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 21444656761

28 Jan 2026 03:37PM UTC coverage: 55.831% (-0.5%) from 56.301%
21444656761

push

github

daycry
Add background execution to QueueRunCommand

Introduces support for running the queue command in the background using Symfony Process. Refactors parameter handling for 'queue', 'oneTime', and 'background' options, and improves queue prompt with validation.

0 of 18 new or added lines in 1 file covered. (0.0%)

2 existing lines in 2 files now uncovered.

1173 of 2101 relevant lines covered (55.83%)

4.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.59
/src/Execution/JobLifecycleCoordinator.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Execution;
15

16
use Closure;
17
use Daycry\Jobs\Exceptions\JobException;
18
use Daycry\Jobs\Job;
19
use RuntimeException;
20
use Throwable;
21

22
/**
23
 * Orchestrates full job lifecycle including retries, notifications, and timeout protection.
24
 * Uses configured RetryPolicy to drive control flow.
25
 * Ensures single-instance jobs acquire and release a runtime flag to avoid concurrent runs.
26
 */
27
class JobLifecycleCoordinator
28
{
29
    public function __construct(
30
        private ?JobExecutor $executor = null,
31
        /**
32
         * @var callable(int):void
33
         */
34
        private $sleeper = null,
35
    ) {
36
        $this->executor ??= new JobExecutor();
18✔
37
        $this->sleeper ??= static function (int $seconds): void { if ($seconds > 0) { sleep($seconds); } };
18✔
38
    }
39

40
    public function run(Job $job, string $source = 'cron'): LifecycleOutcome
41
    {
42
        $attemptsMeta = [];
18✔
43

44
        // Read retry config inline - use direct config() in coordinator to respect test changes
45
        $cfg    = config('Jobs');
18✔
46
        $policy = new RetryPolicyFixed(
18✔
47
            base: $cfg->retryBackoffBase,
18✔
48
            strategy: $cfg->retryBackoffStrategy,
18✔
49
            multiplier: $cfg->retryBackoffMultiplier,
18✔
50
            max: $cfg->retryBackoffMax,
18✔
51
            jitter: $cfg->retryBackoffJitter,
18✔
52
        );
18✔
53

54
        $maxRetries   = max(0, $job->getMaxRetries() ?? 0);
18✔
55
        $finalFailure = false;
18✔
56
        $requeued     = false;
18✔
57
        $finalResult  = null;
18✔
58

59
        // Use persistent attempt counter from job instead of local counter
60
        $persistentAttempt = $job->getAttempt();
18✔
61

62
        // Lock single instance (si aplica)
63
        if ($job->isSingleInstance()) {
18✔
64
            if ($job->isRunning()) {
×
65
                throw new RuntimeException('Job already running: ' . $job->getName());
×
66
            }
67
            $job->saveRunningFlag();
×
68
        }
69

70
        try {
71
            while (true) {
18✔
72
                $persistentAttempt++;
18✔
73

74
                // Determine effective timeout for this job.
75
                // Priority: job-specific timeout (Job::getTimeout()), then config defaultTimeout.
76
                // If neither is set, treat as unlimited (0). Do NOT apply a global "cap" here.
77
                $jobSpecific    = method_exists($job, 'getTimeout') ? $job->getTimeout() : null;
18✔
78
                $defaultTimeout = $cfg->defaultTimeout ?? null;
18✔
79

80
                if ($jobSpecific !== null) {
18✔
81
                    $timeout = (int) $jobSpecific;
×
82
                } elseif ($defaultTimeout !== null) {
18✔
83
                    $timeout = (int) $defaultTimeout;
×
84
                } else {
85
                    // Unlimited by default if nothing specified
86
                    $timeout = 0;
18✔
87
                }
88

89
                $exec = ($timeout > 0)
18✔
90
                    ? $this->safeExecuteWithTimeout($job, $timeout)
×
91
                    : $this->safeExecute($job);
18✔
92

93
                $attemptsMeta[] = [
18✔
94
                    'attempt'  => $persistentAttempt,
18✔
95
                    'success'  => $exec->success,
18✔
96
                    'error'    => $exec->error,
18✔
97
                    'duration' => $exec->durationSeconds(),
18✔
98
                ];
18✔
99

100
                // Notificaciones directas con ExecutionResult
101
                if ($exec->success && $job->shouldNotifyOnSuccess()) {
18✔
102
                    $job->notify($exec);
×
103
                } elseif (! $exec->success && $job->shouldNotifyOnFailure()) {
18✔
104
                    $job->notify($exec);
×
105
                }
106

107
                if ($exec->success) {
18✔
108
                    // Success: no extra action needed (completion handled by RequeueHelper)
109
                    $finalResult = $exec;
11✔
110
                    break;
11✔
111
                }
112

113
                // Fallo - check against persistent counter
114
                if ($persistentAttempt > $maxRetries) {
9✔
115
                    // Final failure: no extra action needed (completion handled by RequeueHelper)
116
                    $finalResult  = $exec;
7✔
117
                    $finalFailure = true;
7✔
118
                    break;
7✔
119
                }
120

121
                $delay = $policy->computeDelay($persistentAttempt + 1); // Próximo intento
6✔
122
                if ($delay > 0) {
6✔
123
                    ($this->sleeper)($delay);
5✔
124
                }
125
            }
126
        } finally {
127
            if ($job->isSingleInstance()) {
18✔
128
                $job->clearRunningFlag();
18✔
129
            }
130
        }
131

132
        if (! $finalResult) {
18✔
133
            // fallback improbable
134
            $finalResult = new ExecutionResult(false, null, 'Unknown execution state', microtime(true), microtime(true));
×
135
        }
136

137
        // Dispatch callback job if defined
138
        if (method_exists($job, 'hasCallbackJob') && $job->hasCallbackJob()) {
18✔
139
            $this->dispatchCallbackJob($job, $finalResult);
9✔
140
        }
141

142
        return new LifecycleOutcome(
18✔
143
            finalResult: $finalResult,
18✔
144
            attempts: $persistentAttempt,
18✔
145
            finalFailure: $finalFailure,
18✔
146
            requeued: $requeued,
18✔
147
            attemptsMeta: $attemptsMeta,
18✔
148
        );
18✔
149
    }
150

151
    private function safeExecute(Job $job): ExecutionResult
152
    {
153
        try {
154
            $start = microtime(true);
18✔
155

156
            return $this->executor->execute($job); // Ya retorna ExecutionResult directo
18✔
157
        } catch (Throwable $e) {
×
158
            $t = microtime(true);
×
159

160
            return new ExecutionResult(false, null, $e->getMessage(), $t, $t, null);
×
161
        }
162
    }
163

164
    /**
165
     * Execute job with timeout protection.
166
     */
167
    private function safeExecuteWithTimeout(Job $job, int $timeout): ExecutionResult
168
    {
169
        if ($timeout <= 0) {
×
170
            return $this->safeExecute($job); // No timeout
×
171
        }
172

173
        $startTime = time();
×
174
        $result    = null;
×
175
        $timedOut  = false;
×
176

177
        // Fork execution check using pcntl if available
178
        if (function_exists('pcntl_alarm')) {
×
179
            // Register alarm handler
180
            pcntl_signal(SIGALRM, static function () use (&$timedOut): void {
×
181
                $timedOut = true;
×
182
            });
×
183
            pcntl_alarm($timeout);
×
184

185
            try {
186
                $result = $this->safeExecute($job);
×
187
                pcntl_alarm(0); // Cancel alarm
×
188
            } catch (Throwable $e) {
×
189
                pcntl_alarm(0);
×
190

191
                throw $e;
×
192
            }
193

194
            if ($timedOut) {
×
195
                throw JobException::forJobTimeout($job->getName(), $timeout);
×
196
            }
197
        } else {
198
            // Fallback: simple time check (less accurate, no kill)
199
            $result = $this->safeExecute($job);
×
200

201
            if (time() - $startTime > $timeout) {
×
202
                log_message('warning', "Job {$job->getName()} exceeded timeout of {$timeout}s (no pcntl available for hard kill)");
×
203
            }
204
        }
205

206
        return $result;
×
207
    }
208

209
    /**
210
     * Builds and executes or enqueues the callback job based on descriptor.
211
     */
212
    private function dispatchCallbackJob(Job $parent, ExecutionResult $result): void
213
    {
214
        $descriptor = $parent->getCallbackDescriptor();
9✔
215
        if (! $descriptor) {
9✔
216
            return;
×
217
        }
218
        // Filter already normalized to: always|success|failure
219
        $filter = $descriptor->filter ?? 'always';
9✔
220
        if ($filter === 'success' && ! $result->success) {
9✔
221
            return;
×
222
        }
223
        if ($filter === 'failure' && $result->success) {
9✔
224
            return;
1✔
225
        }
226

227
        // Build child job via user builder
228
        try {
229
            $builder = $descriptor->builder;
8✔
230
            $child   = $builder($parent);
8✔
231
        } catch (Throwable $e) {
×
232
            return; // Fail silently to not break parent flow
×
233
        }
234
        if (! $child instanceof Job) {
8✔
235
            return; // Invalid builder return
×
236
        }
237

238
        // Inherit meta into payload if requested
239
        $inherit = $descriptor->inherit ?? [];
8✔
240
        $meta    = [
8✔
241
            'parentStatus' => $result->success,
8✔
242
        ];
8✔
243
        if (in_array('output', $inherit, true)) {
8✔
244
            $meta['parentOutput'] = $result->output;
7✔
245
        }
246
        if (in_array('error', $inherit, true)) {
8✔
247
            $meta['parentError'] = $result->error;
7✔
248
        }
249
        if (in_array('attempts', $inherit, true)) {
8✔
250
            $meta['parentAttempts'] = $parent->getAttempt();
×
251
        }
252
        if (in_array('name', $inherit, true)) {
8✔
253
            $meta['parentName'] = $parent->getName();
1✔
254
        }
255
        if (in_array('source', $inherit, true)) {
8✔
256
            $meta['parentSource'] = $parent->getSource();
×
257
        }
258

259
        // Attempt to merge meta into child payload (if array/object)
260
        $payload = $child->getPayload();
8✔
261
        if ($payload instanceof Closure) {
8✔
262
            // Cannot inject meta into a closure payload; skip meta merge.
263
        } elseif (is_array($payload)) {
×
264
            $payload['meta'] = isset($payload['meta']) && is_array($payload['meta'])
×
265
                ? $payload['meta'] + $meta
×
266
                : $meta;
×
267
        } elseif (is_object($payload) && ! ($payload instanceof Closure)) {
×
268
            foreach ($meta as $k => $v) {
×
269
                try {
270
                    $payload->{$k} = $v;
×
271
                } catch (Throwable) { // ignore
×
272
                }
273
            }
274
        } else {
275
            // Wrap scalar/callable/closure into array structure
276
            $payload = [
×
277
                'data' => $payload,
×
278
                'meta' => $meta,
×
279
            ];
×
280
        }
281

282
        // Replace modified payload directly (child preserves queue & configuration)
283
        try {
284
            $child->setPayload($payload);
8✔
285
        } catch (Throwable) {
×
286
            // ignore
287
        }
288

289
        // Mark origin
290
        if (method_exists($child, 'source')) {
8✔
291
            $child->source('callback');
8✔
292
        }
293
        if (method_exists($child, 'markAsCallbackChild')) {
8✔
294
            $child->markAsCallbackChild((bool) ($descriptor->allowChain ?? false));
8✔
295
        }
296

297
        $allowChain = (bool) ($descriptor->allowChain ?? false);
8✔
298

299
        if ($child->getQueue() !== null) {
8✔
300
            // Enqueue: we cannot process child's callback chain now (will happen when worker executes it)
301
            try {
302
                $child->push();
1✔
UNCOV
303
            } catch (Throwable) { // silent
×
304
            }
305
        } else {
306
            // Inline execution (we can cascade if allowed)
307
            try {
308
                $childResult = $this->executor->execute($child);
7✔
309
                if ($allowChain && method_exists($child, 'hasCallbackJob') && $child->hasCallbackJob()) {
7✔
310
                    // recursive dispatch for child
311
                    $this->dispatchCallbackJob($child, $childResult);
7✔
312
                }
313
            } catch (Throwable) { // ignore
×
314
            }
315
        }
316
    }
317
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc