daycry / jobs / 26886467550

03 Jun 2026 01:01PM UTC coverage: 88.948% (+14.0%) from 74.974%
26886467550

push

github

web-flow
v3.0: single clean architecture (remove V1, lease-based queues, secure-by-default)

Complete v3.0 rewrite into a single, clean architecture. The v1 API and the V2\ scaffolding
are removed (no facade, no dual code); the package passes PHPStan level 6 + strict-rules +
codeigniter with NO baseline.

- Definition: Jobs::define()->...->dispatch() fluent builder -> immutable JobDefinition.
- Handlers decoupled from the god-object (JobHandlerInterface / AbstractJobHandler / TypedJobHandler + JobContext).
- One QueueBackend contract (enqueue/fetch(lease)/ack/nack(delay)/abandon/reapExpired) with 5 backends:
  Sync, Database, Redis, Beanstalk, ServiceBus.
- Runtime: one attempt per fetch; real interrupting Timeout; opt-in idempotency; single-instance lock.
- Worker/Cron: jobs:queue:work, jobs:queue:reap, jobs:cronjob:run, jobs:queue:purge.
- Secure-by-default: HMAC-signed envelopes, per-queue handler allowlist, ShellHandler deny-by-default,
  EventHandler allowlist, UrlHandler anti-SSRF.

Resolves audit findings #1,#2,#3,#4,#5,#6,#7,#8,#10,#12,#13,#17,#18,#19,#20,#22.
Tests: 359 (Beanstalk live); line coverage 88.9%; PHPStan/Psalm/Rector/cs green on PHP 8.2-8.5.

BREAKING CHANGE: v1 API removed. See docs/MIGRATION-v1-to-v3.md.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
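The lease-based contract named in the commit message (enqueue, fetch with a lease, ack, nack, reapExpired) can be illustrated with a small in-memory sketch. This is not the package's code: `InMemoryLeaseQueue`, its array shapes, and the explicit `$now` clock parameter are hypothetical names chosen for illustration, and the real `QueueBackend` works with `JobDefinition` and `JobLease` objects instead. Whether a reaped job should also count as an attempt is left out here as an open assumption.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory queue. Names and array shapes are illustrative only and
// do not reproduce the package's JobDefinition, JobEnvelope, or JobLease classes.
final class InMemoryLeaseQueue
{
    /** @var list<array{id: string, payload: string, attempts: int}> */
    private array $ready = [];

    /** @var array<string, array{job: array{id: string, payload: string, attempts: int}, leasedAt: int}> */
    private array $inFlight = [];

    private int $nextId = 1;

    public function enqueue(string $payload): string
    {
        $id            = 'job-' . $this->nextId++;
        $this->ready[] = ['id' => $id, 'payload' => $payload, 'attempts' => 0];

        return $id;
    }

    /**
     * Hand out the next job under a lease that starts at $now.
     *
     * @return array{id: string, payload: string, attempts: int}|null
     */
    public function fetch(int $now): ?array
    {
        $job = array_shift($this->ready);
        if ($job === null) {
            return null;
        }

        $this->inFlight[$job['id']] = ['job' => $job, 'leasedAt' => $now];

        return $job;
    }

    // Success: the leased job is removed for good.
    public function ack(string $id): bool
    {
        if (! isset($this->inFlight[$id])) {
            return false;
        }

        unset($this->inFlight[$id]);

        return true;
    }

    // Failure: the job goes back to the ready list with attempts + 1.
    public function nack(string $id): bool
    {
        if (! isset($this->inFlight[$id])) {
            return false;
        }

        $job = $this->inFlight[$id]['job'];
        $job['attempts']++;
        unset($this->inFlight[$id]);
        $this->ready[] = $job;

        return true;
    }

    // Crash recovery: leases older than the visibility timeout become ready again.
    public function reapExpired(int $now, int $visibilityTimeout): int
    {
        $reaped = 0;

        foreach ($this->inFlight as $id => $entry) {
            if ($now - $entry['leasedAt'] >= $visibilityTimeout) {
                unset($this->inFlight[$id]);
                $this->ready[] = $entry['job'];
                $reaped++;
            }
        }

        return $reaped;
    }
}

$queue  = new InMemoryLeaseQueue();
$id     = $queue->enqueue('send-email');
$first  = $queue->fetch(100);            // first lease
$queue->nack($id);                       // handler failed, requeue
$second = $queue->fetch(105);            // second lease, then the worker "crashes"
$reaped = $queue->reapExpired(200, 60);  // lease is older than 60 seconds
$third  = $queue->fetch(201);
$acked  = $queue->ack($id);
$empty  = $queue->fetch(202);
```

Passing the clock in as `$now` keeps the sketch deterministic; a real backend would read the time itself or, as with Service Bus, leave lock expiry to the broker.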

983 of 1103 new or added lines in 43 files covered. (89.12%)

15 existing lines in 3 files now uncovered.

1497 of 1683 relevant lines covered (88.95%)

7.55 hits per line

Source File

/src/Queues/Backends/ServiceBusBackend.php (84.75% covered)
<?php

declare(strict_types=1);

/**
 * This file is part of Daycry Queues.
 *
 * (c) Daycry <daycry9@proton.me>
 *
 * For the full copyright and license information, please view
 * the LICENSE file that was distributed with this source code.
 */

namespace Daycry\Jobs\Queues\Backends;

use DateTime;
use DateTimeImmutable;
use DateTimeZone;
use Daycry\Jobs\Definition\JobDefinition;
use Daycry\Jobs\Libraries\ServiceBusHeaders;
use Daycry\Jobs\Queues\EnvelopeFactory;
use Daycry\Jobs\Queues\JobEnvelope;
use Daycry\Jobs\Queues\JobLease;
use Daycry\Jobs\Queues\QueueBackend;
use stdClass;
use Throwable;

/**
 * Azure Service Bus queue implementing the v3 {@see QueueBackend} contract.
 *
 * Ports the proven HTTP peek-lock logic of the legacy {@see \Daycry\Jobs\Queues\ServiceBusQueue}
 * (SAS auth + BrokerProperties built by {@see ServiceBusHeaders}) onto the lease-based contract:
 *  - enqueue(): POST the canonical wire payload to /<queue>/messages.
 *  - fetch():   POST /<queue>/messages/head?timeout=<lock> (peek-lock). 201 holds the message
 *               under a LockToken; 204/anything-else means the queue is empty. The LockToken +
 *               MessageId form the lease token (".../<messageId>/<lockToken>") used to settle.
 *  - ack():     DELETE the lock location (complete/settle the message).
 *  - nack():    Service Bus has no native "unlock with delay", so we re-enqueue a fresh copy of
 *               the wire with attempts+1 (mirroring RedisBackend), optionally scheduled via the
 *               ScheduledEnqueueTimeUtc broker property when a delay is requested, and then settle
 *               the original lock. Enqueue-first/settle-after means a failure never loses the message.
 *  - abandon(): settle (DELETE) the lock; the broker dead-letters natively after MaxDeliveryCount.
 *               The runtime forwards the payload to the configured DeadLetterQueue before calling this.
 *  - reapExpired(): returns 0; the broker releases the lock automatically once it expires
 *               (LockedUntilUtc), so crash recovery happens server-side without a client-side reaper.
 *
 * Delivery is at-least-once; handlers should be idempotent.
 *
 * The raw HTTP calls go through an injected {@see ServiceBusTransport}; the default
 * {@see CurlServiceBusTransport} talks to the broker, while tests inject a fake that records calls
 * and serves stub responses, keeping the class final without needing a subclass.
 */
final readonly class ServiceBusBackend implements QueueBackend
{
    private const BACKEND = 'servicebus';

    private string $baseUrl;

    /**
     * @var array{url: string, issuer: string, secret: string}
     */
    private array $credentials;

    private ServiceBusTransport $transport;

    public function __construct(?ServiceBusTransport $transport = null)
    {
        $cfg = config('Jobs')->serviceBus;

        $url    = isset($cfg['url']) && is_string($cfg['url']) ? $cfg['url'] : '';
        $issuer = isset($cfg['issuer']) && is_string($cfg['issuer']) ? $cfg['issuer'] : '';
        $secret = isset($cfg['secret']) && is_string($cfg['secret']) ? $cfg['secret'] : '';

        if ($url === '' || $issuer === '' || $secret === '') {
            log_message('warning', 'ServiceBusBackend: incomplete configuration (url, issuer, or secret missing).');
        }

        $this->credentials = ['url' => $url, 'issuer' => $issuer, 'secret' => $secret];
        $this->baseUrl     = rtrim($url, '/') . '/';
        $this->transport   = $transport ?? new CurlServiceBusTransport();
    }

    public function enqueue(JobDefinition $definition): string
    {
        $queue      = $definition->queue ?? 'default';
        $identifier = bin2hex(random_bytes(8));
        $wire       = EnvelopeFactory::toWire($definition, $identifier);
        $body       = json_encode($wire, JSON_THROW_ON_ERROR);

        $this->postMessage($queue, $body, $definition->scheduledAt);

        return $identifier;
    }

    public function fetch(string $queue): ?JobLease
    {
        $lockTimeout = max(1, (int) (config('Jobs')->serviceBusLockTimeout ?? 60));

        $response = $this->transport->post(
            $this->baseUrl . $queue . '/messages/head?timeout=' . $lockTimeout,
            $this->authHeaders(),
        );

        $status = $this->statusOf($response);
        // 201 Created = peek-lock succeeded (message held). Anything else (204, 4xx, 5xx) = no usable message.
        if ($status !== 201) {
            return null;
        }

        [$messageId, $lockToken] = $this->extractLockTokens($response);
        if ($messageId === null || $lockToken === null) {
            log_message('error', 'ServiceBusBackend::fetch missing LockToken/MessageId in BrokerProperties; cannot ack message safely.');

            return null;
        }

        $rawBody = $this->bodyOf($response);
        $decoded = json_decode($rawBody);
        if (! $decoded instanceof stdClass) {
            // We hold the lock but cannot interpret the payload. Leave it locked: the broker
            // redelivers after lock expiry and dead-letters after MaxDeliveryCount.
            log_message('error', 'ServiceBusBackend::fetch deserialization failed; leaving message locked.');

            return null;
        }

        $lockLocation = $this->baseUrl . $queue . '/messages/' . rawurlencode($messageId) . '/' . rawurlencode($lockToken);

        $raw               = new stdClass();
        $raw->queue        = $queue;
        $raw->messageId    = $messageId;
        $raw->lockToken    = $lockToken;
        $raw->lockLocation = $lockLocation;
        $raw->wire         = $decoded;

        $envelope = new JobEnvelope(
            id: isset($decoded->identifier) && is_scalar($decoded->identifier) ? (string) $decoded->identifier : $messageId,
            queue: $queue,
            payload: $decoded,
            name: isset($decoded->name) && is_string($decoded->name) ? $decoded->name : null,
            attempts: isset($decoded->attempts) ? (int) $decoded->attempts : 0,
            priority: isset($decoded->priority) ? (int) $decoded->priority : null,
            meta: ['backend' => self::BACKEND, 'messageId' => $messageId, 'lockToken' => $lockToken, 'status' => $status],
            raw: $raw,
        );

        $owner = bin2hex(random_bytes(16));

        return JobLease::withRelativeExpiry($envelope, $lockLocation, $owner, $lockTimeout, self::BACKEND);
    }

    public function ack(JobLease $lease): bool
    {
        $location = $this->lockLocationOf($lease);
        if ($location === null) {
            return false;
        }

        $this->transport->delete($location, $this->authHeaders());

        return true;
    }

    public function nack(JobLease $lease, ?int $delaySeconds = null): bool
    {
        $raw = $lease->envelope->raw;
        if (! $raw instanceof stdClass || ! isset($raw->wire) || ! $raw->wire instanceof stdClass) {
            return false;
        }

        $queue = isset($raw->queue) && is_string($raw->queue) ? $raw->queue : $lease->envelope->queue;

        // Re-serialise with attempts+1 so the incremented counter survives the requeue (the broker
        // has no in-place unlock-with-delay; we recreate the message exactly like RedisBackend).
        $wire           = clone $raw->wire;
        $wire->attempts = (isset($wire->attempts) ? (int) $wire->attempts : 0) + 1;
        $body           = json_encode($wire, JSON_THROW_ON_ERROR);

        $delay     = max(0, $delaySeconds ?? 0);
        $scheduled = $delay > 0
            ? (new DateTimeImmutable())->modify('+' . $delay . ' seconds')
            : null;

        // Enqueue the new copy FIRST so the message is never lost if the settle of the original fails.
        $this->postMessage($queue, $body, $scheduled);

        $location = $this->lockLocationOf($lease);
        if ($location !== null) {
            $this->transport->delete($location, $this->authHeaders());
        }

        return true;
    }

    public function abandon(JobLease $lease): bool
    {
        // No client-side retry: settle the lock so the message is removed. Service Bus dead-letters
        // natively after MaxDeliveryCount; the runtime forwards to the configured DLQ before this call.
        $location = $this->lockLocationOf($lease);
        if ($location === null) {
            return false;
        }

        $this->transport->delete($location, $this->authHeaders());

        return true;
    }

    public function reapExpired(string $queue, int $visibilityTimeout): int
    {
        // The broker releases the peek-lock automatically once it expires (LockedUntilUtc), so a
        // client-side reaper is unnecessary for crash recovery.
        return 0;
    }

    /**
     * POST a serialized message body to the queue, optionally scheduled for future delivery.
     */
    private function postMessage(string $queue, string $body, ?DateTimeImmutable $scheduledAt): void
    {
        $builder = $this->newHeadersBuilder();
        if ($scheduledAt instanceof DateTimeImmutable) {
            try {
                $builder->schedule(DateTime::createFromInterface($scheduledAt)->setTimezone(new DateTimeZone('UTC')));
            } catch (Throwable $e) {
                log_message('warning', 'ServiceBusBackend::postMessage failed to set schedule: ' . $e->getMessage());
            }
        }

        $headers = array_merge(['Content-Type' => 'application/json'], $builder->getHeaders());

        $this->transport->post($this->baseUrl . $queue . '/messages', $headers, $body);
    }

    /**
     * Authorization-only headers for receive/settle (BrokerProperties carries the outgoing
     * MessageId and must not be sent on peek-lock or delete requests).
     *
     * @return array<string, string>
     */
    private function authHeaders(): array
    {
        $headers = $this->newHeadersBuilder()->getHeaders();
        unset($headers['BrokerProperties']);

        /** @var array<string, string> $headers */
        return $headers;
    }

    private function newHeadersBuilder(): ServiceBusHeaders
    {
        return (new ServiceBusHeaders())
            ->generateMessageId()
            ->generateSasToken($this->credentials['url'], $this->credentials['issuer'], $this->credentials['secret']);
    }

    /**
     * Pull MessageId and LockToken out of the BrokerProperties response header.
     *
     * @return array{0: ?string, 1: ?string} [messageId, lockToken]
     */
    private function extractLockTokens(object $response): array
    {
        if (! method_exists($response, 'getHeader')) {
            return [null, null];
        }

        $header = $response->getHeader('BrokerProperties');
        if ($header === null) {
            return [null, null];
        }

        $value = is_object($header) && method_exists($header, 'getValue')
            ? $header->getValue()
            : $header;

        if (! is_string($value) || $value === '') {
            return [null, null];
        }

        $decoded = json_decode($value);
        if (! $decoded instanceof stdClass) {
            return [null, null];
        }

        return [
            isset($decoded->MessageId) && is_scalar($decoded->MessageId) ? (string) $decoded->MessageId : null,
            isset($decoded->LockToken) && is_scalar($decoded->LockToken) ? (string) $decoded->LockToken : null,
        ];
    }

    private function lockLocationOf(JobLease $lease): ?string
    {
        $raw = $lease->envelope->raw;
        if ($raw instanceof stdClass && isset($raw->lockLocation) && is_string($raw->lockLocation) && $raw->lockLocation !== '') {
            return $raw->lockLocation;
        }

        return $lease->token !== '' ? $lease->token : null;
    }

    private function statusOf(object $response): int
    {
        if (! method_exists($response, 'getStatusCode')) {
            return 0;
        }

        return (int) $response->getStatusCode();
    }

    private function bodyOf(object $response): string
    {
        if (! method_exists($response, 'getBody')) {
            return '';
        }

        return (string) $response->getBody();
    }
}
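The class docblock mentions that tests inject a fake transport that records calls, and that nack() enqueues the new copy before settling the original lock. A minimal sketch of how such a fake could check that ordering follows. The `SketchTransport` interface, `RecordingTransport`, `sketchNack()`, and the `example.invalid` URL are all hypothetical: the real `ServiceBusTransport` contract is not shown in this file, so the method signatures are assumptions inferred from how the backend calls `post()` and `delete()`.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the transport contract. The real ServiceBusTransport
// interface is not shown in this file, so these signatures are assumptions.
interface SketchTransport
{
    /** @param array<string, string> $headers */
    public function post(string $url, array $headers, ?string $body = null): object;

    /** @param array<string, string> $headers */
    public function delete(string $url, array $headers): object;
}

// Records every call in order and returns a canned response object.
final class RecordingTransport implements SketchTransport
{
    /** @var list<array{method: string, url: string, body: ?string}> */
    public array $calls = [];

    public function post(string $url, array $headers, ?string $body = null): object
    {
        $this->calls[] = ['method' => 'POST', 'url' => $url, 'body' => $body];

        return (object) ['status' => 201];
    }

    public function delete(string $url, array $headers): object
    {
        $this->calls[] = ['method' => 'DELETE', 'url' => $url, 'body' => null];

        return (object) ['status' => 200];
    }
}

// Mirrors the nack() ordering: bump attempts on a copy, re-enqueue it,
// and only then settle the original lock.
function sketchNack(
    SketchTransport $transport,
    string $baseUrl,
    string $queue,
    stdClass $wire,
    string $lockLocation,
): void {
    $copy           = clone $wire;
    $copy->attempts = (isset($copy->attempts) ? (int) $copy->attempts : 0) + 1;
    $body           = json_encode($copy, JSON_THROW_ON_ERROR);

    $transport->post($baseUrl . $queue . '/messages', ['Content-Type' => 'application/json'], $body);
    $transport->delete($lockLocation, []);
}

$transport = new RecordingTransport();
$wire      = (object) ['identifier' => 'abc123', 'attempts' => 1];
$base      = 'https://example.invalid/';

sketchNack($transport, $base, 'default', $wire, $base . 'default/messages/m-1/lock-1');
```

With this ordering, a failure between the two calls leaves a duplicate rather than a lost message, which is consistent with the at-least-once delivery the docblock describes.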

© 2026 Coveralls, Inc