• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 24514486770

05 Apr 2026 08:48AM UTC coverage: 53.982% (-2.2%) from 56.164%
24514486770

push

github

daycry
Optimize

50 of 192 new or added lines in 14 files covered. (26.04%)

3 existing lines in 3 files now uncovered.

1220 of 2260 relevant lines covered (53.98%)

4.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.26
/src/Queues/RedisQueue.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Queues;
15

16
use Config\Cache;
17
use Daycry\Jobs\Interfaces\QueueInterface;
18
use Daycry\Jobs\Interfaces\WorkerInterface;
19
use Daycry\Jobs\Job as QueuesJob; // ext-redis required
20
use Daycry\Jobs\Libraries\RedisHandler as JobsRedisHandler;
21
use Redis;
22
use RuntimeException;
23
use Throwable;
24

25
/**
 * Redis-backed queue implementation.
 *
 * Storage model:
 *  - Immediate jobs: LPUSH into {prefix}{queue}-waiting, consumed via RPOP (FIFO semantics).
 *  - Delayed jobs: stored in a sorted set {prefix}{queue}-delayed with score = due timestamp, then promoted.
 *  - (Future) Failed jobs key reserved {prefix}{queue}-failed (not yet persisted here).
 *
 * Contract notes:
 *  - enqueue(): returns a string job identifier of the form "<unix timestamp>-<generateId() suffix>".
 *  - watch(): promotes due delayed jobs, then pops one waiting job; returns whatever
 *    JobEnvelope::fromBackend() builds for it, or null when nothing usable was popped.
 *  - removeJob(): if $recreate is true the job is re-enqueued directly on this Redis backend.
 */
class RedisQueue extends BaseQueue implements QueueInterface, WorkerInterface
{
    /**
     * Maximum number of due delayed entries moved to the waiting list per promotion pass.
     */
    private const PROMOTE_BATCH_SIZE = 50;

    /**
     * Lua script that moves due entries from the delayed sorted set (KEYS[1]) to the
     * waiting list (KEYS[2]). ARGV[1] is the current timestamp, ARGV[2] the batch size.
     * An entry is pushed only when this call is the one that removed it from the set.
     */
    private const PROMOTE_SCRIPT = <<<'LUA'
        local items = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, ARGV[2])
        for _, item in ipairs(items) do
            if redis.call('ZREM', KEYS[1], item) == 1 then
                redis.call('LPUSH', KEYS[2], item)
            end
        end
        return #items
        LUA;

    /**
     * @var object|null Redis client instance; null when ext-redis is missing or the
     *                  handler could not be initialised
     */
    private ?object $redis = null;

    private ?object $job = null; // decoded structure { id,time,delay,data }
    private string $prefix = 'jobs:';

    /**
     * Obtains the Redis client through the project's RedisHandler.
     *
     * Never throws: when ext-redis is not loaded or the handler fails to initialise,
     * the client stays null and enqueue()/watch() deal with its absence.
     */
    public function __construct()
    {
        if (! class_exists('Redis')) {
            return;
        }

        try {
            // config() returns the shared Cache config instance. Work on a copy so that
            // forcing the redis handler here does not alter the application's cache setup.
            $cacheConfig          = clone config(Cache::class);
            $cacheConfig->handler = 'redis';
            $handler              = new JobsRedisHandler($cacheConfig);
            $handler->initialize();
            $this->redis = $handler->getRedis();
        } catch (Throwable $e) {
            // Deliberately best-effort: keep the object constructible, but leave a trace.
            $this->redis = null;
            log_message('warning', 'Redis queue backend unavailable: ' . $e->getMessage());
        }
    }

    /**
     * Stores a job either in the waiting list or, when delayed, in the delayed sorted set.
     *
     * @param object $data Job description; its optional `queue` property selects the queue
     *                     (defaults to 'default').
     *
     * @return string Identifier assigned to the stored job.
     *
     * @throws RuntimeException When no Redis client is available or Redis reports that
     *                          the entry could not be written.
     */
    public function enqueue(object $data): string
    {
        // Fail fast before doing any serialization work.
        if ($this->redis === null) {
            throw new RuntimeException('Redis extension not available');
        }

        $queue   = $data->queue ?? 'default';
        $delay   = $this->calculateDelay($data);
        $now     = time();
        $id      = $now . '-' . $this->generateId(bytes: 4);
        $payload = $this->getSerializer()->serialize((object) [
            'id'    => $id,
            'time'  => $now,
            'delay' => $delay->seconds,
            'data'  => $data,
        ]);

        $stored = $delay->isImmediate()
            ? $this->redis->lPush($this->waitingKey($queue), $payload)
            : $this->redis->zAdd($this->delayedKey($queue), $now + $delay->seconds, $payload);

        // A false result means the write failed; returning an id would hide a lost job.
        if ($stored === false) {
            throw new RuntimeException('Unable to store job in Redis queue ' . $queue);
        }

        log_message('debug', 'Job enqueued in Redis queue ' . $queue . ' with ID ' . $id);

        return $id;
    }

    /**
     * Promotes due delayed jobs and pops the next waiting job of the given queue.
     *
     * @param string $queue Queue name.
     *
     * @return mixed Result of JobEnvelope::fromBackend() for the popped job, or null when
     *               Redis is unavailable, the queue is empty, or the popped entry is unusable.
     */
    public function watch(string $queue)
    {
        if ($this->redis === null) {
            return null;
        }

        $this->promoteDelayed($queue);

        $raw = $this->redis->rPop($this->waitingKey($queue));
        if (! $raw) {
            return null;
        }

        $decoded = $this->getSerializer()->deserialize($raw);
        $payload = is_object($decoded) ? ($decoded->data ?? null) : null;

        if (! $payload) {
            // The entry has already been removed from Redis by RPOP, so make the drop visible.
            $this->job = null;
            log_message('warning', 'Discarded malformed job entry popped from Redis queue ' . $queue);

            return null;
        }

        $this->job = $decoded;

        return JobEnvelope::fromBackend(
            backend: 'redis',
            id: (string) ($decoded->id ?? ''),
            queue: $queue,
            payload: $payload,
            extraMeta: [
                'delay' => $decoded->delay ?? 0,
                'time'  => $decoded->time ?? null,
            ],
            raw: $decoded,
        );
    }

    /**
     * Forgets the job currently held by this worker, optionally re-enqueueing it.
     *
     * The popped entry was already removed from Redis by watch(), so nothing is deleted here.
     *
     * @param QueuesJob $job      Job being finished.
     * @param bool      $recreate When true the job is stored again on this backend.
     *
     * @return bool Always true.
     *
     * @throws RuntimeException Propagated from enqueue() when $recreate is true.
     */
    public function removeJob(QueuesJob $job, bool $recreate = false): bool
    {
        if ($recreate) {
            // Re-enqueue directly to this redis backend instead of using push()
            // which might use a different worker from QueueManager
            $this->enqueue($job->toObject());
        }
        $this->job = null;

        return true;
    }

    /**
     * Moves delayed jobs whose due time has passed into the waiting list.
     *
     * The Lua script is tried first so that the check-and-move happens in one server-side
     * step. If eval() throws or reports failure by returning false, a per-item fallback
     * is used that pushes an entry only when this worker removed it from the delayed set.
     */
    private function promoteDelayed(string $queue): void
    {
        if ($this->redis === null) {
            return;
        }

        $now        = time();
        $delayedKey = $this->delayedKey($queue);
        $waitingKey = $this->waitingKey($queue);

        try {
            $promoted = $this->redis->eval(
                self::PROMOTE_SCRIPT,
                [$delayedKey, $waitingKey, (string) $now, (string) self::PROMOTE_BATCH_SIZE],
                2,
            );

            // The script returns an integer count (possibly 0); only false signals an error.
            if ($promoted !== false) {
                return;
            }
        } catch (Throwable) {
            // Fall through to the non-atomic path below.
        }

        // Score bounds are passed as strings: under strict_types an int is rejected
        // for the string parameters of zRangeByScore().
        $items = $this->redis->zRangeByScore(
            $delayedKey,
            '0',
            (string) $now,
            ['limit' => [0, self::PROMOTE_BATCH_SIZE]],
        );
        if (! $items) {
            return;
        }

        foreach ($items as $raw) {
            if ($this->redis->zRem($delayedKey, $raw) > 0) {
                $this->redis->lPush($waitingKey, $raw);
            }
        }
    }

    /**
     * Key of the list holding jobs that are ready to run.
     */
    private function waitingKey(string $queue): string
    {
        return $this->prefix . $queue . '-waiting';
    }

    /**
     * Key of the sorted set holding jobs that are not yet due.
     */
    private function delayedKey(string $queue): string
    {
        return $this->prefix . $queue . '-delayed';
    }

    // failedKey() was removed because it was unused; failed-job storage will be handled in a future implementation.
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc