• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 24850441053

23 Apr 2026 05:54PM UTC coverage: 52.404% (-1.5%) from 53.938%
24850441053

push

github

daycry
Fixes

104 of 219 new or added lines in 42 files covered. (47.49%)

14 existing lines in 9 files now uncovered.

1210 of 2309 relevant lines covered (52.4%)

4.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.07
/src/Models/QueueModel.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Models;
15

16
use Throwable;
17
use CodeIgniter\Database\ConnectionInterface;
18
use CodeIgniter\Model;
19
use CodeIgniter\Validation\ValidationInterface;
20
use Config\Database;
21
use DateTime;
22
use DateTimeZone;
23
use Daycry\Jobs\Entities\Queue;
24

25
/**
 * Model for queued job records: inserting rows, peeking at the next due job
 * and reserving a job for a worker.
 *
 * The table name and the database group are read from the `Jobs` config.
 */
class QueueModel extends Model
{
    /**
     * Tri-state cache of the FOR UPDATE SKIP LOCKED probe:
     * null  = not probed yet,
     * true  = a SKIP LOCKED select has executed successfully,
     * false = the select failed, use optimistic locking instead.
     *
     * Static, so the result is shared by every QueueModel instance in the process.
     */
    private static ?bool $supportsSkipLocked = null;

    protected $primaryKey     = 'id';
    protected $returnType     = Queue::class;
    protected $useSoftDeletes = false;
    protected $allowedFields  = [
        'identifier',
        'queue',
        'payload',
        'priority',
        'schedule',
        'status',
        'max_retries',
        'attempts',
    ];
    protected $useTimestamps = true;
    protected $createdField  = 'created_at';
    protected $updatedField  = 'updated_at';
    protected $deletedField  = 'deleted_at';

    /**
     * When no connection is supplied, connect to the group named in
     * `config('Jobs')->database['group']` and remember that group on the model.
     *
     * @param ConnectionInterface|null $db         Connection to use; filled in (by reference) when omitted.
     * @param ValidationInterface|null $validation Forwarded unchanged to the parent model.
     */
    public function __construct(?ConnectionInterface &$db = null, ?ValidationInterface $validation = null)
    {
        if (! $db instanceof ConnectionInterface) {
            $group         = config('Jobs')->database['group'];
            $db            = Database::connect($group);
            $this->DBGroup = $group;
        }

        parent::__construct($db, $validation);
    }

    /**
     * Point the model at the table configured in `config('Jobs')->database['table']`.
     */
    protected function initialize(): void
    {
        parent::initialize();

        $this->table = config('Jobs')->database['table'];
    }

    /**
     * Fetch the next pending job whose schedule is due, ordered by priority
     * and then by schedule. The row is only read, not reserved.
     */
    public function getJob(): ?Queue
    {
        /** @var Queue|null */
        return $this->where('status', 'pending')
            ->where('schedule <=', $this->scheduleCutoff())
            ->orderBy('priority ASC, schedule ASC')
            ->first();
    }

    /**
     * Reserve the next due job of a queue by switching it from `pending`
     * to `in_progress`.
     *
     * The FOR UPDATE SKIP LOCKED strategy is tried unless a previous probe
     * marked it as unsupported; otherwise, or when that probe fails during
     * this call, the optimistic strategy is used.
     *
     * @param string $queue Queue name to reserve from.
     *
     * @return Queue|null The reserved job, or null when nothing could be reserved.
     */
    public function reserveJob(string $queue): ?Queue
    {
        if (self::$supportsSkipLocked !== false) {
            $result = $this->reserveJobSkipLocked($queue);

            // A null result is final once SKIP LOCKED is known to work:
            // it means "nothing reserved", not "strategy unavailable".
            if ($result instanceof Queue || self::$supportsSkipLocked === true) {
                return $result;
            }
        }

        return $this->reserveJobOptimistic($queue);
    }

    /**
     * Reserve using SELECT ... FOR UPDATE SKIP LOCKED inside a transaction.
     *
     * Support is recorded as soon as the SELECT has executed. Only a failure
     * of the SELECT itself marks the feature as unsupported; a later failure
     * (UPDATE, commit, reload) just yields null for this call.
     */
    private function reserveJobSkipLocked(string $queue): ?Queue
    {
        $table = $this->db->prefixTable($this->table);
        $now   = $this->scheduleCutoff();

        // Lets the catch block tell "SKIP LOCKED rejected" apart from a later error.
        $selectSucceeded = false;

        try {
            $this->db->transStart();

            $sql = "SELECT id FROM {$table}
                    WHERE queue = ? AND status = 'pending' AND schedule <= ?
                    ORDER BY priority ASC, schedule ASC
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED";

            $query = $this->db->query($sql, [$queue, $now]);
            // NOTE(review): if query() signals failure by returning false instead of
            // throwing (CodeIgniter with DBDebug off), this call raises an Error that
            // is handled by the catch below — confirm against the framework version in use.
            $row = $query->getRow();

            $selectSucceeded          = true;
            self::$supportsSkipLocked = true;

            if (! $row) {
                $this->db->transComplete();

                return null;
            }

            $updateSql = "UPDATE {$table}
                          SET status = 'in_progress', updated_at = ?
                          WHERE id = ?";

            $updated = $this->db->query($updateSql, [$now, $row->id]);
            $this->db->transComplete();

            // Never hand out a job whose status change was reported as failed:
            // the row would still be 'pending' and could be reserved again.
            if ($updated === false) {
                return null;
            }

            /** @var Queue|null */
            return $this->find($row->id);
        } catch (Throwable) {
            try {
                $this->db->transRollback();
            } catch (Throwable) {
                // Best effort: the connection may already be unusable.
            }

            if (! $selectSucceeded) {
                // The SELECT itself failed — treat SKIP LOCKED as unsupported from now on.
                self::$supportsSkipLocked = false;
            }

            return null;
        }
    }

    /**
     * Reserve using optimistic locking: select a candidate id, then update it
     * only if it is still `pending`. Zero affected rows means the candidate
     * was not changed by this worker, so the selection is retried (3 attempts,
     * 10ms apart).
     */
    private function reserveJobOptimistic(string $queue): ?Queue
    {
        $table       = $this->db->prefixTable($this->table);
        $maxAttempts = 3;

        for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
            $now = $this->scheduleCutoff();

            $sql = "SELECT id FROM {$table}
                    WHERE queue = ? AND status = 'pending' AND schedule <= ?
                    ORDER BY priority ASC, schedule ASC LIMIT 1";

            $query = $this->db->query($sql, [$queue, $now]);
            $row   = $query->getRow();

            if (! $row) {
                return null;
            }

            $updateSql = "UPDATE {$table}
                          SET status = 'in_progress', updated_at = ?
                          WHERE id = ? AND status = 'pending'";

            $this->db->query($updateSql, [$now, $row->id]);

            if ($this->db->affectedRows() > 0) {
                /** @var Queue|null */
                return $this->find($row->id);
            }

            // Pause only when another attempt will actually follow.
            if ($attempt < $maxAttempts) {
                usleep(10000); // 10ms wait
            }
        }

        return null;
    }

    /**
     * Current time in the application timezone (`config('App')->appTimezone`),
     * formatted as `Y-m-d H:i:s` for comparison with the `schedule` column
     * and for stamping `updated_at`.
     */
    private function scheduleCutoff(): string
    {
        $timezone = new DateTimeZone(config('App')->appTimezone);

        return (new DateTime('now', $timezone))->format('Y-m-d H:i:s');
    }

    /**
     * Reset SKIP LOCKED detection (for testing).
     */
    public static function resetSkipLockedDetection(): void
    {
        self::$supportsSkipLocked = null;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc