• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 19498141349

19 Nov 2025 10:30AM UTC coverage: 62.5% (+0.7%) from 61.815%
19498141349

push

github

daycry
- Fixes

4 of 4 new or added lines in 4 files covered. (100.0%)

41 existing lines in 9 files now uncovered.

1145 of 1832 relevant lines covered (62.5%)

4.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.5
/src/Commands/QueueRunCommand.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Commands;
15

16
use CodeIgniter\CLI\CLI;
17
use CodeIgniter\Exceptions\ExceptionInterface;
18
use CodeIgniter\HTTP\Response;
19
use Config\Services;
20
use DateTimeInterface;
21
use Daycry\Jobs\Exceptions\JobException;
22
use Daycry\Jobs\Exceptions\QueueException;
23
use Daycry\Jobs\Execution\ExecutionContext;
24
use Daycry\Jobs\Execution\JobLifecycleCoordinator;
25
use Daycry\Jobs\Job;
26
use Daycry\Jobs\Libraries\QueueManager;
27
use Daycry\Jobs\Metrics\Metrics;
28
use Daycry\Jobs\Queues\JobEnvelope;
29
use Daycry\Jobs\Queues\RequeueHelper;
30

31
/**
32
 * Long-running (or one-shot) queue worker command.
33
 * Pulls messages from the configured queue backend, executes them via lifecycle coordinator,
34
 * and applies basic retry (requeue) fallback when failures occur.
35
 */
36
class QueueRunCommand extends BaseJobsCommand
37
{
38
    protected $name                       = 'jobs:queue:run';
39
    protected $description                = 'Start queue worker.';
40
    protected $usage                      = 'queue:run <queue> [Options]';
41
    protected $arguments                  = ['queue' => 'The queue name.'];
42
    protected $options                    = ['--oneTime' => 'Only executes one time.'];
43
    protected bool $locked                = false;
44
    private ?RequeueHelper $requeueHelper = null;
45

46
    protected function earlyChecks(Job $job): void
47
    {
48
    }
1✔
49

50
    protected function lateChecks(Job $job): void
51
    {
52
    }
1✔
53

54
    protected function earlyCallbackChecks(Job $job): void
55
    {
UNCOV
56
    }
×
57

58
    protected function lateCallbackChecks(Job $job): void
59
    {
UNCOV
60
    }
×
61

62
    protected function conditionalChecks(): bool
63
    {
64
        return true;
×
65
    }
66

67
    public function run(array $params): void
68
    {
UNCOV
69
        $queue   = $params[0] ?? CLI::getOption('queue');
×
70
        $oneTime = array_key_exists('oneTime', $params) || CLI::getOption('oneTime');
×
71

UNCOV
72
        if (empty($queue)) {
×
73
            $queue = CLI::prompt(lang('Queue.insertQueue'));
×
74
        }
75

UNCOV
76
        while (true) {
×
77
            if($this->conditionalChecks()) {
×
UNCOV
78
                $this->processQueue($queue);
×
79

UNCOV
80
                if ($oneTime) {
×
UNCOV
81
                    return;
×
82
                }
83

UNCOV
84
                sleep(config('Jobs')->defaultTimeout ?? 5);
×
85
            }
86
        }
87
    }
88

89
    protected function processQueue(string $queue): void
90
    {
91
        $response = [];
1✔
92
        $metrics  = Metrics::get();
1✔
93
        $this->requeueHelper ??= new RequeueHelper($metrics);
1✔
94

95
        Services::resetSingle('request');
1✔
96
        Services::resetSingle('response');
1✔
97

98
        try {
99
            $worker      = $this->getWorker();
1✔
100
            $queueEntity = $worker->watch($queue);
1✔
101

102
            if ($queueEntity !== null) {
1✔
103
                $metrics->increment('jobs_fetched', 1, ['queue' => $queue]);
1✔
104
                $this->locked = true;
1✔
105
                if (! ($queueEntity instanceof JobEnvelope)) {
1✔
UNCOV
106
                    throw JobException::validationError('Legacy queue entity unsupported (expecting JobEnvelope).');
×
107
                }
108
                $decoded = $queueEntity->payload;
1✔
109
                if (! is_object($decoded)) {
1✔
UNCOV
110
                    throw JobException::validationError('Invalid envelope payload format.');
×
111
                }
112
                $job = Job::fromQueueRecord($decoded);
1✔
113

114
                $this->earlyChecks($job);
1✔
115

116
                $this->lateChecks($job); // todavía antes de ejecutar? mantener orden original
1✔
117

118
                $ctx = new ExecutionContext(
1✔
119
                    source: 'queue',
1✔
120
                    maxRetries: $job->getMaxRetries() ?? 0,
1✔
121
                    notifyOnSuccess: $job->shouldNotifyOnSuccess(),
1✔
122
                    notifyOnFailure: $job->shouldNotifyOnFailure(),
1✔
123
                    singleInstance: $job->isSingleInstance(),
1✔
124
                    queueName: $queue,
1✔
125
                    queueWorker: $worker,
1✔
126
                    retryConfig: [
1✔
127
                        'strategy'   => config('Jobs')->retryBackoffStrategy,
1✔
128
                        'base'       => config('Jobs')->retryBackoffBase,
1✔
129
                        'multiplier' => config('Jobs')->retryBackoffMultiplier,
1✔
130
                        'jitter'     => config('Jobs')->retryBackoffJitter,
1✔
131
                        'max'        => config('Jobs')->retryBackoffMax,
1✔
132
                    ],
1✔
133
                    eventsEnabled: config('Jobs')->enableEvents ?? true,
1✔
134
                    meta: [],
1✔
135
                );
1✔
136

137
                $coordinator = new JobLifecycleCoordinator();
1✔
138
                $startExec   = microtime(true);
1✔
139
                $outcome     = $coordinator->run($job, $ctx);
1✔
140
                $latency     = microtime(true) - $startExec;
1✔
141
                $exec        = $outcome->finalResult;
1✔
142

143
                $response = [
1✔
144
                    'status'     => $exec->success,
1✔
145
                    'statusCode' => $exec->success ? 200 : 500,
1✔
146
                    'data'       => $exec->output,
1✔
147
                    'error'      => $exec->success ? null : $exec->error,
1✔
148
                ];
1✔
149

150
                // Finalización: usar completion strategy ya ejecutada dentro del coordinator.
151
                // Remoción/requeue ya la maneja la estrategia QueueCompletionStrategy (si lo configuramos). Si aún no, aplicamos fallback:
152
                $this->requeueHelper->finalize($job, $queueEntity, static fn ($j, $r) => $worker->removeJob($j, $r), $exec->success);
1✔
153
                if ($queueEntity instanceof JobEnvelope && $queueEntity->createdAt instanceof DateTimeInterface) {
1✔
154
                    $age = microtime(true) - $queueEntity->createdAt->getTimestamp();
1✔
155
                    $metrics->observe('jobs_age_seconds', $age, ['queue' => $queue]);
1✔
156
                }
157
                $metrics->observe('jobs_exec_seconds', $latency, ['queue' => $queue]);
1✔
158
            }
UNCOV
159
        } catch (ExceptionInterface $e) {
×
UNCOV
160
            $response = $this->handleException($e, $worker ?? null, $job ?? null);
×
161
        }
162

163
        $this->locked = false;
1✔
164
        unset($job, $queueEntity);
1✔
165
    }
166

167
    protected function getWorker()
168
    {
UNCOV
169
        return QueueManager::instance()->getDefault();
×
170
    }
171

172
    /*protected function prepareResponse($result): array
173
    {
174
        $response['status'] = true;
175

176
        if (! $result instanceof Response) {
177
            $result = (Services::response(null, true))->setStatusCode(200)->setBody($result);
178
        }
179

180
        $response['statusCode'] = $result->getStatusCode();
181
        $response['data']       = $result->getBody();
182

183
        return $response;
184
    }*/
185

186
    protected function handleException($e, $worker, $job): array
187
    {
188
        $response['statusCode'] = $e->getCode();
×
189
        $response['error']      = $e->getMessage();
×
UNCOV
190
        $response['status']     = false;
×
191

192
        if ($worker && $job) {
×
UNCOV
193
            $worker->removeJob($job, true);
×
194
        }
195

UNCOV
196
        $this->showError($e);
×
197

UNCOV
198
        return $response;
×
199
    }
200

201
    protected function finalizeJob(array $response, $worker, Job $job): void
202
    {
203
        // Ya no se usa: la lógica de finalize se hace inline tras ejecutar el JobExecutor.
UNCOV
204
    }
×
205

206
    // Metrics retrieval now centralized in Metrics::get(); no local logic needed.
207
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc