• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 21145580946

19 Jan 2026 04:55PM UTC coverage: 58.025% (-4.5%) from 62.567%
21145580946

push

github

daycry
Add security, performance, and health features; simplify architecture

Introduces shell command whitelisting, smart token detection, per-queue rate limiting, dead letter queue, job timeout protection, config caching, and a health monitoring command. Refactors architecture by consolidating traits, removing the CompletionStrategy pattern, and unifying retry policies under RetryPolicyFixed. Updates documentation and tests to reflect new features and architectural changes, ensuring backward compatibility and improved reliability.

143 of 340 new or added lines in 20 files covered. (42.06%)

1 existing line in 1 file now uncovered.

1193 of 2056 relevant lines covered (58.03%)

4.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.52
/src/Commands/QueueRunCommand.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Commands;
15

16
use CodeIgniter\CLI\CLI;
17
use CodeIgniter\Exceptions\ExceptionInterface;
18
use CodeIgniter\HTTP\Response;
19
use Config\Services;
20
use DateTimeInterface;
21
use Daycry\Jobs\Exceptions\JobException;
22
use Daycry\Jobs\Execution\JobLifecycleCoordinator;
23
use Daycry\Jobs\Job;
24
use Daycry\Jobs\Libraries\QueueManager;
25
use Daycry\Jobs\Libraries\RateLimiter;
26
use Daycry\Jobs\Metrics\Metrics;
27
use Daycry\Jobs\Queues\JobEnvelope;
28
use Daycry\Jobs\Queues\RequeueHelper;
29

30
/**
31
 * Long-running (or one-shot) queue worker command.
32
 * Pulls messages from the configured queue backend, executes them via lifecycle coordinator,
33
 * and applies basic retry (requeue) fallback when failures occur.
34
 */
35
class QueueRunCommand extends BaseJobsCommand
36
{
37
    protected $name                       = 'jobs:queue:run';
38
    protected $description                = 'Start queue worker.';
39
    protected $usage                      = 'queue:run <queue> [Options]';
40
    protected $arguments                  = ['queue' => 'The queue name.'];
41
    protected $options                    = ['--oneTime' => 'Only executes one time.'];
42
    protected bool $locked                = false;
43
    private ?RequeueHelper $requeueHelper = null;
44

45
    protected function earlyChecks(Job $job): void
46
    {
47
    }
1✔
48

49
    protected function lateChecks(Job $job): void
50
    {
51
    }
1✔
52

53
    protected function earlyCallbackChecks(Job $job): void
54
    {
55
    }
×
56

57
    protected function lateCallbackChecks(Job $job): void
58
    {
59
    }
×
60

61
    protected function conditionalChecks(): bool
62
    {
63
        return true;
×
64
    }
65

66
    public function run(array $params): void
67
    {
68
        $queue   = $params[0] ?? CLI::getOption('queue');
×
69
        $oneTime = array_key_exists('oneTime', $params) || CLI::getOption('oneTime');
×
70

71
        if (empty($queue)) {
×
72
            $queue = CLI::prompt(lang('Queue.insertQueue'));
×
73
        }
74

75
        while (true) {
×
76
            if ($this->conditionalChecks()) {
×
77
                $this->processQueue($queue);
×
78

79
                if ($oneTime) {
×
80
                    return;
×
81
                }
82

83
                sleep(config('Jobs')->defaultTimeout ?? 5);
×
84
            }
85
        }
86
    }
87

88
    protected function processQueue(string $queue): void
89
    {
90
        $response = [];
1✔
91
        $metrics  = Metrics::get();
1✔
92
        $this->requeueHelper ??= new RequeueHelper($metrics);
1✔
93

94
        // Rate limiting check
95
        $config    = config('Jobs');
1✔
96
        $rateLimit = $config->queueRateLimits[$queue] ?? 0;
1✔
97

98
        if ($rateLimit > 0) {
1✔
NEW
99
            $rateLimiter = new RateLimiter();
×
NEW
100
            if (! $rateLimiter->allow($queue, $rateLimit)) {
×
101
                // Rate limit exceeded, skip processing this cycle
NEW
102
                CLI::write("[Rate Limited] Queue '{$queue}' has reached limit of {$rateLimit} jobs/minute", 'yellow');
×
103

NEW
104
                return;
×
105
            }
106
        }
107

108
        Services::resetSingle('request');
1✔
109
        Services::resetSingle('response');
1✔
110

111
        try {
112
            $worker      = $this->getWorker();
1✔
113
            $queueEntity = $worker->watch($queue);
1✔
114

115
            if ($queueEntity !== null) {
1✔
116
                $metrics->increment('jobs_fetched', 1, ['queue' => $queue]);
1✔
117
                $this->locked = true;
1✔
118
                if (! ($queueEntity instanceof JobEnvelope)) {
1✔
119
                    throw JobException::validationError('Legacy queue entity unsupported (expecting JobEnvelope).');
×
120
                }
121
                $decoded = $queueEntity->payload;
1✔
122
                if (! is_object($decoded)) {
1✔
123
                    throw JobException::validationError('Invalid envelope payload format.');
×
124
                }
125
                $job = Job::fromQueueRecord($decoded);
1✔
126

127
                $this->earlyChecks($job);
1✔
128

129
                $this->lateChecks($job); // todavía antes de ejecutar? mantener orden original
1✔
130

131
                $coordinator = new JobLifecycleCoordinator();
1✔
132
                $startExec   = microtime(true);
1✔
133
                $outcome     = $coordinator->run($job, 'queue');
1✔
134
                $latency     = microtime(true) - $startExec;
1✔
135
                $exec        = $outcome->finalResult;
1✔
136

137
                $response = [
1✔
138
                    'status'     => $exec->success,
1✔
139
                    'statusCode' => $exec->success ? 200 : 500,
1✔
140
                    'data'       => $exec->output,
1✔
141
                    'error'      => $exec->success ? null : $exec->error,
1✔
142
                ];
1✔
143

144
                // Finalización: usar completion strategy ya ejecutada dentro del coordinator.
145
                // Remoción/requeue ya la maneja la estrategia QueueCompletionStrategy (si lo configuramos). Si aún no, aplicamos fallback:
146
                $this->requeueHelper->finalize($job, $queueEntity, static fn ($j, $r) => $worker->removeJob($j, $r), $exec->success);
1✔
147
                if ($queueEntity instanceof JobEnvelope && $queueEntity->createdAt instanceof DateTimeInterface) {
1✔
148
                    $age = microtime(true) - $queueEntity->createdAt->getTimestamp();
1✔
149
                    $metrics->observe('jobs_age_seconds', $age, ['queue' => $queue]);
1✔
150
                }
151
                $metrics->observe('jobs_exec_seconds', $latency, ['queue' => $queue]);
1✔
152
            }
153
        } catch (ExceptionInterface $e) {
×
154
            $response = $this->handleException($e, $worker ?? null, $job ?? null);
×
155
        }
156

157
        $this->locked = false;
1✔
158
        unset($job, $queueEntity);
1✔
159
    }
160

161
    protected function getWorker()
162
    {
163
        return QueueManager::instance()->getDefault();
×
164
    }
165

166
    /*protected function prepareResponse($result): array
167
    {
168
        $response['status'] = true;
169

170
        if (! $result instanceof Response) {
171
            $result = (Services::response(null, true))->setStatusCode(200)->setBody($result);
172
        }
173

174
        $response['statusCode'] = $result->getStatusCode();
175
        $response['data']       = $result->getBody();
176

177
        return $response;
178
    }*/
179

180
    protected function handleException($e, $worker, $job): array
181
    {
182
        $response['statusCode'] = $e->getCode();
×
183
        $response['error']      = $e->getMessage();
×
184
        $response['status']     = false;
×
185

186
        if ($worker && $job) {
×
187
            $worker->removeJob($job, true);
×
188
        }
189

190
        $this->showError($e);
×
191

192
        return $response;
×
193
    }
194

195
    protected function finalizeJob(array $response, $worker, Job $job): void
196
    {
197
        // Ya no se usa: la lógica de finalize se hace inline tras ejecutar el JobExecutor.
198
    }
×
199

200
    // Metrics retrieval now centralized in Metrics::get(); no local logic needed.
201
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc