• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 24458365411

05 Apr 2026 08:48AM UTC coverage: 53.938% (-2.2%) from 56.164%
24458365411

push

github

daycry
Optimize

50 of 192 new or added lines in 14 files covered. (26.04%)

3 existing lines in 3 files now uncovered.

1219 of 2260 relevant lines covered (53.94%)

4.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.07
/src/Models/QueueModel.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Models;
15

16
use CodeIgniter\Database\ConnectionInterface;
17
use CodeIgniter\Model;
18
use CodeIgniter\Validation\ValidationInterface;
19
use Config\Database;
20
use DateTime;
21
use DateTimeZone;
22
use Daycry\Jobs\Entities\Queue;
23

24
/**
25
 * Model for interacting with queued job records (insertion, fetching next pending job).
26
 */
27
class QueueModel extends Model
28
{
29
    /** @var string Primary key column of the queue table. */
    protected $primaryKey = 'id';

    /** @var class-string Rows are returned as Queue entities. */
    protected $returnType = Queue::class;

    /** @var bool Soft deletes are turned off for this model. */
    protected $useSoftDeletes = false;

    /** @var list<string> Columns that may be written through the model. */
    protected $allowedFields = [
        'identifier',
        'queue',
        'payload',
        'priority',
        'schedule',
        'status',
        'max_retries',
        'attempts',
    ];

    /** @var bool Timestamp columns below are maintained by the model. */
    protected $useTimestamps = true;

    /** @var string Column holding the creation timestamp. */
    protected $createdField = 'created_at';

    /** @var string Column holding the last-modification timestamp. */
    protected $updatedField = 'updated_at';

    /** @var string Soft-delete column name (soft deletes are disabled above). */
    protected $deletedField = 'deleted_at';
46

47
    /**
     * Build the model, defaulting to the connection group configured for Jobs.
     *
     * @param ConnectionInterface|null $db         Connection to use. When null, a connection for the configured
     *                                             group is opened and, because the parameter is taken by reference,
     *                                             written back into the caller's variable.
     * @param ValidationInterface|null $validation Validation instance forwarded unchanged to the parent model.
     */
    public function __construct(?ConnectionInterface &$db = null, ?ValidationInterface $validation = null)
    {
        if ($db === null) {
            // Resolve the group once and use it for both the model and the connection.
            $group         = config('Jobs')->database['group'];
            $this->DBGroup = $group;
            $db            = Database::connect($group);
        }

        parent::__construct($db, $validation);
    }
56

57
    /**
     * Run the parent initialisation, then point the model at the table
     * named in the Jobs database configuration.
     */
    protected function initialize(): void
    {
        parent::initialize();

        $databaseSettings = config('Jobs')->database;
        $this->table      = $databaseSettings['table'];
    }
63

64
    /**
     * Look up the next pending job whose schedule is due, lowest priority value
     * first and, within the same priority, earliest schedule first.
     *
     * The lookup is not restricted to a queue name and does not modify the row,
     * so it does not reserve the job.
     *
     * @return Queue|null The matching job, or null when nothing is due.
     */
    public function getJob(): ?Queue
    {
        // "Now" is computed in the application's timezone.
        $timezone = new DateTimeZone(config('App')->appTimezone);
        $current  = new DateTime('now', $timezone);
        $dueUntil = $current->format('Y-m-d H:i:s');

        $pendingAndDue = $this->where('status', 'pending')->where('schedule <=', $dueUntil);

        return $pendingAndDue->orderBy('priority ASC, schedule ASC')->first();
    }
76

77
    /**
     * Reserve the next due job of a queue for the calling worker.
     *
     * Strategy selection:
     *  - SKIP LOCKED already known to be unavailable: use the optimistic path directly.
     *  - Otherwise attempt the FOR UPDATE SKIP LOCKED path (MySQL 8+, PostgreSQL 9.5+).
     *    Its result is final when it produced a job or when SKIP LOCKED is confirmed
     *    to work; only an unconfirmed, empty outcome falls through to the optimistic path
     *    (older databases or SQLite).
     *
     * @param string $queue Queue name to reserve from.
     *
     * @return Queue|null The reserved job, or null when none could be reserved.
     */
    public function reserveJob(string $queue): ?Queue
    {
        if (self::$supportsSkipLocked === false) {
            return $this->reserveJobOptimistic($queue);
        }

        $job = $this->reserveJobSkipLocked($queue);

        // An empty result only means "queue is empty" when SKIP LOCKED is confirmed.
        $needsFallback = $job === null && self::$supportsSkipLocked !== true;

        return $needsFallback ? $this->reserveJobOptimistic($queue) : $job;
    }
94

95
    /**
     * Cached outcome of the FOR UPDATE SKIP LOCKED probe, shared by every instance
     * of this class within the process:
     *  - null:  no attempt has been made yet,
     *  - true:  a SKIP LOCKED reservation attempt completed without error,
     *  - false: an attempt failed, so reserveJob() goes straight to the optimistic path.
     */
    private static ?bool $supportsSkipLocked = null;
96

97
    /**
     * Reserve using FOR UPDATE SKIP LOCKED (MySQL 8+, PostgreSQL 9.5+).
     *
     * Inside one transaction the next due pending job of the queue is selected
     * with a row lock and switched to `in_progress`; the refreshed entity is
     * then returned.
     *
     * Side effect on self::$supportsSkipLocked:
     *  - set to true as soon as the locking SELECT has executed successfully;
     *  - set to false when the attempt fails before support was ever confirmed.
     *    A failure after confirmation (deadlock, dropped connection, failed
     *    UPDATE) is logged and does not disable SKIP LOCKED for the process.
     *
     * @param string $queue Queue name to reserve from.
     *
     * @return Queue|null The reserved job, or null when nothing is due or the attempt failed.
     */
    private function reserveJobSkipLocked(string $queue): ?Queue
    {
        $table = $this->db->prefixTable($this->table);
        $now   = (new DateTime('now', new DateTimeZone(config('App')->appTimezone)))->format('Y-m-d H:i:s');

        try {
            $this->db->transStart();

            $sql = "SELECT id FROM {$table}
                    WHERE queue = ? AND status = 'pending' AND schedule <= ?
                    ORDER BY priority ASC, schedule ASC
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED";

            $query = $this->db->query($sql, [$queue, $now]);

            // Inside a transaction a failing query may return false instead of
            // throwing; turn that into an explicit failure rather than relying
            // on the error raised by calling a method on a boolean.
            if ($query === false) {
                throw new \RuntimeException('SELECT ... FOR UPDATE SKIP LOCKED failed.');
            }

            $row = $query->getRow();

            // The locking SELECT ran, so the database understands SKIP LOCKED.
            self::$supportsSkipLocked = true;

            if (! $row) {
                $this->db->transComplete();

                return null;
            }

            $updateSql = "UPDATE {$table}
                          SET status = 'in_progress', updated_at = ?
                          WHERE id = ?";

            $updated = $this->db->query($updateSql, [$now, $row->id]);

            // Never hand out a job whose status change was not applied: a
            // failed UPDATE would be rolled back and the row would stay
            // 'pending', letting another worker pick up the same job.
            if ($updated === false || $this->db->transStatus() === false) {
                throw new \RuntimeException('Could not mark the selected job as in_progress.');
            }

            $this->db->transComplete();

            return $this->find($row->id);
        } catch (\Throwable $e) {
            try {
                $this->db->transRollback();
            } catch (\Throwable) {
                // Best effort: the connection may already be unusable.
            }

            if (self::$supportsSkipLocked !== true) {
                // Support was never confirmed: assume the database lacks
                // SKIP LOCKED and fall back permanently.
                self::$supportsSkipLocked = false;
            } else {
                // Support is proven; this is a runtime failure, so keep the
                // fast path enabled and make the problem visible.
                log_message('error', 'Queue job reservation failed: ' . $e->getMessage());
            }

            return null;
        }
    }
145

146
    /**
     * Reserve using optimistic locking (fallback for older databases).
     *
     * Each round reads the id of the next due pending job and then tries to
     * switch it to `in_progress` with an UPDATE that still requires the row to
     * be `pending`. When the UPDATE touches no row, the round is counted as
     * lost, the method waits 10 ms and tries again, for at most three rounds.
     *
     * @param string $queue Queue name to reserve from.
     *
     * @return Queue|null The reserved job, or null when nothing is due or every round was lost.
     */
    private function reserveJobOptimistic(string $queue): ?Queue
    {
        $table = $this->db->prefixTable($this->table);

        for ($round = 0; $round < 3; $round++) {
            // Recomputed every round so a retry uses the current time.
            $timezone = new DateTimeZone(config('App')->appTimezone);
            $now      = (new DateTime('now', $timezone))->format('Y-m-d H:i:s');

            $selectSql = "SELECT id FROM {$table}
                    WHERE queue = ? AND status = 'pending' AND schedule <= ?
                    ORDER BY priority ASC, schedule ASC LIMIT 1";

            $candidate = $this->db->query($selectSql, [$queue, $now])->getRow();

            if (! $candidate) {
                return null;
            }

            $claimSql = "UPDATE {$table}
                          SET status = 'in_progress', updated_at = ?
                          WHERE id = ? AND status = 'pending'";

            $this->db->query($claimSql, [$now, $candidate->id]);

            $claimed = $this->db->affectedRows() > 0;

            if ($claimed) {
                return $this->find($candidate->id);
            }

            usleep(10000); // 10ms wait
        }

        return null;
    }
185

186
    /**
     * Clear the cached SKIP LOCKED capability so that the next reserveJob()
     * call attempts the SKIP LOCKED path again (for testing).
     */
    public static function resetSkipLockedDetection(): void
    {
        // null means "not probed yet".
        self::$supportsSkipLocked = null;
    }
193
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc