• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 25524504346

07 May 2026 06:23PM UTC coverage: 75.009% (-0.04%) from 75.053%
25524504346

push

github

daycry
chore: apply Rector fixes so analyze workflow passes

The Static Analysis workflow runs rector process --dry-run, which exits
non-zero on PHP 8.2/8.3 when changes are pending. Applied the 11
recommended refactors (NullToStrictStringFuncCallArgRector,
RemoveReflectionSetAccessibleCallsRector,
AssertEmptyNullableObjectToAssertInstanceofRector,
ClassPropertyAssignToConstructorPromotionRector,
ReadOnlyClassRector, RecastingRemovalRector, RepeatedOrEqualToInArrayRector,
FlipTypeControlToUseExclusiveTypeRector, AddInstanceofAssertForNullableArgumentRector)
across V2 value objects, the new V1_1 tests, and the multi-phase v1.2 files
so dry-run reports "Rector is done!" again.

531 tests pass; PHPStan and Psalm remain clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

3 of 4 new or added lines in 4 files covered. (75.0%)

2 existing lines in 1 file now uncovered.

2137 of 2849 relevant lines covered (75.01%)

12.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.57
/src/Commands/QueueRunCommand.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Commands;
15

16
use Daycry\Jobs\Metrics\MetricsCollectorInterface;
17
use CodeIgniter\CLI\CLI;
18
use Config\Database;
19
use Config\Services;
20
use DateTimeInterface;
21
use Daycry\Jobs\Exceptions\JobException;
22
use Daycry\Jobs\Execution\JobLifecycleCoordinator;
23
use Daycry\Jobs\Job;
24
use Daycry\Jobs\Libraries\CircuitBreaker;
25
use Daycry\Jobs\Libraries\QueueManager;
26
use Daycry\Jobs\Libraries\RateLimiter;
27
use Daycry\Jobs\Metrics\Metrics;
28
use Daycry\Jobs\Queues\JobEnvelope;
29
use Daycry\Jobs\Queues\RequeueHelper;
30
use RuntimeException;
31
use Throwable;
32

33
/**
34
 * Long-running (or one-shot) queue worker command.
35
 * Pulls messages from the configured queue backend, executes them via lifecycle coordinator,
36
 * and applies basic retry (requeue) fallback when failures occur.
37
 */
38
class QueueRunCommand extends BaseJobsCommand
39
{
40
    protected $name                       = 'jobs:queue:run';
41
    protected $description                = 'Start queue worker.';
42
    protected $usage                      = 'queue:run <queue> [Options]';
43
    protected $arguments                  = ['queue' => 'The queue name.'];
44
    protected $options                    = ['--oneTime' => 'Only executes one time.', '--background' => 'Run the worker in background.'];
45
    protected bool $locked                = false;
46
    private ?RequeueHelper $requeueHelper = null;
47
    private bool $shouldStop              = false;
48
    private int $iterationCount           = 0;
49

50
    protected function earlyChecks(Job $job): void
51
    {
52
    }
4✔
53

54
    protected function lateChecks(Job $job): void
55
    {
56
    }
4✔
57

58
    protected function earlyCallbackChecks(Job $job): void
59
    {
60
    }
1✔
61

62
    protected function lateCallbackChecks(Job $job): void
63
    {
64
    }
1✔
65

66
    protected function conditionalChecks(): bool
67
    {
68
        return true;
5✔
69
    }
70

71
    public function run(array $params): void
72
    {
73
        $queue      = $params['queue'] ?? $params[0] ?? CLI::getOption('queue');
7✔
74
        $oneTime    = array_key_exists('oneTime', $params) ? true : CLI::getOption('oneTime');
7✔
75
        $background = array_key_exists('background', $params) ? true : CLI::getOption('background');
7✔
76

77
        // Spawn background child and exit parent if requested (avoid respawn with --noBackground)
78
        if ($background) {
7✔
79
            $phpBin    = $this->getPhpBinary();
1✔
80
            $sparkPath = ROOTPATH . 'spark';
1✔
81

82
            $args = escapeshellarg('--queue') . ' ' . escapeshellarg($queue);
1✔
83
            if ($oneTime) {
1✔
84
                $args .= ' ' . escapeshellarg('--oneTime');
1✔
85
            }
86

87
            if (str_starts_with(strtolower(PHP_OS), strtolower('WIN'))) {
1✔
88
                // Windows: use start /B and redirect to NUL
89
                $cmd    = sprintf('%s %s %s %s', escapeshellarg($phpBin), escapeshellarg($sparkPath), escapeshellarg($this->name), $args);
×
90
                $winCmd = 'start "" /B ' . $cmd . ' > NUL 2>&1';
×
91
                pclose(popen($winCmd, 'r'));
×
92
            } else {
93
                // POSIX: use nohup and redirect to /dev/null, detach with &
94
                $cmd      = sprintf('%s %s %s %s', escapeshellarg($phpBin), escapeshellarg($sparkPath), escapeshellarg($this->name), $args);
1✔
95
                $posixCmd = 'nohup ' . $cmd . ' > /dev/null 2>&1 &';
1✔
96
                exec($posixCmd);
1✔
97
            }
98

99
            return;
1✔
100
        }
101

102
        if (empty($queue)) {
6✔
103
            $queue = CLI::prompt(lang('Queue.insertQueue'), config('Jobs')->queues, 'required');
×
104
        }
105

106
        // Register signal handlers for graceful shutdown (POSIX only)
107
        if (function_exists('pcntl_signal')) {
6✔
108
            pcntl_signal(SIGTERM, function (): void {
6✔
109
                $this->shouldStop = true;
×
110
                CLI::write('[Worker] SIGTERM received, finishing current job...', 'yellow');
×
111
            });
6✔
112
            pcntl_signal(SIGINT, function (): void {
6✔
113
                $this->shouldStop = true;
×
114
                CLI::write('[Worker] SIGINT received, finishing current job...', 'yellow');
×
115
            });
6✔
116
        }
117

118
        while (true) {
6✔
119
            if (function_exists('pcntl_signal_dispatch')) {
6✔
120
                pcntl_signal_dispatch();
6✔
121
            }
122

123
            if ($this->shouldStop) {
6✔
124
                CLI::write('[Worker] Graceful shutdown complete.', 'yellow');
2✔
125
                break;
2✔
126
            }
127

128
            if ($this->conditionalChecks()) {
5✔
129
                $this->processQueue($queue);
5✔
130
                $this->maintenanceTick();
5✔
131

132
                if ($oneTime) {
5✔
133
                    return;
4✔
134
                }
135

136
                $this->idleSleep();
1✔
137
            }
138
        }
139
    }
140

141
    /**
142
     * Periodic worker-loop maintenance:
143
     *  - Reset in-memory metrics every 1 000 iterations to bound memory growth (F19).
144
     *  - Ping the database connection every 100 iterations and reconnect if needed (F23).
145
     */
146
    private function maintenanceTick(): void
147
    {
148
        $this->iterationCount++;
5✔
149

150
        if ($this->iterationCount % 1000 === 0) {
5✔
151
            $metrics = Metrics::get();
×
NEW
152
            if ($metrics instanceof MetricsCollectorInterface && method_exists($metrics, 'reset')) {
×
153
                $metrics->reset();
×
154
            }
155
            if (function_exists('gc_collect_cycles')) {
×
156
                gc_collect_cycles();
×
157
            }
158
        }
159

160
        if ($this->iterationCount % 100 === 0) {
5✔
161
            $this->ensureDatabaseConnection();
×
162
        }
163
    }
164

165
    /**
166
     * Cheap SELECT 1 against the configured DB connection. On failure the connection is
167
     * reset so the next query reconnects. Important for long-lived workers behind MySQL
168
     * wait_timeout / connection idle eviction.
169
     */
170
    private function ensureDatabaseConnection(): void
171
    {
172
        try {
173
            $cfg   = config('Jobs');
×
174
            $group = $cfg->database['group'] ?? null;
×
175
            $db    = Database::connect($group);
×
176
            // simpleQuery returns a resource on success and false on failure across drivers.
177
            $result = $db->simpleQuery('SELECT 1');
×
178
            if ($result === false) {
×
179
                throw new RuntimeException('SELECT 1 failed (driver returned false).');
×
180
            }
181
        } catch (Throwable $e) {
×
182
            log_message('warning', 'QueueRunCommand: DB ping failed, reconnecting — ' . $e->getMessage());
×
183

184
            try {
185
                $cfg   = config('Jobs');
×
186
                $group = $cfg->database['group'] ?? null;
×
187
                Database::connect($group, false)->reconnect();
×
188
            } catch (Throwable $reconnect) {
×
189
                log_message('error', 'QueueRunCommand: DB reconnect failed — ' . $reconnect->getMessage());
×
190
            }
191
        }
192
    }
193

194
    /**
195
     * Sleep between polling cycles, unless the active backend uses blocking
196
     * fetch semantics (Redis BRPOPLPUSH, Beanstalk reserve_with_timeout) in
197
     * which case watch() already absorbed the wait.
198
     */
199
    private function idleSleep(): void
200
    {
201
        $cfg = config('Jobs');
1✔
202

203
        if (($cfg->blockingFetch ?? false) === true && in_array($cfg->worker, ['redis', 'beanstalk'], true)) {
1✔
204
            return;
×
205
        }
206

207
        sleep($cfg->pollInterval);
1✔
208
    }
209

210
    protected function processQueue(string $queue): void
211
    {
212
        $response = [];
12✔
213
        $metrics  = Metrics::get();
12✔
214
        $this->requeueHelper ??= new RequeueHelper($metrics);
12✔
215

216
        // Rate limiting check
217
        $config    = config('Jobs');
12✔
218
        $rateLimit = $config->queueRateLimits[$queue] ?? 0;
12✔
219

220
        if ($rateLimit > 0) {
12✔
221
            $rateLimiter = new RateLimiter();
1✔
222
            if (! $rateLimiter->allow($queue, $rateLimit)) {
1✔
223
                // Rate limit exceeded, skip processing this cycle
224
                CLI::write("[Rate Limited] Queue '{$queue}' has reached limit of {$rateLimit} jobs/minute", 'yellow');
1✔
225

226
                return;
1✔
227
            }
228
        }
229

230
        // Circuit breaker check
231
        $breaker = new CircuitBreaker(
11✔
232
            'queue_' . $queue,
11✔
233
            $config->circuitBreakerThreshold,
11✔
234
            $config->circuitBreakerCooldown,
11✔
235
        );
11✔
236

237
        if (! $breaker->isAvailable()) {
11✔
238
            CLI::write("[Circuit Open] Backend for '{$queue}' is temporarily unavailable, skipping.", 'red');
1✔
239

240
            return;
1✔
241
        }
242

243
        Services::resetSingle('request');
10✔
244
        Services::resetSingle('response');
10✔
245

246
        try {
247
            $worker      = $this->getWorker();
10✔
248
            $queueEntity = $worker->watch($queue);
10✔
249
            $breaker->recordSuccess();
9✔
250

251
            if ($queueEntity === null) {
9✔
252
                // No available job for this queue at this time.
253
                return;
3✔
254
            }
255

256
            if ($queueEntity !== null) {
6✔
257
                $metrics->increment('jobs_fetched', 1, ['queue' => $queue]);
6✔
258
                $this->locked = true;
6✔
259
                if (! ($queueEntity instanceof JobEnvelope)) {
6✔
260
                    throw JobException::validationError('Legacy queue entity unsupported (expecting JobEnvelope).');
1✔
261
                }
262
                $decoded = $queueEntity->payload;
5✔
263
                if (! is_object($decoded)) {
5✔
264
                    throw JobException::validationError('Invalid envelope payload format.');
1✔
265
                }
266
                $job = Job::fromQueueRecord($decoded);
4✔
267
                // Inject backend ID into job instance for context availability
268
                $job->setJobId($queueEntity->id);
4✔
269

270
                $this->earlyChecks($job);
4✔
271

272
                $this->lateChecks($job); // todavía antes de ejecutar? mantener orden original
4✔
273

274
                $coordinator = new JobLifecycleCoordinator();
4✔
275
                $startExec   = microtime(true);
4✔
276
                $outcome     = $coordinator->run($job, 'queue');
4✔
277
                $latency     = microtime(true) - $startExec;
4✔
278
                $exec        = $outcome->finalResult;
4✔
279
                $response    = [
4✔
280
                    'status'     => $exec->success,
4✔
281
                    'statusCode' => $exec->success ? 200 : 500,
4✔
282
                    'data'       => $exec->output,
4✔
283
                    'error'      => $exec->success ? null : $exec->error,
4✔
284
                ];
4✔
285

286
                // Execution completed; outcome handled below.
287

288
                // Finalización: usar completion strategy ya ejecutada dentro del coordinator.
289
                // Remoción/requeue ya la maneja la estrategia QueueCompletionStrategy (si lo configuramos). Si aún no, aplicamos fallback:
290
                $this->requeueHelper->finalize($job, $queueEntity, static fn ($j, $r) => $worker->removeJob($j, $r), $exec->success);
4✔
291
                if ($queueEntity->createdAt instanceof DateTimeInterface) {
4✔
292
                    $age = microtime(true) - $queueEntity->createdAt->getTimestamp();
2✔
293
                    $metrics->observe('jobs_age_seconds', $age, ['queue' => $queue]);
2✔
294
                }
295
                $metrics->observe('jobs_exec_seconds', $latency, ['queue' => $queue]);
4✔
296
            }
297
        } catch (Throwable $e) {
3✔
298
            $breaker->recordFailure();
3✔
299
            $response = $this->handleException($e, $worker ?? null, $job ?? null);
3✔
300
        }
301

302
        $this->locked = false;
7✔
303
        unset($job, $queueEntity);
7✔
304
    }
305

306
    protected function getWorker()
307
    {
308
        return QueueManager::instance()->getDefault();
×
309
    }
310

311
    protected function handleException($e, $worker, $job): array
312
    {
313
        $response['statusCode'] = $e->getCode();
5✔
314
        $response['error']      = $e->getMessage();
5✔
315
        $response['status']     = false;
5✔
316

317
        if ($worker && $job) {
5✔
318
            $worker->removeJob($job, true);
1✔
319
        }
320

321
        $this->showError($e);
5✔
322

323
        return $response;
5✔
324
    }
325

326
    private function getPhpBinary(): string
327
    {
328
        if (PHP_SAPI === 'cli') {
2✔
329
            return PHP_BINARY;
2✔
330
        }
331

332
        return (string) (env('PHP_BINARY_PATH') ?? 'php');
×
333
    }
334
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc