• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 26886467550

03 Jun 2026 01:01PM UTC coverage: 88.948% (+14.0%) from 74.974%
26886467550

push

github

web-flow
v3.0: single clean architecture (remove V1, lease-based queues, secure-by-default)

Complete v3.0 rewrite into a single, clean architecture. The v1 API and the V2\ scaffolding
are removed (no facade, no dual code); the package passes PHPStan level 6 + strict-rules +
codeigniter with NO baseline.

- Definition: Jobs::define()->...->dispatch() fluent builder -> immutable JobDefinition.
- Handlers decoupled from the god-object (JobHandlerInterface / AbstractJobHandler / TypedJobHandler + JobContext).
- One QueueBackend contract (enqueue/fetch(lease)/ack/nack(delay)/abandon/reapExpired) with 5 backends:
  Sync, Database, Redis, Beanstalk, ServiceBus.
- Runtime: one attempt per fetch; real interrupting Timeout; opt-in idempotency; single-instance lock.
- Worker/Cron: jobs:queue:work, jobs:queue:reap, jobs:cronjob:run, jobs:queue:purge.
- Secure-by-default: HMAC-signed envelopes, per-queue handler allowlist, ShellHandler deny-by-default,
  EventHandler allowlist, UrlHandler anti-SSRF.

Resolves audit findings #1,#2,#3,#4,#5,#6,#7,#8,#10,#12,#13,#17,#18,#19,#20,#22.
Tests: 359 (Beanstalk live); line coverage 88.9%; PHPStan/Psalm/Rector/cs green on PHP 8.2-8.5.

BREAKING CHANGE: v1 API removed. See docs/MIGRATION-v1-to-v3.md.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

983 of 1103 new or added lines in 43 files covered. (89.12%)

15 existing lines in 3 files now uncovered.

1497 of 1683 relevant lines covered (88.95%)

7.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.95
/src/Models/QueueModel.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Models;
15

16
use CodeIgniter\Database\ConnectionInterface;
17
use CodeIgniter\Model;
18
use CodeIgniter\Validation\ValidationInterface;
19
use Config\Database;
20
use DateTime;
21
use DateTimeZone;
22
use Daycry\Jobs\Entities\Queue;
23
use Throwable;
24

25
/**
26
 * Model for interacting with queued job records (insertion, fetching next pending job).
27
 */
28
class QueueModel extends Model
29
{
30
    protected $primaryKey     = 'id';
31
    protected $returnType     = Queue::class;
32
    protected $useSoftDeletes = false;
33
    protected $allowedFields  = [
34
        'identifier',
35
        'queue',
36
        'payload',
37
        'priority',
38
        'schedule',
39
        'status',
40
        'max_retries',
41
        'attempts',
42
    ];
43
    protected $useTimestamps                 = true;
44
    protected $createdField                  = 'created_at';
45
    protected $updatedField                  = 'updated_at';
46
    protected $deletedField                  = 'deleted_at';
47
    private static ?bool $supportsSkipLocked = null;
48

49
    /**
     * Build the model, connecting to the database group named in the Jobs configuration
     * whenever the caller does not hand in a usable connection.
     *
     * $db is taken by reference, so a connection created here is also written back to the
     * caller's variable.
     *
     * @param ConnectionInterface<mixed, mixed>|null $db
     */
    public function __construct(?ConnectionInterface &$db = null, ?ValidationInterface $validation = null)
    {
        $hasConnection = $db instanceof ConnectionInterface;

        if ($hasConnection === false) {
            // Read the configured group once and use it for both the connection and DBGroup.
            $group         = config('Jobs')->database['group'];
            $db            = Database::connect($group);
            $this->DBGroup = $group;
        }

        parent::__construct($db, $validation);
    }
61

62
    /**
     * Run the parent initialisation, then point the model at the table named in the
     * Jobs configuration.
     */
    protected function initialize(): void
    {
        parent::initialize();

        $databaseSettings = config('Jobs')->database;
        $this->table      = $databaseSettings['table'];
    }
68

69
    /**
     * Fetch the next pending job that is ready for execution, ordered by priority then schedule.
     *
     * A row counts as ready when its schedule has passed AND its available_at is either NULL
     * or not in the future. This is the same readiness rule the reserve queries in this model
     * apply, so a job that is waiting out a retry delay is not reported as ready.
     *
     * This is a plain read: the row is neither locked nor marked as reserved.
     *
     * @return Queue|null The first matching row, or null when nothing is ready.
     */
    public function getJob(): ?Queue
    {
        $now = (new DateTime('now', new DateTimeZone(config('App')->appTimezone)))->format('Y-m-d H:i:s');

        $row = $this->where('status', 'pending')
            ->where('schedule <=', $now)
            // Grouped so the OR cannot escape the surrounding AND conditions.
            ->groupStart()
            ->where('available_at', null)
            ->orWhere('available_at <=', $now)
            ->groupEnd()
            ->orderBy('priority ASC, schedule ASC')
            ->first();

        return $row instanceof Queue ? $row : null;
    }
81

82
    /**
     * Atomically reserve the next ready job of a queue.
     *
     * Strategy: unless FOR UPDATE SKIP LOCKED is already known to be unsupported, try it
     * first (MySQL 8+, PostgreSQL 9.5+). If that attempt produced a job, or proved the
     * feature is supported (so a null simply means "nothing to reserve"), its result is
     * final. Otherwise the optimistic-locking fallback is used (older databases, SQLite).
     *
     * When $ownerToken is provided (v3 backend path), the reserved row is stamped with
     * reserved_at = now and owner_token = $ownerToken so the reaper can later detect and
     * recover leases left behind by a crashed worker.
     *
     * @return Queue|null The reserved job, or null when none could be reserved.
     */
    public function reserveJob(string $queue, ?string $ownerToken = null): ?Queue
    {
        // Known-unsupported: go straight to the fallback.
        if (self::$supportsSkipLocked === false) {
            return $this->reserveJobOptimistic($queue, $ownerToken);
        }

        $reserved = $this->reserveJobSkipLocked($queue, $ownerToken);

        if ($reserved instanceof Queue) {
            return $reserved;
        }

        // The attempt above updates the detection flag; "true" means the null is genuine.
        if (self::$supportsSkipLocked === true) {
            return $reserved;
        }

        return $this->reserveJobOptimistic($queue, $ownerToken);
    }
103

104
    /**
     * Reserve using FOR UPDATE SKIP LOCKED (MySQL 8+, PostgreSQL 9.5+).
     *
     * Inside one transaction: select the id of the best ready row while skipping rows
     * locked by other transactions, then flip that row to 'in_progress' and stamp
     * reserved_at / owner_token / updated_at.
     *
     * Side effect: updates the static SKIP LOCKED detection flag. It becomes true once the
     * statements ran without throwing, and false when anything throws, in which case this
     * method is skipped until the flag is reset.
     *
     * @return Queue|null The reserved job; null when no row is ready, when the transaction
     *                    did not commit, or when an error occurred.
     */
    private function reserveJobSkipLocked(string $queue, ?string $ownerToken = null): ?Queue
    {
        $table = $this->db->prefixTable($this->table);
        $now   = (new DateTime('now', new DateTimeZone(config('App')->appTimezone)))->format('Y-m-d H:i:s');

        try {
            $this->db->transStart();

            $sql = "SELECT id FROM {$table}
                    WHERE queue = ? AND status = 'pending' AND schedule <= ?
                    AND (available_at IS NULL OR available_at <= ?)
                    ORDER BY priority ASC, schedule ASC
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED";

            $query = $this->db->query($sql, [$queue, $now, $now]);
            $row   = $query->getRow();

            if ($row === null) {
                // Nothing ready (or every candidate is locked by another worker).
                $this->db->transComplete();
                self::$supportsSkipLocked = true;

                return null;
            }

            $updateSql = "UPDATE {$table}
                          SET status = 'in_progress', reserved_at = ?, owner_token = ?, updated_at = ?
                          WHERE id = ?";

            $this->db->query($updateSql, [$now, $ownerToken, $now, $row->id]);
            $committed = $this->db->transComplete();

            self::$supportsSkipLocked = true;

            if ($committed === false) {
                // The transaction was rolled back (a statement failed without throwing), so
                // the row is still 'pending'. Returning it would hand out a job with no lease.
                return null;
            }

            return $this->findQueue((int) $row->id);
        } catch (Throwable) {
            try {
                $this->db->transRollback();
            } catch (Throwable) {
                // Best effort: there may be no open transaction left to roll back.
            }
            // Database doesn't support SKIP LOCKED — fall back permanently
            self::$supportsSkipLocked = false;

            return null;
        }
    }
153

154
    /**
     * Reserve using optimistic locking (fallback for older databases).
     *
     * Each attempt selects the best ready row, then tries to claim it with an UPDATE that
     * only matches while the row is still 'pending'. Zero affected rows means another
     * worker claimed it first; the attempt is then retried after an exponential back-off
     * with jitter, up to a fixed number of attempts.
     *
     * @return Queue|null The reserved job; null when the queue has no ready row or every
     *                    attempt lost the race.
     */
    private function reserveJobOptimistic(string $queue, ?string $ownerToken = null): ?Queue
    {
        $table       = $this->db->prefixTable($this->table);
        $maxAttempts = 10;
        $baseDelayUs = 10_000;  // 10 ms initial delay
        $maxDelayUs  = 500_000; // 500 ms cap per attempt

        for ($attempts = 0; $attempts < $maxAttempts; $attempts++) {
            $now = (new DateTime('now', new DateTimeZone(config('App')->appTimezone)))->format('Y-m-d H:i:s');

            $sql = "SELECT id FROM {$table}
                    WHERE queue = ? AND status = 'pending' AND schedule <= ?
                    AND (available_at IS NULL OR available_at <= ?)
                    ORDER BY priority ASC, schedule ASC LIMIT 1";

            $query = $this->db->query($sql, [$queue, $now, $now]);
            $row   = $query->getRow();

            if ($row === null) {
                // Queue empty for this worker; no point retrying.
                return null;
            }

            $updateSql = "UPDATE {$table}
                          SET status = 'in_progress', reserved_at = ?, owner_token = ?, updated_at = ?
                          WHERE id = ? AND status = 'pending'";

            $this->db->query($updateSql, [$now, $ownerToken, $now, $row->id]);

            if ($this->db->affectedRows() > 0) {
                return $this->findQueue((int) $row->id);
            }

            if ($attempts === $maxAttempts - 1) {
                // Final attempt lost the race: no retry follows, so sleeping would only
                // delay the null result.
                break;
            }

            // Lost the race: exponential backoff with ±20% jitter, capped.
            $expDelay = min($maxDelayUs, $baseDelayUs * (2 ** $attempts));
            $jitter   = (int) ($expDelay * (random_int(-200, 200) / 1000));
            usleep(max(1_000, $expDelay + $jitter));
        }

        return null;
    }
199

200
    /**
     * Look a row up by primary key and return it only if it is a {@see Queue} entity.
     * The model's $returnType is Queue::class, so a hit is expected to be a Queue; the
     * explicit check narrows find()'s wider upstream return type for static analysis.
     *
     * @return Queue|null The entity, or null when find() yields anything else.
     */
    private function findQueue(int $id): ?Queue
    {
        $found = $this->find($id);

        if ($found instanceof Queue) {
            return $found;
        }

        return null;
    }
211

212
    /**
     * Reclaim rows stuck in 'in_progress' whose lease exceeded the visibility timeout
     * (the owning worker crashed/stalled).
     *
     * Matching rows go back to 'pending' with owner_token and reserved_at cleared.
     * updated_at is refreshed as well, like every other state transition in this model.
     * Rows without a reserved_at stamp are never touched.
     *
     * @param string $queue                    Queue whose rows are inspected.
     * @param int    $visibilityTimeoutSeconds Maximum lease age in seconds; negative values
     *                                         are treated as 0.
     *
     * @return int Number of rows recovered.
     */
    public function reapStuck(string $queue, int $visibilityTimeoutSeconds): int
    {
        $table   = $this->db->prefixTable($this->table);
        $current = new DateTime('now', new DateTimeZone(config('App')->appTimezone));
        $now     = $current->format('Y-m-d H:i:s');

        // Clone before modify(): DateTime is mutable and $current must stay "now".
        $threshold = (clone $current)
            ->modify('-' . max(0, $visibilityTimeoutSeconds) . ' seconds')
            ->format('Y-m-d H:i:s');

        $sql = "UPDATE {$table}
                SET status = 'pending', owner_token = NULL, reserved_at = NULL, updated_at = ?
                WHERE queue = ? AND status = 'in_progress' AND reserved_at IS NOT NULL AND reserved_at < ?";

        $this->db->query($sql, [$now, $queue, $threshold]);

        return $this->db->affectedRows();
    }
231

232
    /**
     * Put a leased row back in the queue for another try without creating a new row.
     *
     * The same row (same id) returns to 'pending' with attempts incremented, its lease
     * fields cleared and available_at set $delaySeconds into the future (retry backoff).
     * This avoids the orphan-row leak of the legacy mark-failed + insert-new pattern.
     *
     * @param int $id           Primary key of the row to requeue.
     * @param int $delaySeconds Delay before the row is available again; negative values
     *                          are treated as 0.
     *
     * @return bool True when the UPDATE affected at least one row.
     */
    public function requeueInPlace(int $id, int $delaySeconds = 0): bool
    {
        $table     = $this->db->prefixTable($this->table);
        $timezone  = new DateTimeZone(config('App')->appTimezone);
        $updatedAt = (new DateTime('now', $timezone))->format('Y-m-d H:i:s');

        $delay   = max(0, $delaySeconds);
        $retryAt = new DateTime('now', $timezone);
        $retryAt->modify("+{$delay} seconds");
        $availableAt = $retryAt->format('Y-m-d H:i:s');

        $sql = "UPDATE {$table}
                SET status = 'pending', attempts = attempts + 1, available_at = ?, reserved_at = NULL, owner_token = NULL, updated_at = ?
                WHERE id = ?";

        $this->db->query($sql, [$availableAt, $updatedAt, $id]);

        $changed = $this->db->affectedRows();

        return $changed > 0;
    }
252

253
    /**
     * Write a terminal status ('completed' | 'failed') to a leased row and release its
     * lease by clearing reserved_at and owner_token. The status value is stored as given.
     *
     * @param int    $id     Primary key of the row.
     * @param string $status Status to store.
     *
     * @return bool True when the UPDATE affected at least one row.
     */
    public function markStatus(int $id, string $status): bool
    {
        $table     = $this->db->prefixTable($this->table);
        $timezone  = new DateTimeZone(config('App')->appTimezone);
        $timestamp = (new DateTime('now', $timezone))->format('Y-m-d H:i:s');

        $sql = "UPDATE {$table}
                SET status = ?, reserved_at = NULL, owner_token = NULL, updated_at = ?
                WHERE id = ?";

        $bindings = [$status, $timestamp, $id];
        $this->db->query($sql, $bindings);

        $changed = $this->db->affectedRows();

        return $changed > 0;
    }
269

270
    /**
     * Forget the cached SKIP LOCKED capability so the next reservation probes the
     * database again (for testing).
     */
    public static function resetSkipLockedDetection(): void
    {
        self::$supportsSkipLocked = null;
    }
277
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc