• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 21444656761

28 Jan 2026 03:37PM UTC coverage: 55.831% (-0.5%) from 56.301%
21444656761

push

github

daycry
Add background execution to QueueRunCommand

Introduces support for running the queue command in the background using Symfony Process. Refactors parameter handling for 'queue', 'oneTime', and 'background' options, and improves queue prompt with validation.

0 of 18 new or added lines in 1 file covered. (0.0%)

2 existing lines in 2 files now uncovered.

1173 of 2101 relevant lines covered (55.83%)

4.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.51
/src/Commands/QueueRunCommand.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Commands;
15

16
use CodeIgniter\CLI\CLI;
17
use CodeIgniter\Exceptions\ExceptionInterface;
18
use Config\Services;
19
use Symfony\Component\Process\Process;
20
use DateTimeInterface;
21
use Daycry\Jobs\Exceptions\JobException;
22
use Daycry\Jobs\Execution\JobLifecycleCoordinator;
23
use Daycry\Jobs\Job;
24
use Daycry\Jobs\Libraries\QueueManager;
25
use Daycry\Jobs\Libraries\RateLimiter;
26
use Daycry\Jobs\Metrics\Metrics;
27
use Daycry\Jobs\Queues\JobEnvelope;
28
use Daycry\Jobs\Queues\RequeueHelper;
29

30
/**
31
 * Long-running (or one-shot) queue worker command.
32
 * Pulls messages from the configured queue backend, executes them via lifecycle coordinator,
33
 * and applies basic retry (requeue) fallback when failures occur.
34
 */
35
class QueueRunCommand extends BaseJobsCommand
36
{
37
    protected $name                       = 'jobs:queue:run';
38
    protected $description                = 'Start queue worker.';
39
    protected $usage                      = 'queue:run <queue> [Options]';
40
    protected $arguments                  = ['queue' => 'The queue name.'];
41
    protected $options                    = ['--oneTime' => 'Only executes one time.', '--background' => 'Run the worker in background.'];
42
    protected bool $locked                = false;
43
    private ?RequeueHelper $requeueHelper = null;
44

45
    protected function earlyChecks(Job $job): void
46
    {
47
    }
1✔
48

49
    protected function lateChecks(Job $job): void
50
    {
51
    }
1✔
52

53
    protected function earlyCallbackChecks(Job $job): void
54
    {
55
    }
×
56

57
    protected function lateCallbackChecks(Job $job): void
58
    {
59
    }
×
60

61
    protected function conditionalChecks(): bool
62
    {
63
        return true;
×
64
    }
65

66
    public function run(array $params): void
67
    {
NEW
68
        $queue      = $params['queue'] ?? $params[0] ?? CLI::getOption('queue');
×
NEW
69
        $oneTime    = isset($params['oneTime']) ? (bool) $params['oneTime'] : (bool) CLI::getOption('oneTime');
×
NEW
70
        $background = isset($params['background']) ? (bool) $params['background'] : (bool) CLI::getOption('background');
×
71

72
        // Spawn background child and exit parent if requested (avoid respawn with --noBackground)
NEW
73
        if ($background) {
×
NEW
74
            $cwd = dirname(FCPATH);
×
75

NEW
76
            $spark = $cwd . DIRECTORY_SEPARATOR . 'spark';
×
77
            // Pass queue as named option to avoid interactive prompt in non-interactive child
NEW
78
            if($oneTime) {
×
NEW
79
                $args = [PHP_BINARY, $spark, $this->name, '--queue ' . $queue, '--oneTime'];
×
80
            } else {
NEW
81
                $args = [PHP_BINARY, $spark, $this->name, '--queue ' . $queue];
×
82
            }
83

NEW
84
            $cmd = sprintf(
×
NEW
85
            '%s %s %s',
×
NEW
86
            PHP_BINARY,
×
NEW
87
                realpath(FCPATH . '../spark'),
×
NEW
88
                escapeshellarg($this->name) . ' --queue ' . escapeshellarg($queue) . ($oneTime ? ' --oneTime' : ''),
×
NEW
89
            );
×
90

NEW
91
            exec($cmd);
×
92

NEW
93
            return;
×
94
        }
95

96
        if (empty($queue)) {
×
NEW
97
            $queue = CLI::prompt(lang('Queue.insertQueue'), config('Jobs')->queues, 'required');
×
98
        }
99

100
        while (true) {
×
101
            if ($this->conditionalChecks()) {
×
102
                $this->processQueue($queue);
×
103

104
                if ($oneTime) {
×
105
                    return;
×
106
                }
107

108
                sleep(config('Jobs')->defaultTimeout ?? 5);
×
109
            }
110
        }
111
    }
112

113
    protected function processQueue(string $queue): void
114
    {
115
        $response = [];
1✔
116
        $metrics  = Metrics::get();
1✔
117
        $this->requeueHelper ??= new RequeueHelper($metrics);
1✔
118

119
        // Rate limiting check
120
        $config    = config('Jobs');
1✔
121
        $rateLimit = $config->queueRateLimits[$queue] ?? 0;
1✔
122

123
        if ($rateLimit > 0) {
1✔
124
            $rateLimiter = new RateLimiter();
×
125
            if (! $rateLimiter->allow($queue, $rateLimit)) {
×
126
                // Rate limit exceeded, skip processing this cycle
127
                CLI::write("[Rate Limited] Queue '{$queue}' has reached limit of {$rateLimit} jobs/minute", 'yellow');
×
128

129
                return;
×
130
            }
131
        }
132

133
        Services::resetSingle('request');
1✔
134
        Services::resetSingle('response');
1✔
135

136
        try {
137
            $worker      = $this->getWorker();
1✔
138
            $queueEntity = $worker->watch($queue);
1✔
139

140
            if ($queueEntity === null) {
1✔
141
                // No available job for this queue at this time.
142
                return;
×
143
            }
144

145
            if ($queueEntity !== null) {
1✔
146
                $metrics->increment('jobs_fetched', 1, ['queue' => $queue]);
1✔
147
                $this->locked = true;
1✔
148
                if (! ($queueEntity instanceof JobEnvelope)) {
1✔
149
                    throw JobException::validationError('Legacy queue entity unsupported (expecting JobEnvelope).');
×
150
                }
151
                $decoded = $queueEntity->payload;
1✔
152
                if (! is_object($decoded)) {
1✔
153
                    throw JobException::validationError('Invalid envelope payload format.');
×
154
                }
155
                $job = Job::fromQueueRecord($decoded);
1✔
156

157
                $this->earlyChecks($job);
1✔
158

159
                $this->lateChecks($job); // todavía antes de ejecutar? mantener orden original
1✔
160

161
                $coordinator = new JobLifecycleCoordinator();
1✔
162
                $startExec   = microtime(true);
1✔
163
                $outcome     = $coordinator->run($job, 'queue');
1✔
164
                $latency     = microtime(true) - $startExec;
1✔
165
                $exec        = $outcome->finalResult;
1✔
166
                $response    = [
1✔
167
                    'status'     => $exec->success,
1✔
168
                    'statusCode' => $exec->success ? 200 : 500,
1✔
169
                    'data'       => $exec->output,
1✔
170
                    'error'      => $exec->success ? null : $exec->error,
1✔
171
                ];
1✔
172

173
                // Execution completed; outcome handled below.
174

175
                // Finalización: usar completion strategy ya ejecutada dentro del coordinator.
176
                // Remoción/requeue ya la maneja la estrategia QueueCompletionStrategy (si lo configuramos). Si aún no, aplicamos fallback:
177
                $this->requeueHelper->finalize($job, $queueEntity, static fn ($j, $r) => $worker->removeJob($j, $r), $exec->success);
1✔
178
                if ($queueEntity instanceof JobEnvelope && $queueEntity->createdAt instanceof DateTimeInterface) {
1✔
179
                    $age = microtime(true) - $queueEntity->createdAt->getTimestamp();
1✔
180
                    $metrics->observe('jobs_age_seconds', $age, ['queue' => $queue]);
1✔
181
                }
182
                $metrics->observe('jobs_exec_seconds', $latency, ['queue' => $queue]);
1✔
183
            }
184
        } catch (ExceptionInterface $e) {
×
185
            $response = $this->handleException($e, $worker ?? null, $job ?? null);
×
186
        }
187

188
        $this->locked = false;
1✔
189
        unset($job, $queueEntity);
1✔
190
    }
191

192
    protected function getWorker()
193
    {
194
        return QueueManager::instance()->getDefault();
×
195
    }
196

197
    /*protected function prepareResponse($result): array
198
    {
199
        $response['status'] = true;
200

201
        if (! $result instanceof Response) {
202
            $result = (Services::response(null, true))->setStatusCode(200)->setBody($result);
203
        }
204

205
        $response['statusCode'] = $result->getStatusCode();
206
        $response['data']       = $result->getBody();
207

208
        return $response;
209
    }*/
210

211
    protected function handleException($e, $worker, $job): array
212
    {
213
        $response['statusCode'] = $e->getCode();
×
214
        $response['error']      = $e->getMessage();
×
215
        $response['status']     = false;
×
216

217
        if ($worker && $job) {
×
218
            $worker->removeJob($job, true);
×
219
        }
220

221
        $this->showError($e);
×
222

223
        return $response;
×
224
    }
225

226
    protected function finalizeJob(array $response, $worker, Job $job): void
227
    {
228
        // Ya no se usa: la lógica de finalize se hace inline tras ejecutar el JobExecutor.
229
    }
×
230

231
    // Metrics retrieval now centralized in Metrics::get(); no local logic needed.
232
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc