• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 19498141349

19 Nov 2025 10:30AM UTC coverage: 62.5% (+0.7%) from 61.815%
19498141349

push

github

daycry
- Fixes

4 of 4 new or added lines in 4 files covered. (100.0%)

41 existing lines in 9 files now uncovered.

1145 of 1832 relevant lines covered (62.5%)

4.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.69
/src/Queues/RedisQueue.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Queues;
15

16
use Config\Cache;
17
use DateTimeImmutable;
18
use Daycry\Jobs\Interfaces\QueueInterface;
19
use Daycry\Jobs\Interfaces\WorkerInterface;
20
use Daycry\Jobs\Job as QueuesJob; // ext-redis required
21
use Daycry\Jobs\Libraries\DateTimeHelper;
22
use Daycry\Jobs\Libraries\RedisHandler as JobsRedisHandler;
23
use Redis;
24
use RuntimeException;
25
use Throwable;
26

27
/**
 * Redis-backed queue implementation.
 *
 * Storage model:
 *  - Immediate jobs: LPUSH into {prefix}{queue}-waiting, consumed via RPOP (FIFO semantics).
 *  - Delayed jobs: stored in a sorted set {prefix}{queue}-delayed with score = due timestamp,
 *    then moved into the waiting list by watch() once they are due.
 *  - Failed jobs are not persisted by this class.
 *
 * Contract notes:
 *  - enqueue(): returns a string job identifier ("{unix timestamp}-{generated id}").
 *  - watch(): promotes due delayed jobs, then pops one waiting job; returns the value built by
 *    JobEnvelope::fromBackend() or null when nothing usable was popped.
 *  - removeJob(): if $recreate is true the job is re-enqueued directly on this backend via enqueue().
 */
class RedisQueue extends BaseQueue implements QueueInterface, WorkerInterface
{
    /**
     * Maximum number of due delayed jobs moved to the waiting list per watch() call.
     */
    private const PROMOTE_BATCH_SIZE = 50;

    /**
     * @var object|null Redis client instance; null when ext-redis is missing or the connection failed
     */
    private ?object $redis = null;

    /**
     * @var object|null Last job popped by watch(), decoded structure { id, time, delay, data }
     */
    private ?object $job = null;

    /**
     * @var string Prefix prepended to every Redis key used by this queue
     */
    private string $prefix;

    /**
     * Sets the key prefix and tries to obtain a Redis client through the jobs Redis handler.
     *
     * Never throws: when ext-redis is not loaded or the handler cannot be initialised the
     * client stays null, enqueue() will throw and watch() will return null.
     */
    public function __construct()
    {
        $this->prefix = 'jobs:';
        if (! class_exists('Redis')) {
            return;
        }

        try {
            // config() returns a shared instance by default, so work on a copy: forcing the
            // handler on the shared object would switch the application's cache handler too.
            $cacheConfig          = clone config(Cache::class);
            $cacheConfig->handler = 'redis';
            $handler              = new JobsRedisHandler($cacheConfig);
            $handler->initialize();
            $this->redis = $handler->getRedis();
        } catch (Throwable $e) {
            // Stay usable as an object, but leave a trace of why Redis is unavailable.
            $this->redis = null;
            log_message('error', 'RedisQueue could not initialise the Redis connection: ' . $e->getMessage());
        }
    }

    /**
     * Stores a job either in the waiting list (immediate) or in the delayed sorted set.
     *
     * @param object $data Job description; its optional "queue" property selects the queue
     *                     name ("default" when absent)
     *
     * @return string Identifier of the stored job
     *
     * @throws RuntimeException When no Redis client is available or Redis rejects the write
     */
    public function enqueue(object $data): string
    {
        // Fail fast: nothing below is useful without a backend.
        if (! $this->redis) {
            throw new RuntimeException('Redis extension not available');
        }

        $queue   = $data->queue ?? 'default';
        $delay   = $this->calculateDelay($data);
        $now     = time();
        $id      = $now . '-' . $this->generateId(bytes: 4);
        $payload = $this->getSerializer()->serialize((object) [
            'id'    => $id,
            'time'  => $now,
            'delay' => $delay->seconds,
            'data'  => $data,
        ]);

        $stored = $delay->isImmediate()
            ? $this->redis->lPush($this->waitingKey($queue), $payload)
            : $this->redis->zAdd($this->delayedKey($queue), $now + $delay->seconds, $payload);

        // phpredis reports a failed command with false; do not hand out an ID for a job
        // that was never stored.
        if ($stored === false) {
            throw new RuntimeException('Unable to store job ' . $id . ' in Redis queue ' . $queue);
        }

        log_message('debug', 'Job enqueued in Redis queue ' . $queue . ' with ID ' . $id);

        return $id;
    }

    /**
     * Promotes due delayed jobs and pops the oldest waiting job of the given queue.
     *
     * @param string $queue Queue name
     *
     * @return mixed The value built by JobEnvelope::fromBackend(), or null when Redis is
     *               unavailable, the queue is empty, or the popped payload is unusable
     */
    public function watch(string $queue)
    {
        if (! $this->redis) {
            return null;
        }

        $this->promoteDelayed($queue);

        $raw = $this->redis->rPop($this->waitingKey($queue));
        if (! $raw) {
            return null;
        }

        // Validate before touching the typed ?object property: assigning a non-object
        // (e.g. false from a failed decode) would raise a TypeError under strict types.
        $decoded = $this->getSerializer()->deserialize($raw);
        if (! is_object($decoded)) {
            $this->job = null;
            log_message('warning', 'Discarded undecodable payload popped from Redis queue ' . $queue);

            return null;
        }
        $this->job = $decoded;

        $decodedPayload = $this->job->data ?? null;
        if (! $decodedPayload) {
            log_message('warning', 'Discarded job without data popped from Redis queue ' . $queue);

            return null;
        }

        return JobEnvelope::fromBackend(
            backend: 'redis',
            id: (string) ($this->job->id ?? ''),
            queue: $queue,
            payload: $decodedPayload,
            extraMeta: [
                'delay' => $this->job->delay ?? 0,
                'time'  => $this->job->time ?? null,
            ],
            raw: $this->job,
        );
    }

    /**
     * Forgets the job currently held by this worker, optionally enqueueing it again.
     *
     * The job was already removed from Redis by the RPOP in watch(), so there is nothing
     * to delete on the server side.
     *
     * @param QueuesJob $job      Job being finalised
     * @param bool      $recreate When true the job is enqueued again on this Redis backend
     *
     * @return bool Always true
     *
     * @throws RuntimeException When $recreate is true and the job cannot be enqueued
     */
    public function removeJob(QueuesJob $job, bool $recreate = false): bool
    {
        if ($recreate) {
            // Re-enqueue directly to this redis backend instead of using push()
            // which might use a different worker from QueueManager
            $this->enqueue($job->toObject());
        }
        $this->job = null;

        return true;
    }

    /**
     * Moves delayed jobs whose due timestamp has passed into the waiting list.
     *
     * At most PROMOTE_BATCH_SIZE jobs are moved per call.
     *
     * @param string $queue Queue name
     */
    private function promoteDelayed(string $queue): void
    {
        if (! $this->redis) {
            return;
        }

        $delayedKey = $this->delayedKey($queue);
        $waitingKey = $this->waitingKey($queue);

        $items = $this->redis->zRangeByScore(
            $delayedKey,
            0,
            time(),
            ['limit' => [0, self::PROMOTE_BATCH_SIZE]],
        );
        if (! $items) {
            return;
        }

        foreach ($items as $raw) {
            // Several workers may have read the same batch. Only the worker whose ZREM
            // actually removed the member promotes it; otherwise the job would be pushed
            // (and later executed) more than once.
            $removed = $this->redis->zRem($delayedKey, $raw);
            if (is_int($removed) && $removed > 0) {
                $this->redis->lPush($waitingKey, $raw);
            }
        }
    }

    /**
     * Builds the key of the list holding jobs that are ready to run.
     */
    private function waitingKey(string $queue): string
    {
        return $this->prefix . $queue . '-waiting';
    }

    /**
     * Builds the key of the sorted set holding jobs scheduled for later.
     */
    private function delayedKey(string $queue): string
    {
        return $this->prefix . $queue . '-delayed';
    }

    // failedKey() was removed because it was unused; storage of failed jobs is left to a future implementation.
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc