daycry / jobs / 26886467550

03 Jun 2026 01:01PM UTC coverage: 88.948% (+14.0%) from 74.974%
push

github

web-flow
v3.0: single clean architecture (remove V1, lease-based queues, secure-by-default)

Complete v3.0 rewrite into a single, clean architecture. The v1 API and the V2\ scaffolding
are removed (no facade, no dual code); the package passes PHPStan level 6 + strict-rules +
codeigniter with NO baseline.

- Definition: Jobs::define()->...->dispatch() fluent builder -> immutable JobDefinition.
- Handlers decoupled from the god-object (JobHandlerInterface / AbstractJobHandler / TypedJobHandler + JobContext).
- One QueueBackend contract (enqueue/fetch(lease)/ack/nack(delay)/abandon/reapExpired) with 5 backends:
  Sync, Database, Redis, Beanstalk, ServiceBus.
- Runtime: one attempt per fetch; real interrupting Timeout; opt-in idempotency; single-instance lock.
- Worker/Cron: jobs:queue:work, jobs:queue:reap, jobs:cronjob:run, jobs:queue:purge.
- Secure-by-default: HMAC-signed envelopes, per-queue handler allowlist, ShellHandler deny-by-default,
  EventHandler allowlist, UrlHandler anti-SSRF.

Resolves audit findings #1,#2,#3,#4,#5,#6,#7,#8,#10,#12,#13,#17,#18,#19,#20,#22.
Tests: 359 (Beanstalk live); line coverage 88.9%; PHPStan/Psalm/Rector/cs green on PHP 8.2-8.5.

BREAKING CHANGE: v1 API removed. See docs/MIGRATION-v1-to-v3.md.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

983 of 1103 new or added lines in 43 files covered. (89.12%)

15 existing lines in 3 files now uncovered.

1497 of 1683 relevant lines covered (88.95%)

7.55 hits per line

Source File
82.71
/src/Queues/Backends/RedisBackend.php
<?php

declare(strict_types=1);

/**
 * This file is part of Daycry Queues.
 *
 * (c) Daycry <daycry9@proton.me>
 *
 * For the full copyright and license information, please view
 * the LICENSE file that was distributed with this source code.
 */

namespace Daycry\Jobs\Queues\Backends;

use Config\Cache;
use Daycry\Jobs\Definition\JobDefinition;
use Daycry\Jobs\Libraries\RedisHandler as JobsRedisHandler;
use Daycry\Jobs\Queues\EnvelopeFactory;
use Daycry\Jobs\Queues\JobEnvelope;
use Daycry\Jobs\Queues\JobLease;
use Daycry\Jobs\Queues\QueueBackend;
use Redis;
use RuntimeException;
use stdClass;
use Throwable;

/**
 * Redis-backed queue implementing the v3 {@see QueueBackend} contract.
 *
 * Storage model per queue (prefix 'jobs:'):
 *  - {q}-waiting          LIST   ready messages (LPUSH at head, RPOPLPUSH from tail, i.e. FIFO).
 *  - {q}-delayed          ZSET   future messages (score = due unix ts).
 *  - {q}-processing       LIST   in-flight messages (atomic move via RPOPLPUSH).
 *  - {q}-processing-meta  HASH   raw payload => json{ts, owner} (visibility / ownership).
 *
 * v3 changes over the legacy RedisQueue:
 *  - nack() RE-SERIALISES the payload with attempts+1 (the legacy code re-pushed the original
 *    raw string, so the incremented attempt counter was lost on Redis).
 *  - nack(delay) honours the delay by sending the message to the delayed ZSET.
 *  - reapExpired() recovers messages whose visibility timeout elapsed; renewLease() lets a
 *    long-running worker extend the lease so the reaper does not reclaim a live job.
 *
 * Delivery is at-least-once; handlers should be idempotent.
 */
final class RedisBackend implements QueueBackend
{
    private const BACKEND = 'redis';

    private ?Redis $redis  = null;
    private string $prefix = 'jobs:';

    public function __construct()
    {
        if (! class_exists(Redis::class)) {
            return;
        }

        try {
            $cacheConfig          = clone config(Cache::class);
            $cacheConfig->handler = 'redis';
            $handler              = new JobsRedisHandler($cacheConfig);
            $handler->initialize();
            $this->redis = $handler->getRedis();
        } catch (Throwable $e) {
            log_message('warning', 'RedisBackend: connection failed — ' . $e->getMessage());
            $this->redis = null;
        }
    }

    public function isConnected(): bool
    {
        return $this->redis instanceof Redis;
    }

    public function enqueue(JobDefinition $definition): string
    {
        $redis = $this->redis;
        if (! $redis instanceof Redis) {
            throw new RuntimeException('Redis extension/connection not available');
        }

        $queue      = $definition->queue ?? 'default';
        $identifier = bin2hex(random_bytes(8));
        $wire       = EnvelopeFactory::toWire($definition, $identifier);
        $raw        = json_encode($wire, JSON_THROW_ON_ERROR);

        $now   = time();
        $due   = $definition->scheduledAt?->getTimestamp() ?? $now;
        $delay = max(0, $due - $now);

        if ($delay > 0) {
            $redis->zAdd($this->delayedKey($queue), $now + $delay, $raw);
        } else {
            $redis->lPush($this->waitingKey($queue), $raw);
        }

        return $identifier;
    }

    public function fetch(string $queue): ?JobLease
    {
        $redis = $this->redis;
        if (! $redis instanceof Redis) {
            return null;
        }

        $this->promoteDelayed($queue);

        $raw = $redis->rpoplpush($this->waitingKey($queue), $this->processingKey($queue));
        if (! is_string($raw) || $raw === '') {
            return null;
        }

        $owner             = bin2hex(random_bytes(16));
        $visibilityTimeout = (int) (config('Jobs')->redisProcessingVisibilityTimeout ?? 300);
        $redis->hSet($this->processingMetaKey($queue), $raw, json_encode(['ts' => time(), 'owner' => $owner], JSON_THROW_ON_ERROR));

        $wire = json_decode($raw);
        if (! $wire instanceof stdClass) {
            // Corrupt entry: drop it so the reaper does not get stuck on it.
            $this->dropFromProcessing($queue, $raw);

            return null;
        }

        $envelope = new JobEnvelope(
            id: isset($wire->identifier) ? (string) $wire->identifier : '',
            queue: $queue,
            payload: $wire,
            name: isset($wire->name) && is_string($wire->name) ? $wire->name : null,
            attempts: isset($wire->attempts) ? (int) $wire->attempts : 0,
            priority: isset($wire->priority) ? (int) $wire->priority : null,
            meta: ['backend' => self::BACKEND, 'owner' => $owner],
            raw: $wire,
        );

        return JobLease::withRelativeExpiry($envelope, $raw, $owner, $visibilityTimeout, self::BACKEND);
    }

    public function ack(JobLease $lease): bool
    {
        $redis = $this->redis;
        if (! $redis instanceof Redis) {
            return false;
        }

        $this->dropFromProcessing($lease->envelope->queue, $lease->token);

        return true;
    }

    public function nack(JobLease $lease, ?int $delaySeconds = null): bool
    {
        $redis = $this->redis;
        if (! $redis instanceof Redis) {
            return false;
        }

        $queue = $lease->envelope->queue;
        $raw   = $lease->token;

        // Re-serialise with attempts+1 so the incremented counter survives the requeue.
        $wire = json_decode($raw);
        if ($wire instanceof stdClass) {
            $wire->attempts = (isset($wire->attempts) ? (int) $wire->attempts : 0) + 1;
            $newRaw         = json_encode($wire, JSON_THROW_ON_ERROR);
        } else {
            $newRaw = $raw;
        }

        $delay = max(0, $delaySeconds ?? 0);

        $redis->multi();
        $redis->lrem($this->processingKey($queue), $raw, 1);
        $redis->hDel($this->processingMetaKey($queue), $raw);
        if ($delay > 0) {
            $redis->zAdd($this->delayedKey($queue), time() + $delay, $newRaw);
        } else {
            $redis->lPush($this->waitingKey($queue), $newRaw);
        }
        $redis->exec();

        return true;
    }

    public function abandon(JobLease $lease): bool
    {
        $redis = $this->redis;
        if (! $redis instanceof Redis) {
            return false;
        }

        // No retry: just drop it from processing. The runtime forwards to the DLQ first.
        $this->dropFromProcessing($lease->envelope->queue, $lease->token);

        return true;
    }

    public function reapExpired(string $queue, int $visibilityTimeout): int
    {
        $redis = $this->redis;
        if (! $redis instanceof Redis) {
            return 0;
        }

        $entries = $redis->hGetAll($this->processingMetaKey($queue));
        if (! is_array($entries)) {
            return 0;
        }

        $now    = time();
        $reaped = 0;

        foreach ($entries as $raw => $metaJson) {
            $meta = json_decode($metaJson, true);
            $ts   = is_array($meta) && isset($meta['ts']) ? (int) $meta['ts'] : 0;
            if ($now - $ts <= $visibilityTimeout) {
                continue;
            }

            $redis->multi();
            $redis->lrem($this->processingKey($queue), (string) $raw, 1);
            $redis->lPush($this->waitingKey($queue), (string) $raw);
            $redis->hDel($this->processingMetaKey($queue), (string) $raw);
            $redis->exec();
            $reaped++;
        }

        return $reaped;
    }

    /**
     * Extend the visibility deadline of a held lease (worker heartbeat). Re-stamps the
     * processing-meta timestamp so the reaper does not reclaim a still-running job.
     */
    public function renewLease(JobLease $lease): bool
    {
        $redis = $this->redis;
        if (! $redis instanceof Redis) {
            return false;
        }

        $redis->hSet(
            $this->processingMetaKey($lease->envelope->queue),
            $lease->token,
            json_encode(['ts' => time(), 'owner' => $lease->ownerToken], JSON_THROW_ON_ERROR),
        );

        return true;
    }

    private function dropFromProcessing(string $queue, string $raw): void
    {
        $redis = $this->redis;
        if (! $redis instanceof Redis) {
            return;
        }

        $redis->multi();
        $redis->lrem($this->processingKey($queue), $raw, 1);
        $redis->hDel($this->processingMetaKey($queue), $raw);
        $redis->exec();
    }

    private function promoteDelayed(string $queue): void
    {
        $redis = $this->redis;
        if (! $redis instanceof Redis) {
            return;
        }

        $now = time();

        // Atomic promotion via Lua to prevent duplicate promotion under concurrent workers.
        $lua = <<<'LUA'
            local items = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 50)
            for _, item in ipairs(items) do
                if redis.call('ZREM', KEYS[1], item) == 1 then
                    redis.call('LPUSH', KEYS[2], item)
                end
            end
            return #items
            LUA;

        try {
            $redis->eval($lua, [$this->delayedKey($queue), $this->waitingKey($queue), (string) $now], 2);
        } catch (Throwable) {
            // Non-atomic fallback: zRem gate prevents duplicates across workers.
            $items = $redis->zRangeByScore($this->delayedKey($queue), '0', (string) $now, ['limit' => [0, 50]]);
            if (! is_array($items)) {
                return;
            }

            foreach ($items as $raw) {
                if ($redis->zRem($this->delayedKey($queue), (string) $raw) > 0) {
                    $redis->lPush($this->waitingKey($queue), (string) $raw);
                }
            }
        }
    }

    private function waitingKey(string $queue): string
    {
        return $this->prefix . $queue . '-waiting';
    }

    private function delayedKey(string $queue): string
    {
        return $this->prefix . $queue . '-delayed';
    }

    private function processingKey(string $queue): string
    {
        return $this->prefix . $queue . '-processing';
    }

    private function processingMetaKey(string $queue): string
    {
        return $this->prefix . $queue . '-processing-meta';
    }
}
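
The lease lifecycle in the listing above (fetch, nack with delay, delayed promotion, reaping) can be hard to follow without a running Redis. The block below is a minimal in-memory sketch of the same waiting / delayed / processing layout, assuming plain PHP arrays in place of Redis structures. The class `InMemoryLeaseQueue`, its method signatures, and the injected `$now` clock are hypothetical and are not part of Daycry Jobs; the sketch also omits owner tokens, HMAC envelopes, and atomicity.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical in-memory model of the waiting / delayed / processing layout.
 * Illustration only: not part of Daycry Jobs, no Redis, no concurrency safety.
 */
final class InMemoryLeaseQueue
{
    /** @var list<string> ready messages, oldest first */
    private array $waiting = [];

    /** @var array<string, int> raw payload => due timestamp */
    private array $delayed = [];

    /** @var array<string, int> raw payload => lease start timestamp */
    private array $processing = [];

    public function enqueue(string $raw, int $now, int $delay = 0): void
    {
        if ($delay > 0) {
            $this->delayed[$raw] = $now + $delay;
        } else {
            $this->waiting[] = $raw;
        }
    }

    /** Promote due delayed messages, then lease the oldest waiting one. */
    public function fetch(int $now): ?string
    {
        foreach ($this->delayed as $raw => $due) {
            if ($due <= $now) {
                unset($this->delayed[$raw]);
                $this->waiting[] = (string) $raw;
            }
        }

        $raw = array_shift($this->waiting);
        if ($raw === null) {
            return null;
        }

        $this->processing[$raw] = $now;

        return $raw;
    }

    public function ack(string $raw): void
    {
        unset($this->processing[$raw]);
    }

    /** Requeue with attempts+1 re-serialised into the payload, optionally delayed. */
    public function nack(string $raw, int $now, int $delay = 0): void
    {
        unset($this->processing[$raw]);
        $wire             = json_decode($raw, true, 512, JSON_THROW_ON_ERROR);
        $wire['attempts'] = ($wire['attempts'] ?? 0) + 1;
        $this->enqueue(json_encode($wire, JSON_THROW_ON_ERROR), $now, $delay);
    }

    /** Return leases older than $visibilityTimeout to the waiting list. */
    public function reapExpired(int $now, int $visibilityTimeout): int
    {
        $reaped = 0;

        foreach ($this->processing as $raw => $ts) {
            if ($now - $ts > $visibilityTimeout) {
                unset($this->processing[$raw]);
                $this->waiting[] = (string) $raw;
                $reaped++;
            }
        }

        return $reaped;
    }
}

// Usage: timestamps are passed in explicitly so the walk-through is deterministic.
$q = new InMemoryLeaseQueue();
$q->enqueue('{"id":"a","attempts":0}', now: 100);
$first = $q->fetch(now: 100);                                // leased immediately
$q->nack($first, now: 101, delay: 30);                       // requeued as delayed, attempts = 1
$early  = $q->fetch(now: 110);                               // null: not due until 131
$retry  = $q->fetch(now: 131);                               // promoted, then leased again
$reaped = $q->reapExpired(now: 500, visibilityTimeout: 300); // lease from 131 has expired
$again  = $q->fetch(now: 500);                               // same payload is redelivered
```

The last two calls show why delivery is at-least-once: a worker that neither acks nor renews within the visibility timeout has its message returned to the waiting list, so a handler may see the same payload twice.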

© 2026 Coveralls, Inc