• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 21445690009

28 Jan 2026 04:05PM UTC coverage: 55.964%. Remained the same
21445690009

push

github

daycry
Fix parameter handling for oneTime and background flags

Updated the logic to set 'oneTime' and 'background' flags to use array_key_exists, ensuring they are set to true if present in params, regardless of value. Also removed unused Symfony Process import.

0 of 4 new or added lines in 1 file covered. (0.0%)

1173 of 2096 relevant lines covered (55.96%)

4.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.38
/src/Commands/QueueRunCommand.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Commands;
15

16
use CodeIgniter\CLI\CLI;
17
use CodeIgniter\Exceptions\ExceptionInterface;
18
use Config\Services;
19
use DateTimeInterface;
20
use Daycry\Jobs\Exceptions\JobException;
21
use Daycry\Jobs\Execution\JobLifecycleCoordinator;
22
use Daycry\Jobs\Job;
23
use Daycry\Jobs\Libraries\QueueManager;
24
use Daycry\Jobs\Libraries\RateLimiter;
25
use Daycry\Jobs\Metrics\Metrics;
26
use Daycry\Jobs\Queues\JobEnvelope;
27
use Daycry\Jobs\Queues\RequeueHelper;
28

29
/**
 * Long-running (or one-shot) queue worker command.
 *
 * Pulls messages from the configured queue backend, executes them through the
 * lifecycle coordinator, and applies a basic retry (requeue) fallback when a
 * job fails.
 */
class QueueRunCommand extends BaseJobsCommand
{
    protected $name        = 'jobs:queue:run';
    protected $description = 'Start queue worker.';

    // Usage must advertise the same command name that is registered in $name.
    protected $usage     = 'jobs:queue:run <queue> [Options]';
    protected $arguments = ['queue' => 'The queue name.'];
    protected $options   = ['--oneTime' => 'Only executes one time.', '--background' => 'Run the worker in background.'];

    /**
     * True while a fetched job is being processed by processQueue().
     */
    protected bool $locked = false;

    /**
     * Lazily created helper that finalizes (removes or requeues) processed jobs.
     */
    private ?RequeueHelper $requeueHelper = null;

    /**
     * Hook invoked by processQueue() right after the job is rebuilt from the
     * queue record. Intentionally a no-op in this command.
     */
    protected function earlyChecks(Job $job): void
    {
    }

    /**
     * Hook invoked by processQueue() after earlyChecks() and before the job is
     * executed. Intentionally a no-op in this command.
     */
    protected function lateChecks(Job $job): void
    {
    }

    /**
     * Callback-related hook; not called anywhere in this class.
     * NOTE(review): presumably required by BaseJobsCommand - confirm.
     */
    protected function earlyCallbackChecks(Job $job): void
    {
    }

    /**
     * Callback-related hook; not called anywhere in this class.
     * NOTE(review): presumably required by BaseJobsCommand - confirm.
     */
    protected function lateCallbackChecks(Job $job): void
    {
    }

    /**
     * Gate evaluated on every iteration of the worker loop in run(); the queue
     * is only polled when this returns true. Always true in this command.
     */
    protected function conditionalChecks(): bool
    {
        return true;
    }

    /**
     * Command entry point.
     *
     * Resolves the queue name (parameter, first positional argument, --queue
     * option or interactive prompt) and then either spawns a detached child
     * worker (--background) or runs the polling loop in this process.
     *
     * @param array $params Parsed command parameters. The mere presence of the
     *                      'oneTime' / 'background' keys enables those flags,
     *                      regardless of their value.
     */
    public function run(array $params): void
    {
        $queue      = $params['queue'] ?? $params[0] ?? CLI::getOption('queue');
        $oneTime    = array_key_exists('oneTime', $params) ? true : CLI::getOption('oneTime');
        $background = array_key_exists('background', $params) ? true : CLI::getOption('background');

        // Resolve the queue before anything else: a detached child has no
        // terminal to prompt on, and escapeshellarg() rejects a non-string
        // value under strict_types.
        if (! is_string($queue) || $queue === '') {
            $queue = CLI::prompt(lang('Queue.insertQueue'), config('Jobs')->queues, 'required');
        }

        // Spawn a detached child and leave the parent. The child is started
        // without --background, so it does not respawn itself.
        if ($background) {
            $this->spawnBackgroundWorker($queue, (bool) $oneTime);

            return;
        }

        while (true) {
            if ($this->conditionalChecks()) {
                $this->processQueue($queue);

                if ($oneTime) {
                    return;
                }
            }

            // Sleep on every iteration, including when the conditional checks
            // fail, so a closed gate does not turn into a busy loop.
            sleep(config('Jobs')->defaultTimeout ?? 5);
        }
    }

    /**
     * Starts a detached worker process for the given queue and returns
     * immediately.
     *
     * exec() blocks until the child exits unless the child's output is
     * redirected and the process is sent to the background, so both are done
     * here explicitly.
     */
    private function spawnBackgroundWorker(string $queue, bool $oneTime): void
    {
        $spark = realpath(FCPATH . '../spark');

        if ($spark === false) {
            CLI::error('Unable to locate the spark executable; background worker not started.');

            return;
        }

        $cmd = sprintf(
            '%s %s %s --queue %s%s',
            escapeshellarg(PHP_BINARY),
            escapeshellarg($spark),
            escapeshellarg($this->name),
            escapeshellarg($queue),
            $oneTime ? ' --oneTime' : '',
        );

        if (PHP_OS_FAMILY === 'Windows') {
            // NOTE(review): usual "start /B" detach idiom for Windows - confirm
            // on the target environment.
            $handle = popen('start /B "" ' . $cmd . ' > NUL 2>&1', 'r');

            if ($handle !== false) {
                pclose($handle);
            }

            return;
        }

        exec($cmd . ' > /dev/null 2>&1 &');
    }

    /**
     * Fetches at most one job from the queue and executes it.
     *
     * Steps: honour the per-queue rate limit, reset the shared request and
     * response services, ask the worker for a job, run it through the
     * lifecycle coordinator, finalize it (remove or requeue) and record
     * metrics. Framework exceptions are delegated to handleException().
     *
     * @param string $queue Name of the queue to poll.
     */
    protected function processQueue(string $queue): void
    {
        $metrics = Metrics::get();
        $this->requeueHelper ??= new RequeueHelper($metrics);

        // Rate limiting check
        $rateLimit = config('Jobs')->queueRateLimits[$queue] ?? 0;

        if ($rateLimit > 0) {
            $rateLimiter = new RateLimiter();

            if (! $rateLimiter->allow($queue, $rateLimit)) {
                // Rate limit exceeded, skip processing this cycle
                CLI::write("[Rate Limited] Queue '{$queue}' has reached limit of {$rateLimit} jobs/minute", 'yellow');

                return;
            }
        }

        Services::resetSingle('request');
        Services::resetSingle('response');

        $worker = null;
        $job    = null;

        try {
            $worker      = $this->getWorker();
            $queueEntity = $worker->watch($queue);

            if ($queueEntity === null) {
                // No available job for this queue at this time.
                return;
            }

            $metrics->increment('jobs_fetched', 1, ['queue' => $queue]);
            $this->locked = true;

            if (! ($queueEntity instanceof JobEnvelope)) {
                throw JobException::validationError('Legacy queue entity unsupported (expecting JobEnvelope).');
            }

            $decoded = $queueEntity->payload;

            if (! is_object($decoded)) {
                throw JobException::validationError('Invalid envelope payload format.');
            }

            $job = Job::fromQueueRecord($decoded);

            $this->earlyChecks($job);
            // Still runs before execution; the original call order is kept.
            $this->lateChecks($job);

            $coordinator = new JobLifecycleCoordinator();
            $startExec   = microtime(true);
            $outcome     = $coordinator->run($job, 'queue');
            $latency     = microtime(true) - $startExec;
            $exec        = $outcome->finalResult;

            // The completion strategy already ran inside the coordinator.
            // Removal/requeue may be handled there by a queue completion
            // strategy when configured; this call is the fallback.
            $this->requeueHelper->finalize(
                $job,
                $queueEntity,
                static fn ($j, $r) => $worker->removeJob($j, $r),
                $exec->success,
            );

            if ($queueEntity->createdAt instanceof DateTimeInterface) {
                $age = microtime(true) - $queueEntity->createdAt->getTimestamp();
                $metrics->observe('jobs_age_seconds', $age, ['queue' => $queue]);
            }

            $metrics->observe('jobs_exec_seconds', $latency, ['queue' => $queue]);
        } catch (ExceptionInterface $e) {
            $this->handleException($e, $worker, $job);
        } finally {
            // Always release the flag, including when a throwable that is not
            // caught above propagates out of this method.
            $this->locked = false;
        }
    }

    /**
     * Returns the queue backend used to fetch and remove jobs: the default
     * worker of the QueueManager singleton.
     */
    protected function getWorker()
    {
        return QueueManager::instance()->getDefault();
    }

    /**
     * Handles an exception raised while processing a job.
     *
     * When both a worker and a job are available the job is removed through
     * the worker (second argument true), then the error is displayed.
     *
     * @param mixed $e      The caught exception.
     * @param mixed $worker Queue worker, or null if it was never obtained.
     * @param mixed $job    Job being processed, or null if it was never built.
     *
     * @return array{statusCode: mixed, error: mixed, status: false}
     */
    protected function handleException($e, $worker, $job): array
    {
        $response = [
            'statusCode' => $e->getCode(),
            'error'      => $e->getMessage(),
            'status'     => false,
        ];

        if ($worker && $job) {
            $worker->removeJob($job, true);
        }

        $this->showError($e);

        return $response;
    }

    /**
     * No longer used: finalization happens inline in processQueue() right
     * after the job is executed. Kept as a no-op for backward compatibility.
     *
     * @deprecated
     */
    protected function finalizeJob(array $response, $worker, Job $job): void
    {
    }

    // Metrics retrieval is centralized in Metrics::get(); no local logic needed.
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc