• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

avoutic / web-framework / 19859702412

02 Dec 2025 01:08PM UTC coverage: 73.006% (-0.09%) from 73.091%
19859702412

push

github

avoutic
Add queue:clear command to clear a queue

1977 of 2708 relevant lines covered (73.01%)

2.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.0
/src/Task/QueueWorker.php
1
<?php
2

3
/*
4
 * This file is part of WebFramework.
5
 *
6
 * (c) Avoutic <avoutic@gmail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
namespace WebFramework\Task;
13

14
use Carbon\Carbon;
15
use WebFramework\Exception\ArgumentParserException;
16
use WebFramework\Logging\LogService;
17
use WebFramework\Queue\QueueService;
18

19
class QueueWorker extends ConsoleTask
20
{
21
    private int $emptySleep = 1;
22
    private ?int $maxJobs = null;
23
    private ?int $maxRuntime = null;
24

25
    public function __construct(
6✔
26
        private QueueService $queueService,
27
        private LogService $logger,
28
        private ?string $queueName = null,
29
    ) {}
6✔
30

31
    public function getCommand(): string
×
32
    {
33
        return 'queue:worker';
×
34
    }
35

36
    public function getDescription(): string
×
37
    {
38
        return 'Run a queue worker';
×
39
    }
40

41
    public function getUsage(): string
42
    {
43
        return <<<'EOF'
44
        Run a queue worker.
45

46
        This task will run a queue worker for the given queue name.
47

48
        The worker will process jobs from the queue until the maximum number of jobs or
49
        runtime is reached.
50

51
        The worker will sleep for 1 second, and check again for jobs, if there are no
52
        jobs to process.
53

54
        Usage:
55
        framework queue:worker <queueName> [--max-jobs=<maxJobs>] [--max-runtime=<maxRuntime>]
56

57
        Options:
58
        --empty-sleep=<emptySleep> The number of seconds to sleep when there are no jobs (default: 1)
59
        --max-jobs=<maxJobs>    The maximum number of jobs to process (default: unlimited)
60
        --max-runtime=<maxRuntime> The maximum run time of the worker (default: unlimited)
61
        EOF;
62
    }
63

64
    public function getArguments(): array
×
65
    {
66
        return [
×
67
            new TaskArgument('queueName', 'The name of the queue to work', true, [$this, 'setQueueName']),
×
68
        ];
×
69
    }
70

71
    public function getOptions(): array
×
72
    {
73
        return [
×
74
            new TaskOption('empty-sleep', 'e', 'The number of seconds to sleep when there are no jobs', true, [$this, 'setEmptySleep']),
×
75
            new TaskOption('max-jobs', 'm', 'The maximum number of jobs to process', true, [$this, 'setMaxJobs']),
×
76
            new TaskOption('max-runtime', 'r', 'The maximum runtime of the worker', true, [$this, 'setMaxRuntime']),
×
77
        ];
×
78
    }
79

80
    public function getEmptySleep(): ?int
×
81
    {
82
        return $this->emptySleep;
×
83
    }
84

85
    public function setEmptySleep(string $emptySleep): void
×
86
    {
87
        $this->emptySleep = (int) $emptySleep;
×
88
    }
89

90
    public function getMaxJobs(): ?int
1✔
91
    {
92
        return $this->maxJobs;
1✔
93
    }
94

95
    public function setMaxJobs(string $maxJobs): void
3✔
96
    {
97
        if (!is_numeric($maxJobs) || $maxJobs < 1)
3✔
98
        {
99
            throw new ArgumentParserException('Max jobs must be a number greater than 0');
2✔
100
        }
101

102
        $this->maxJobs = (int) $maxJobs;
1✔
103
    }
104

105
    public function getMaxRuntime(): ?int
1✔
106
    {
107
        return $this->maxRuntime;
1✔
108
    }
109

110
    public function setMaxRuntime(string $maxRuntime): void
3✔
111
    {
112
        if (!is_numeric($maxRuntime) || $maxRuntime < 1)
3✔
113
        {
114
            throw new ArgumentParserException('Max runtime must be a number greater than 0');
2✔
115
        }
116

117
        $this->maxRuntime = (int) $maxRuntime;
1✔
118
    }
119

120
    public function getQueueName(): ?string
1✔
121
    {
122
        return $this->queueName;
1✔
123
    }
124

125
    public function setQueueName(string $queueName): void
1✔
126
    {
127
        $this->queueName = $queueName;
1✔
128
    }
129

130
    public function execute(): void
7✔
131
    {
132
        if ($this->queueName === null)
7✔
133
        {
134
            throw new ArgumentParserException('Queue name not set');
1✔
135
        }
136

137
        $queue = $this->queueService->get($this->queueName);
6✔
138

139
        $jobsProcessed = 0;
6✔
140
        $startTime = Carbon::now();
6✔
141

142
        while (true)
6✔
143
        {
144
            $this->logger->debug('default', 'Popping job from queue', ['queue' => $this->queueName]);
6✔
145
            $job = $queue->popJob();
6✔
146

147
            if ($job === null)
6✔
148
            {
149
                $this->logger->debug('default', 'No job found, sleeping', [
1✔
150
                    'queue' => $this->queueName,
1✔
151
                    'delay' => $this->emptySleep,
1✔
152
                ]);
1✔
153

154
                Carbon::sleep($this->emptySleep);
1✔
155

156
                continue;
1✔
157
            }
158

159
            try
160
            {
161
                $jobHandler = $this->queueService->getJobHandler($job);
6✔
162
                $jobHandler->handle($job);
6✔
163

164
                // Mark job as completed if no exception was thrown
165
                $queue->markJobCompleted($job);
5✔
166

167
                $jobsProcessed++;
5✔
168
            }
169
            catch (\Throwable $e)
1✔
170
            {
171
                // Log to default channel with simple message
172
                $this->logger->error('default', 'Job execution failed: '.get_class($e).': '.$e->getMessage());
1✔
173

174
                // Log to exception channel with detailed context
175
                $this->logger->error('exception', 'Job execution failed with exception', [
1✔
176
                    'queue' => $this->queueName,
1✔
177
                    'jobId' => $job->getJobId(),
1✔
178
                    'jobName' => $job->getJobName(),
1✔
179
                    'exception' => get_class($e),
1✔
180
                    'message' => $e->getMessage(),
1✔
181
                    'trace' => $e->getTraceAsString(),
1✔
182
                ]);
1✔
183

184
                // Handle exceptions - mark job as failed with exception details
185
                $queue->markJobFailed($job, $e);
1✔
186

187
                // Continue processing instead of crashing the worker
188
                $jobsProcessed++;
1✔
189
            }
190

191
            if ($this->maxJobs !== null && $jobsProcessed >= $this->maxJobs)
6✔
192
            {
193
                break;
5✔
194
            }
195

196
            if ($this->maxRuntime !== null && $startTime->diffInSeconds() >= $this->maxRuntime)
2✔
197
            {
198
                break;
1✔
199
            }
200
        }
201
    }
202
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc