• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 21145580946

19 Jan 2026 04:55PM UTC coverage: 58.025% (-4.5%) from 62.567%
21145580946

push

github

daycry
Add security, performance, and health features; simplify architecture

Introduces shell command whitelisting, smart token detection, per-queue rate limiting, dead letter queue, job timeout protection, config caching, and a health monitoring command. Refactors architecture by consolidating traits, removing the CompletionStrategy pattern, and unifying retry policies under RetryPolicyFixed. Updates documentation and tests to reflect new features and architectural changes, ensuring backward compatibility and improved reliability.

143 of 340 new or added lines in 20 files covered. (42.06%)

1 existing line in 1 file now uncovered.

1193 of 2056 relevant lines covered (58.03%)

4.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.92
/src/Execution/JobLifecycleCoordinator.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Execution;
15

16
use Closure;
17
use Daycry\Jobs\Exceptions\JobException;
18
use Daycry\Jobs\Job;
19
use RuntimeException;
20
use Throwable;
21

22
/**
23
 * Orchestrates full job lifecycle including retries, notifications, and timeout protection.
24
 * Uses configured RetryPolicy to drive control flow.
25
 * Ensures single-instance jobs acquire and release a runtime flag to avoid concurrent runs.
26
 */
27
class JobLifecycleCoordinator
28
{
29
    public function __construct(
30
        private ?JobExecutor $executor = null,
31
        /**
32
         * @var callable(int):void
33
         */
34
        private $sleeper = null,
35
    ) {
36
        $this->executor ??= new JobExecutor();
18✔
37
        $this->sleeper ??= static function (int $seconds): void { if ($seconds > 0) { sleep($seconds); } };
18✔
38
    }
39

40
    public function run(Job $job, string $source = 'cron'): LifecycleOutcome
41
    {
42
        $attemptsMeta = [];
18✔
43

44
        // Read retry config inline - use direct config() in coordinator to respect test changes
45
        $cfg    = config('Jobs');
18✔
46
        $policy = new RetryPolicyFixed(
18✔
47
            base: $cfg->retryBackoffBase,
18✔
48
            strategy: $cfg->retryBackoffStrategy,
18✔
49
            multiplier: $cfg->retryBackoffMultiplier,
18✔
50
            max: $cfg->retryBackoffMax,
18✔
51
            jitter: $cfg->retryBackoffJitter,
18✔
52
        );
18✔
53

54
        $maxRetries   = max(0, $job->getMaxRetries() ?? 0);
18✔
55
        $finalFailure = false;
18✔
56
        $requeued     = false;
18✔
57
        $finalResult  = null;
18✔
58

59
        // Use persistent attempt counter from job instead of local counter
60
        $persistentAttempt = $job->getAttempt();
18✔
61

62
        // Lock single instance (si aplica)
63
        if ($job->isSingleInstance()) {
18✔
64
            if ($job->isRunning()) {
×
65
                throw new RuntimeException('Job already running: ' . $job->getName());
×
66
            }
67
            $job->saveRunningFlag();
×
68
        }
69

70
        try {
71
            while (true) {
18✔
72
                $persistentAttempt++;
18✔
73

74
                // Use timeout protection only if jobTimeout > 0
75
                $timeout = $cfg->jobTimeout ?? 0;
18✔
76
                $exec    = ($timeout > 0)
18✔
77
                    ? $this->safeExecuteWithTimeout($job, $timeout)
18✔
NEW
78
                    : $this->safeExecute($job);
×
79

80
                $attemptsMeta[] = [
18✔
81
                    'attempt'  => $persistentAttempt,
18✔
82
                    'success'  => $exec->success,
18✔
83
                    'error'    => $exec->error,
18✔
84
                    'duration' => $exec->durationSeconds(),
18✔
85
                ];
18✔
86

87
                // Notificaciones directas con ExecutionResult
88
                if ($exec->success && $job->shouldNotifyOnSuccess()) {
18✔
89
                    $job->notify($exec);
×
90
                } elseif (! $exec->success && $job->shouldNotifyOnFailure()) {
18✔
91
                    $job->notify($exec);
×
92
                }
93

94
                if ($exec->success) {
18✔
95
                    // Success: no extra action needed (completion handled by RequeueHelper)
96
                    $finalResult = $exec;
11✔
97
                    break;
11✔
98
                }
99

100
                // Fallo - check against persistent counter
101
                if ($persistentAttempt > $maxRetries) {
9✔
102
                    // Final failure: no extra action needed (completion handled by RequeueHelper)
103
                    $finalResult  = $exec;
7✔
104
                    $finalFailure = true;
7✔
105
                    break;
7✔
106
                }
107

108
                $delay = $policy->computeDelay($persistentAttempt + 1); // Próximo intento
6✔
109
                if ($delay > 0) {
6✔
110
                    ($this->sleeper)($delay);
5✔
111
                }
112
            }
113
        } finally {
114
            if ($job->isSingleInstance()) {
18✔
115
                $job->clearRunningFlag();
18✔
116
            }
117
        }
118

119
        if (! $finalResult) {
18✔
120
            // fallback improbable
121
            $finalResult = new ExecutionResult(false, null, 'Unknown execution state', microtime(true), microtime(true));
×
122
        }
123

124
        // Dispatch callback job if defined
125
        if (method_exists($job, 'hasCallbackJob') && $job->hasCallbackJob()) {
18✔
126
            $this->dispatchCallbackJob($job, $finalResult);
9✔
127
        }
128

129
        return new LifecycleOutcome(
18✔
130
            finalResult: $finalResult,
18✔
131
            attempts: $persistentAttempt,
18✔
132
            finalFailure: $finalFailure,
18✔
133
            requeued: $requeued,
18✔
134
            attemptsMeta: $attemptsMeta,
18✔
135
        );
18✔
136
    }
137

138
    private function safeExecute(Job $job): ExecutionResult
139
    {
140
        try {
141
            $start = microtime(true);
18✔
142

143
            return $this->executor->execute($job); // Ya retorna ExecutionResult directo
18✔
144
        } catch (Throwable $e) {
×
145
            $t = microtime(true);
×
146

147
            return new ExecutionResult(false, null, $e->getMessage(), $t, $t, null);
×
148
        }
149
    }
150

151
    /**
152
     * Execute job with timeout protection.
153
     */
154
    private function safeExecuteWithTimeout(Job $job, int $timeout): ExecutionResult
155
    {
156
        if ($timeout <= 0) {
18✔
NEW
157
            return $this->safeExecute($job); // No timeout
×
158
        }
159

160
        $startTime = time();
18✔
161
        $result    = null;
18✔
162
        $timedOut  = false;
18✔
163

164
        // Fork execution check using pcntl if available
165
        if (function_exists('pcntl_alarm')) {
18✔
166
            // Register alarm handler
167
            pcntl_signal(SIGALRM, static function () use (&$timedOut): void {
18✔
NEW
168
                $timedOut = true;
×
169
            });
18✔
170
            pcntl_alarm($timeout);
18✔
171

172
            try {
173
                $result = $this->safeExecute($job);
18✔
174
                pcntl_alarm(0); // Cancel alarm
18✔
NEW
175
            } catch (Throwable $e) {
×
NEW
176
                pcntl_alarm(0);
×
177

NEW
178
                throw $e;
×
179
            }
180

181
            if ($timedOut) {
18✔
NEW
182
                throw JobException::forJobTimeout($job->getName(), $timeout);
×
183
            }
184
        } else {
185
            // Fallback: simple time check (less accurate, no kill)
NEW
186
            $result = $this->safeExecute($job);
×
187

NEW
188
            if (time() - $startTime > $timeout) {
×
NEW
189
                log_message('warning', "Job {$job->getName()} exceeded timeout of {$timeout}s (no pcntl available for hard kill)");
×
190
            }
191
        }
192

193
        return $result;
18✔
194
    }
195

196
    /**
197
     * Builds and executes or enqueues the callback job based on descriptor.
198
     */
199
    private function dispatchCallbackJob(Job $parent, ExecutionResult $result): void
200
    {
201
        $descriptor = $parent->getCallbackDescriptor();
9✔
202
        if (! $descriptor) {
9✔
203
            return;
×
204
        }
205
        // Filter already normalized to: always|success|failure
206
        $filter = $descriptor->filter ?? 'always';
9✔
207
        if ($filter === 'success' && ! $result->success) {
9✔
208
            return;
×
209
        }
210
        if ($filter === 'failure' && $result->success) {
9✔
211
            return;
1✔
212
        }
213

214
        // Build child job via user builder
215
        try {
216
            $builder = $descriptor->builder;
8✔
217
            $child   = $builder($parent);
8✔
218
        } catch (Throwable $e) {
×
219
            return; // Fail silently to not break parent flow
×
220
        }
221
        if (! $child instanceof Job) {
8✔
222
            return; // Invalid builder return
×
223
        }
224

225
        // Inherit meta into payload if requested
226
        $inherit = $descriptor->inherit ?? [];
8✔
227
        $meta    = [
8✔
228
            'parentStatus' => $result->success,
8✔
229
        ];
8✔
230
        if (in_array('output', $inherit, true)) {
8✔
231
            $meta['parentOutput'] = $result->output;
7✔
232
        }
233
        if (in_array('error', $inherit, true)) {
8✔
234
            $meta['parentError'] = $result->error;
7✔
235
        }
236
        if (in_array('attempts', $inherit, true)) {
8✔
237
            $meta['parentAttempts'] = $parent->getAttempt();
×
238
        }
239
        if (in_array('name', $inherit, true)) {
8✔
240
            $meta['parentName'] = $parent->getName();
1✔
241
        }
242
        if (in_array('source', $inherit, true)) {
8✔
243
            $meta['parentSource'] = $parent->getSource();
×
244
        }
245

246
        // Attempt to merge meta into child payload (if array/object)
247
        $payload = $child->getPayload();
8✔
248
        if ($payload instanceof Closure) {
8✔
249
            // Cannot inject meta into a closure payload; skip meta merge.
250
        } elseif (is_array($payload)) {
×
251
            $payload['meta'] = isset($payload['meta']) && is_array($payload['meta'])
×
252
                ? $payload['meta'] + $meta
×
253
                : $meta;
×
254
        } elseif (is_object($payload) && ! ($payload instanceof Closure)) {
×
255
            foreach ($meta as $k => $v) {
×
256
                try {
257
                    $payload->{$k} = $v;
×
258
                } catch (Throwable) { // ignore
×
259
                }
260
            }
261
        } else {
262
            // Wrap scalar/callable/closure into array structure
263
            $payload = [
×
264
                'data' => $payload,
×
265
                'meta' => $meta,
×
266
            ];
×
267
        }
268

269
        // Replace modified payload directly (child preserves queue & configuration)
270
        try {
271
            $child->setPayload($payload);
8✔
272
        } catch (Throwable) {
×
273
            // ignore
274
        }
275

276
        // Mark origin
277
        if (method_exists($child, 'source')) {
8✔
278
            $child->source('callback');
8✔
279
        }
280
        if (method_exists($child, 'markAsCallbackChild')) {
8✔
281
            $child->markAsCallbackChild((bool) ($descriptor->allowChain ?? false));
8✔
282
        }
283

284
        $allowChain = (bool) ($descriptor->allowChain ?? false);
8✔
285

286
        if ($child->getQueue() !== null) {
8✔
287
            // Enqueue: we cannot process child's callback chain now (will happen when worker executes it)
288
            try {
289
                $child->push();
1✔
290
            } catch (Throwable) { // silent
×
291
            }
292
        } else {
293
            // Inline execution (we can cascade if allowed)
294
            try {
295
                $childResult = $this->executor->execute($child);
7✔
296
                if ($allowChain && method_exists($child, 'hasCallbackJob') && $child->hasCallbackJob()) {
7✔
297
                    // recursive dispatch for child
298
                    $this->dispatchCallbackJob($child, $childResult);
7✔
299
                }
300
            } catch (Throwable) { // ignore
×
301
            }
302
        }
303
    }
304
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc