• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 26886467550

03 Jun 2026 01:01PM UTC coverage: 88.948% (+14.0%) from 74.974%
26886467550

push

github

web-flow
v3.0: single clean architecture (remove V1, lease-based queues, secure-by-default)

Complete v3.0 rewrite into a single, clean architecture. The v1 API and the V2\ scaffolding
are removed (no facade, no dual code); the package passes PHPStan level 6 + strict-rules +
codeigniter with NO baseline.

- Definition: Jobs::define()->...->dispatch() fluent builder -> immutable JobDefinition.
- Handlers decoupled from the god-object (JobHandlerInterface / AbstractJobHandler / TypedJobHandler + JobContext).
- One QueueBackend contract (enqueue/fetch(lease)/ack/nack(delay)/abandon/reapExpired) with 5 backends:
  Sync, Database, Redis, Beanstalk, ServiceBus.
- Runtime: one attempt per fetch; real interrupting Timeout; opt-in idempotency; single-instance lock.
- Worker/Cron: jobs:queue:work, jobs:queue:reap, jobs:cronjob:run, jobs:queue:purge.
- Secure-by-default: HMAC-signed envelopes, per-queue handler allowlist, ShellHandler deny-by-default,
  EventHandler allowlist, UrlHandler anti-SSRF.

Resolves audit findings #1,#2,#3,#4,#5,#6,#7,#8,#10,#12,#13,#17,#18,#19,#20,#22.
Tests: 359 (Beanstalk live); line coverage 88.9%; PHPStan/Psalm/Rector/cs green on PHP 8.2-8.5.

BREAKING CHANGE: v1 API removed. See docs/MIGRATION-v1-to-v3.md.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

983 of 1103 new or added lines in 43 files covered. (89.12%)

15 existing lines in 3 files now uncovered.

1497 of 1683 relevant lines covered (88.95%)

7.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.92
/src/Commands/QueueWorkCommand.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Commands;
15

16
use CodeIgniter\CLI\CLI;
17
use Daycry\Jobs\Libraries\CircuitBreaker;
18
use Daycry\Jobs\Libraries\RateLimiter;
19
use Daycry\Jobs\Queues\BackendFactory;
20
use Daycry\Jobs\Worker\QueueWorker;
21
use Throwable;
22

23
/**
24
 * v3 queue worker: pulls messages from the configured backend and runs them through the
25
 * QueueWorker pipeline (fetch -> verify -> run one attempt -> ack/nack(backoff)/abandon).
26
 *
27
 * Graceful shutdown: SIGTERM/SIGINT (POSIX) finish the current cycle and exit. A circuit
28
 * breaker skips a backend that is failing, and per-queue rate limits are honoured. Use
29
 * --once (or --max N) to bound the number of cycles (one-shot / cron-friendly).
30
 */
31
final class QueueWorkCommand extends BaseJobsCommand
32
{
33
    protected $name        = 'jobs:queue:work';
34
    protected $description = 'Start a v3 queue worker.';
35
    protected $usage       = 'jobs:queue:work [queue] [--once] [--max N] [--backend name]';
36
    protected $arguments   = ['queue' => 'The queue name (defaults to the first configured queue).'];
37
    protected $options     = [
38
        '--once'    => 'Process a single cycle and exit.',
39
        '--max'     => 'Process at most N cycles then exit (0 = unlimited).',
40
        '--backend' => 'Override the configured backend name.',
41
    ];
42
    private bool $shouldStop = false;
43

44
    /**
45
     * @param array<int|string, string|null> $params
46
     */
47
    public function run(array $params): int
48
    {
49
        $this->getConfig();
5✔
50

51
        $queueArg = $params['queue'] ?? $params[0] ?? CLI::getOption('queue');
5✔
52
        $queue    = is_string($queueArg) && $queueArg !== '' ? $queueArg : $this->firstQueue();
5✔
53

54
        $backendOption = $params['backend'] ?? CLI::getOption('backend');
5✔
55
        $backend       = BackendFactory::make(
5✔
56
            $this->config,
5✔
57
            is_string($backendOption) && $backendOption !== '' ? $backendOption : null,
5✔
58
        );
5✔
59
        $worker = new QueueWorker($backend);
5✔
60

61
        $maxCycles = $this->resolveMaxCycles($params);
5✔
62

63
        $this->registerSignalHandlers();
5✔
64
        CLI::write("[Worker] processing queue '{$queue}'" . ($maxCycles > 0 ? " (max {$maxCycles})" : ''), 'green');
5✔
65

66
        $cycles = 0;
5✔
67

68
        while (true) {
5✔
69
            if (function_exists('pcntl_signal_dispatch')) {
5✔
70
                pcntl_signal_dispatch();
5✔
71
            }
72
            if ($this->shouldStop) {
5✔
NEW
73
                CLI::write('[Worker] graceful shutdown complete.', 'yellow');
×
NEW
74
                break;
×
75
            }
76

77
            $status = $this->processCycle($worker, $queue);
5✔
78
            $cycles++;
5✔
79

80
            if ($maxCycles > 0 && $cycles >= $maxCycles) {
5✔
81
                break;
5✔
82
            }
83

NEW
84
            if (in_array($status, ['empty', 'rate-limited', 'circuit-open', 'error'], true)) {
×
NEW
85
                sleep(max(1, $this->config->pollInterval));
×
86
            }
87
        }
88

89
        return self::SUCCESS;
5✔
90
    }
91

92
    private function processCycle(QueueWorker $worker, string $queue): string
93
    {
94
        $limit = $this->config->queueRateLimits[$queue] ?? 0;
5✔
95
        if ($limit > 0 && ! (new RateLimiter())->allow($queue, $limit)) {
5✔
96
            CLI::write("[Rate Limited] '{$queue}' reached {$limit} jobs/min.", 'yellow');
1✔
97

98
            return 'rate-limited';
1✔
99
        }
100

101
        $breaker = new CircuitBreaker('queue_' . $queue, $this->config->circuitBreakerThreshold, $this->config->circuitBreakerCooldown);
4✔
102
        if (! $breaker->isAvailable()) {
4✔
103
            CLI::write("[Circuit Open] backend for '{$queue}' temporarily unavailable.", 'red');
1✔
104

105
            return 'circuit-open';
1✔
106
        }
107

108
        try {
109
            $result = $worker->processOnce($queue);
3✔
110
            $breaker->recordSuccess();
2✔
111
        } catch (Throwable $e) {
1✔
112
            $breaker->recordFailure();
1✔
113
            CLI::error("[Worker] backend error on '{$queue}': " . $e->getMessage());
1✔
114

115
            return 'error';
1✔
116
        }
117

118
        if ($result->status !== 'empty') {
2✔
119
            $suffix = $result->error !== null ? ' - ' . $result->error : '';
1✔
120
            CLI::write("[{$result->status}] {$queue} (attempt {$result->attempts}){$suffix}", $result->status === 'acked' ? 'cyan' : 'yellow');
1✔
121
        }
122

123
        return $result->status;
2✔
124
    }
125

126
    /**
127
     * @param array<int|string, string|null> $params
128
     */
129
    private function resolveMaxCycles(array $params): int
130
    {
131
        if (array_key_exists('once', $params) || CLI::getOption('once') !== null) {
5✔
NEW
132
            return 1;
×
133
        }
134

135
        $max = $params['max'] ?? CLI::getOption('max');
5✔
136

137
        return is_numeric($max) ? max(1, (int) $max) : 0;
5✔
138
    }
139

140
    private function firstQueue(): string
141
    {
NEW
142
        $queues = $this->config->queues;
×
NEW
143
        if (is_array($queues)) {
×
NEW
144
            $first = $queues[0] ?? 'default';
×
145

NEW
146
            return $first !== '' ? $first : 'default';
×
147
        }
148

NEW
149
        $parts = explode(',', $queues);
×
NEW
150
        $first = trim($parts[0]);
×
151

NEW
152
        return $first !== '' ? $first : 'default';
×
153
    }
154

155
    private function registerSignalHandlers(): void
156
    {
157
        if (! function_exists('pcntl_signal')) {
5✔
NEW
158
            return;
×
159
        }
160
        if (function_exists('pcntl_async_signals')) {
5✔
161
            pcntl_async_signals(true);
5✔
162
        }
163

164
        $handler = function (): void {
5✔
NEW
165
            $this->shouldStop = true;
×
NEW
166
            CLI::write('[Worker] stop signal received, finishing current cycle...', 'yellow');
×
167
        };
5✔
168
        pcntl_signal(SIGTERM, $handler);
5✔
169
        pcntl_signal(SIGINT, $handler);
5✔
170
    }
171
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc