• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 19498141349

19 Nov 2025 10:30AM UTC coverage: 62.5% (+0.7%) from 61.815%
19498141349

push

github

daycry
- Fixes

4 of 4 new or added lines in 4 files covered. (100.0%)

41 existing lines in 9 files now uncovered.

1145 of 1832 relevant lines covered (62.5%)

4.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.57
/src/Execution/JobLifecycleCoordinator.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Execution;
15

16
use Closure;
17
use Daycry\Jobs\Job;
18
use RuntimeException;
19
use Throwable;
20

21
/**
22
 * Orchestrates full job lifecycle including retries, completion strategies, and notifications.
23
 * Uses configured RetryPolicy and CompletionStrategy factories to drive control flow.
24
 * Ensures single-instance jobs acquire and release a runtime flag to avoid concurrent runs.
25
 */
26
class JobLifecycleCoordinator
27
{
28
    public function __construct(
29
        private RetryPolicyFactory $retryFactory = new RetryPolicyFactory(),
30
        private CompletionStrategyFactory $completionFactory = new CompletionStrategyFactory(),
31
        private ?JobExecutor $executor = null,
32
        /**
33
         * @var callable(int):void
34
         */
35
        private $sleeper = null,
36
    ) {
37
        $this->executor ??= new JobExecutor();
18✔
38
        $this->sleeper ??= static function (int $seconds): void { if ($seconds > 0) { sleep($seconds); } };
18✔
39
    }
40

41
    public function run(Job $job, ExecutionContext $ctx): LifecycleOutcome
42
    {
43
        $attempt      = 0;
18✔
44
        $attemptsMeta = [];
18✔
45
        $policy       = $this->retryFactory->for($ctx);
18✔
46
        $completion   = $this->completionFactory->for($ctx);
18✔
47

48
        $maxRetries   = max(0, $ctx->maxRetries);
18✔
49
        $finalFailure = false;
18✔
50
        $requeued     = false;
18✔
51
        $finalResult  = null;
18✔
52

53
        // Lock single instance (si aplica)
54
        if ($ctx->singleInstance && $job->isSingleInstance()) {
18✔
55
            if ($job->isRunning()) {
×
56
                throw new RuntimeException('Job already running: ' . $job->getName());
×
57
            }
58
            $job->saveRunningFlag();
×
59
        }
60

61
        try {
62
            while (true) {
18✔
63
                $attempt++;
18✔
64
                $exec = $this->safeExecute($job);
18✔
65

66
                $attemptsMeta[] = [
18✔
67
                    'attempt'  => $attempt,
18✔
68
                    'success'  => $exec->success,
18✔
69
                    'error'    => $exec->error,
18✔
70
                    'duration' => $exec->durationSeconds(),
18✔
71
                ];
18✔
72

73
                // Notificaciones directas con ExecutionResult
74
                if ($ctx->notifyOnSuccess && $exec->success && $job->shouldNotifyOnSuccess()) {
18✔
75
                    $job->notify($exec);
×
76
                } elseif ($ctx->notifyOnFailure && ! $exec->success && $job->shouldNotifyOnFailure()) {
18✔
77
                    $job->notify($exec);
×
78
                }
79

80
                if ($exec->success) {
18✔
81
                    $completion->onSuccess($job, $exec, $ctx);
11✔
82
                    $finalResult = $exec;
11✔
83
                    break;
11✔
84
                }
85

86
                // Fallo
87
                if ($attempt > $maxRetries) {
9✔
88
                    $completion->onFailure($job, $exec, $ctx, $attempt);
7✔
89
                    $finalResult  = $exec;
7✔
90
                    $finalFailure = true;
7✔
91
                    break;
7✔
92
                }
93

94
                $delay = $policy->computeDelay($attempt + 1); // Próximo intento
6✔
95
                if ($delay > 0) {
6✔
96
                    ($this->sleeper)($delay);
5✔
97
                }
98
            }
99
        } finally {
100
            if ($ctx->singleInstance && $job->isSingleInstance()) {
18✔
101
                $job->clearRunningFlag();
18✔
102
            }
103
        }
104

105
        if (! $finalResult) {
18✔
106
            // fallback improbable
107
            $finalResult = new ExecutionResult(false, null, 'Unknown execution state', microtime(true), microtime(true));
×
108
        }
109

110
        // Dispatch callback job if defined
111
        if (method_exists($job, 'hasCallbackJob') && $job->hasCallbackJob()) {
18✔
112
            $this->dispatchCallbackJob($job, $finalResult);
9✔
113
        }
114

115
        return new LifecycleOutcome(
18✔
116
            finalResult: $finalResult,
18✔
117
            attempts: $attempt,
18✔
118
            finalFailure: $finalFailure,
18✔
119
            requeued: $requeued,
18✔
120
            attemptsMeta: $attemptsMeta,
18✔
121
        );
18✔
122
    }
123

124
    private function safeExecute(Job $job): ExecutionResult
125
    {
126
        try {
127
            $start = microtime(true);
18✔
128

129
            return $this->executor->execute($job); // Ya retorna ExecutionResult directo
18✔
130
        } catch (Throwable $e) {
×
131
            $t = microtime(true);
×
132

133
            return new ExecutionResult(false, null, $e->getMessage(), $t, $t, null);
×
134
        }
135
    }
136

137
    /**
138
     * Builds and executes or enqueues the callback job based on descriptor.
139
     */
140
    private function dispatchCallbackJob(Job $parent, ExecutionResult $result): void
141
    {
142
        $descriptor = $parent->getCallbackDescriptor();
9✔
143
        if (! $descriptor) {
9✔
144
            return;
×
145
        }
146
        // Filter already normalized to: always|success|failure
147
        $filter = $descriptor->filter ?? 'always';
9✔
148
        if ($filter === 'success' && ! $result->success) {
9✔
149
            return;
×
150
        }
151
        if ($filter === 'failure' && $result->success) {
9✔
152
            return;
1✔
153
        }
154

155
        // Build child job via user builder
156
        try {
157
            $builder = $descriptor->builder;
8✔
158
            $child   = $builder($parent);
8✔
159
        } catch (Throwable $e) {
×
160
            return; // Fail silently to not break parent flow
×
161
        }
162
        if (! $child instanceof Job) {
8✔
163
            return; // Invalid builder return
×
164
        }
165

166
        // Inherit meta into payload if requested
167
        $inherit = $descriptor->inherit ?? [];
8✔
168
        $meta    = [
8✔
169
            'parentStatus' => $result->success,
8✔
170
        ];
8✔
171
        if (in_array('output', $inherit, true)) {
8✔
172
            $meta['parentOutput'] = $result->output;
7✔
173
        }
174
        if (in_array('error', $inherit, true)) {
8✔
175
            $meta['parentError'] = $result->error;
7✔
176
        }
177
        if (in_array('attempts', $inherit, true)) {
8✔
178
            $meta['parentAttempts'] = $parent->getAttempt();
×
179
        }
180
        if (in_array('name', $inherit, true)) {
8✔
181
            $meta['parentName'] = $parent->getName();
1✔
182
        }
183
        if (in_array('source', $inherit, true)) {
8✔
184
            $meta['parentSource'] = $parent->getSource();
×
185
        }
186

187
        // Attempt to merge meta into child payload (if array/object)
188
        $payload = $child->getPayload();
8✔
189
        if ($payload instanceof Closure) {
8✔
190
            // Cannot inject meta into a closure payload; skip meta merge.
191
        } elseif (is_array($payload)) {
×
192
            $payload['meta'] = isset($payload['meta']) && is_array($payload['meta'])
×
193
                ? $payload['meta'] + $meta
×
194
                : $meta;
×
195
        } elseif (is_object($payload) && ! ($payload instanceof Closure)) {
×
196
            foreach ($meta as $k => $v) {
×
197
                try {
198
                    $payload->{$k} = $v;
×
199
                } catch (Throwable) { // ignore
×
200
                }
201
            }
202
        } else {
203
            // Wrap scalar/callable/closure into array structure
204
            $payload = [
×
205
                'data' => $payload,
×
206
                'meta' => $meta,
×
207
            ];
×
208
        }
209

210
        // Replace modified payload directly (child preserves queue & configuration)
211
        try {
212
            $child->setPayload($payload);
8✔
213
        } catch (Throwable) {
×
214
            // ignore
215
        }
216

217
        // Mark origin
218
        if (method_exists($child, 'source')) {
8✔
219
            $child->source('callback');
8✔
220
        }
221
        if (method_exists($child, 'markAsCallbackChild')) {
8✔
222
            $child->markAsCallbackChild((bool) ($descriptor->allowChain ?? false));
8✔
223
        }
224

225
        $allowChain = (bool) ($descriptor->allowChain ?? false);
8✔
226

227
        if ($child->getQueue() !== null) {
8✔
228
            // Enqueue: we cannot process child's callback chain now (will happen when worker executes it)
229
            try {
230
                $child->push();
1✔
UNCOV
231
            } catch (Throwable) { // silent
×
232
            }
233
        } else {
234
            // Inline execution (we can cascade if allowed)
235
            try {
236
                $childResult = $this->executor->execute($child);
7✔
237
                if ($allowChain && method_exists($child, 'hasCallbackJob') && $child->hasCallbackJob()) {
7✔
238
                    // recursive dispatch for child
239
                    $this->dispatchCallbackJob($child, $childResult);
7✔
240
                }
241
            } catch (Throwable) { // ignore
×
242
            }
243
        }
244
    }
245
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc