• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 24850441053

23 Apr 2026 05:54PM UTC coverage: 52.404% (-1.5%) from 53.938%
24850441053

push

github

daycry
Fixes

104 of 219 new or added lines in 42 files covered. (47.49%)

14 existing lines in 9 files now uncovered.

1210 of 2309 relevant lines covered (52.4%)

4.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.48
/src/Commands/QueueRunCommand.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Commands;
15

16
use Throwable;
17
use CodeIgniter\CLI\CLI;
18
use Config\Services;
19
use DateTimeInterface;
20
use Daycry\Jobs\Exceptions\JobException;
21
use Daycry\Jobs\Execution\JobLifecycleCoordinator;
22
use Daycry\Jobs\Job;
23
use Daycry\Jobs\Libraries\CircuitBreaker;
24
use Daycry\Jobs\Libraries\QueueManager;
25
use Daycry\Jobs\Libraries\RateLimiter;
26
use Daycry\Jobs\Metrics\Metrics;
27
use Daycry\Jobs\Queues\JobEnvelope;
28
use Daycry\Jobs\Queues\RequeueHelper;
29

30
/**
31
 * Long-running (or one-shot) queue worker command.
32
 * Pulls messages from the configured queue backend, executes them via lifecycle coordinator,
33
 * and applies basic retry (requeue) fallback when failures occur.
34
 */
35
class QueueRunCommand extends BaseJobsCommand
36
{
37
    protected $name                       = 'jobs:queue:run';
38
    protected $description                = 'Start queue worker.';
39
    protected $usage                      = 'queue:run <queue> [Options]';
40
    protected $arguments                  = ['queue' => 'The queue name.'];
41
    protected $options                    = ['--oneTime' => 'Only executes one time.', '--background' => 'Run the worker in background.'];
42
    protected bool $locked                = false;
43
    private ?RequeueHelper $requeueHelper = null;
44
    private bool $shouldStop              = false;
45

46
    protected function earlyChecks(Job $job): void
47
    {
48
    }
1✔
49

50
    protected function lateChecks(Job $job): void
51
    {
52
    }
1✔
53

54
    protected function earlyCallbackChecks(Job $job): void
55
    {
56
    }
×
57

58
    protected function lateCallbackChecks(Job $job): void
59
    {
60
    }
×
61

62
    protected function conditionalChecks(): bool
63
    {
64
        return true;
×
65
    }
66

67
    public function run(array $params): void
68
    {
69
        $queue      = $params['queue'] ?? $params[0] ?? CLI::getOption('queue');
×
70
        $oneTime    = array_key_exists('oneTime', $params) ? true : CLI::getOption('oneTime');
×
71
        $background = array_key_exists('background', $params) ? true : CLI::getOption('background');
×
72

73
        // Spawn background child and exit parent if requested (avoid respawn with --noBackground)
74
        if ($background) {
×
75
            $phpBin    = $this->getPhpBinary();
×
76
            $sparkPath = ROOTPATH . 'spark';
×
77

NEW
78
            $args = escapeshellarg('--queue') . ' ' . escapeshellarg($queue);
×
NEW
79
            if ($oneTime) {
×
NEW
80
                $args .= ' ' . escapeshellarg('--oneTime');
×
81
            }
82

UNCOV
83
            if (str_starts_with(strtolower(PHP_OS), strtolower('WIN'))) {
×
84
                // Windows: use start /B and redirect to NUL
NEW
85
                $cmd    = sprintf('%s %s %s %s', escapeshellarg($phpBin), escapeshellarg($sparkPath), escapeshellarg($this->name), $args);
×
86
                $winCmd = 'start "" /B ' . $cmd . ' > NUL 2>&1';
×
87
                pclose(popen($winCmd, 'r'));
×
88
            } else {
89
                // POSIX: use nohup and redirect to /dev/null, detach with &
NEW
90
                $cmd      = sprintf('%s %s %s %s', escapeshellarg($phpBin), escapeshellarg($sparkPath), escapeshellarg($this->name), $args);
×
91
                $posixCmd = 'nohup ' . $cmd . ' > /dev/null 2>&1 &';
×
92
                exec($posixCmd);
×
93
            }
94

95
            return;
×
96
        }
97

98
        if (empty($queue)) {
×
99
            $queue = CLI::prompt(lang('Queue.insertQueue'), config('Jobs')->queues, 'required');
×
100
        }
101

102
        // Register signal handlers for graceful shutdown (POSIX only)
103
        if (function_exists('pcntl_signal')) {
×
104
            pcntl_signal(SIGTERM, function (): void {
×
105
                $this->shouldStop = true;
×
106
                CLI::write('[Worker] SIGTERM received, finishing current job...', 'yellow');
×
107
            });
×
108
            pcntl_signal(SIGINT, function (): void {
×
109
                $this->shouldStop = true;
×
110
                CLI::write('[Worker] SIGINT received, finishing current job...', 'yellow');
×
111
            });
×
112
        }
113

114
        while (true) {
×
115
            if (function_exists('pcntl_signal_dispatch')) {
×
116
                pcntl_signal_dispatch();
×
117
            }
118

119
            if ($this->shouldStop) {
×
120
                CLI::write('[Worker] Graceful shutdown complete.', 'yellow');
×
121
                break;
×
122
            }
123

124
            if ($this->conditionalChecks()) {
×
125
                $this->processQueue($queue);
×
126

127
                if ($oneTime) {
×
128
                    return;
×
129
                }
130

131
                sleep(config('Jobs')->pollInterval);
×
132
            }
133
        }
134
    }
135

136
    protected function processQueue(string $queue): void
137
    {
138
        $response = [];
1✔
139
        $metrics  = Metrics::get();
1✔
140
        $this->requeueHelper ??= new RequeueHelper($metrics);
1✔
141

142
        // Rate limiting check
143
        $config    = config('Jobs');
1✔
144
        $rateLimit = $config->queueRateLimits[$queue] ?? 0;
1✔
145

146
        if ($rateLimit > 0) {
1✔
147
            $rateLimiter = new RateLimiter();
×
148
            if (! $rateLimiter->allow($queue, $rateLimit)) {
×
149
                // Rate limit exceeded, skip processing this cycle
150
                CLI::write("[Rate Limited] Queue '{$queue}' has reached limit of {$rateLimit} jobs/minute", 'yellow');
×
151

152
                return;
×
153
            }
154
        }
155

156
        // Circuit breaker check
157
        $breaker = new CircuitBreaker(
1✔
158
            'queue_' . $queue,
1✔
159
            $config->circuitBreakerThreshold,
1✔
160
            $config->circuitBreakerCooldown,
1✔
161
        );
1✔
162

163
        if (! $breaker->isAvailable()) {
1✔
164
            CLI::write("[Circuit Open] Backend for '{$queue}' is temporarily unavailable, skipping.", 'red');
×
165

166
            return;
×
167
        }
168

169
        Services::resetSingle('request');
1✔
170
        Services::resetSingle('response');
1✔
171

172
        try {
173
            $worker      = $this->getWorker();
1✔
174
            $queueEntity = $worker->watch($queue);
1✔
175
            $breaker->recordSuccess();
1✔
176

177
            if ($queueEntity === null) {
1✔
178
                // No available job for this queue at this time.
179
                return;
×
180
            }
181

182
            if ($queueEntity !== null) {
1✔
183
                $metrics->increment('jobs_fetched', 1, ['queue' => $queue]);
1✔
184
                $this->locked = true;
1✔
185
                if (! ($queueEntity instanceof JobEnvelope)) {
1✔
186
                    throw JobException::validationError('Legacy queue entity unsupported (expecting JobEnvelope).');
×
187
                }
188
                $decoded = $queueEntity->payload;
1✔
189
                if (! is_object($decoded)) {
1✔
190
                    throw JobException::validationError('Invalid envelope payload format.');
×
191
                }
192
                $job = Job::fromQueueRecord($decoded);
1✔
193
                // Inject backend ID into job instance for context availability
194
                $job->setJobId($queueEntity->id);
1✔
195

196
                $this->earlyChecks($job);
1✔
197

198
                $this->lateChecks($job); // todavía antes de ejecutar? mantener orden original
1✔
199

200
                $coordinator = new JobLifecycleCoordinator();
1✔
201
                $startExec   = microtime(true);
1✔
202
                $outcome     = $coordinator->run($job, 'queue');
1✔
203
                $latency     = microtime(true) - $startExec;
1✔
204
                $exec        = $outcome->finalResult;
1✔
205
                $response    = [
1✔
206
                    'status'     => $exec->success,
1✔
207
                    'statusCode' => $exec->success ? 200 : 500,
1✔
208
                    'data'       => $exec->output,
1✔
209
                    'error'      => $exec->success ? null : $exec->error,
1✔
210
                ];
1✔
211

212
                // Execution completed; outcome handled below.
213

214
                // Finalización: usar completion strategy ya ejecutada dentro del coordinator.
215
                // Remoción/requeue ya la maneja la estrategia QueueCompletionStrategy (si lo configuramos). Si aún no, aplicamos fallback:
216
                $this->requeueHelper->finalize($job, $queueEntity, static fn ($j, $r) => $worker->removeJob($j, $r), $exec->success);
1✔
217
                if ($queueEntity->createdAt instanceof DateTimeInterface) {
1✔
218
                    $age = microtime(true) - $queueEntity->createdAt->getTimestamp();
1✔
219
                    $metrics->observe('jobs_age_seconds', $age, ['queue' => $queue]);
1✔
220
                }
221
                $metrics->observe('jobs_exec_seconds', $latency, ['queue' => $queue]);
1✔
222
            }
NEW
223
        } catch (Throwable $e) {
×
224
            $breaker->recordFailure();
×
225
            $response = $this->handleException($e, $worker ?? null, $job ?? null);
×
226
        }
227

228
        $this->locked = false;
1✔
229
        unset($job, $queueEntity);
1✔
230
    }
231

232
    protected function getWorker()
233
    {
234
        return QueueManager::instance()->getDefault();
×
235
    }
236

237
    protected function handleException($e, $worker, $job): array
238
    {
239
        $response['statusCode'] = $e->getCode();
×
240
        $response['error']      = $e->getMessage();
×
241
        $response['status']     = false;
×
242

243
        if ($worker && $job) {
×
244
            $worker->removeJob($job, true);
×
245
        }
246

247
        $this->showError($e);
×
248

249
        return $response;
×
250
    }
251

252
    private function getPhpBinary(): string
253
    {
254
        if (PHP_SAPI === 'cli') {
×
255
            return PHP_BINARY;
×
256
        }
257

NEW
258
        return (string) (env('PHP_BINARY_PATH') ?? 'php');
×
259
    }
260
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc