• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 24850441053

23 Apr 2026 05:54PM UTC coverage: 52.404% (-1.5%) from 53.938%
24850441053

push

github

daycry
Fixes

104 of 219 new or added lines in 42 files covered. (47.49%)

14 existing lines in 9 files now uncovered.

1210 of 2309 relevant lines covered (52.4%)

4.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.0
/src/Queues/ServiceBusQueue.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Queues;
15

16
use DateTimeInterface;
17
use Daycry\Jobs\Interfaces\QueueInterface;
18
use Daycry\Jobs\Interfaces\WorkerInterface;
19
use Daycry\Jobs\Job as QueuesJob;
20
use Daycry\Jobs\Libraries\ServiceBusHeaders;
21
use Throwable;
22

23
/**
24
 * Azure Service Bus queue (simplified HTTP interface using CodeIgniter curlrequest).
25
 *
26
 * Contract notes:
27
 *  - enqueue(): sends a message, returns generated MessageId string or empty string on failure.
28
 *  - watch(): retrieves (and deletes) the head message (destructive read) and returns JobEnvelope or null.
29
 *  - removeJob(): optionally re-enqueues the original Job (no lock renewal / settlement implemented).
30
 *
31
 * Limitations / TODO:
32
 *  - Real SAS token lifetime / caching not implemented (token built per instance).
33
 *  - Proper peek-lock flow (receive + settle) not implemented; current watch() is destructive (DELETE head).
34
 *  - Scheduling uses x-ms-scheduled-enqueue-time only if provided.
35
 */
36
class ServiceBusQueue extends BaseQueue implements QueueInterface, WorkerInterface
{
    /**
     * Service Bus endpoint taken from the configured 'url', normalised to end with a single '/'.
     */
    private readonly string $baseUrl;

    /**
     * Last message fetched by watch(), or null when none is held.
     *
     * @var array{body: mixed, status: int, headers: array}|null
     */
    private ?array $job = null;

    /**
     * Template header builder prepared in the constructor (message id + SAS token).
     * enqueue() clones it per message so per-message properties never leak between sends.
     */
    private readonly ServiceBusHeaders $headersBuilder;

    /**
     * Reads the Service Bus settings from the Jobs config, falling back to the
     * SERVICEBUS_URL / SERVICEBUS_ISSUER / SERVICEBUS_SECRET environment variables,
     * and prepares the base URL and the header builder.
     *
     * An incomplete configuration is logged as a warning; construction still succeeds.
     */
    public function __construct()
    {
        $cfg = config('Jobs')->serviceBus ?? [
            'url'    => getenv('SERVICEBUS_URL') ?: '',
            'issuer' => getenv('SERVICEBUS_ISSUER') ?: '',
            'secret' => getenv('SERVICEBUS_SECRET') ?: '',
        ];

        if (empty($cfg['url']) || empty($cfg['issuer']) || empty($cfg['secret'])) {
            log_message('warning', 'ServiceBusQueue: incomplete configuration (url, issuer, or secret missing).');
        }

        $this->baseUrl        = rtrim($cfg['url'] ?? '', '/') . '/';
        $this->headersBuilder = (new ServiceBusHeaders())
            ->generateMessageId()
            ->generateSasToken($cfg['url'] ?? '', $cfg['issuer'] ?? '', $cfg['secret'] ?? '');
    }

    /**
     * Sends one message to the queue named by $data->queue ('default' when absent).
     *
     * @param object $data Job payload; optional `queue` and `label` properties are honoured.
     *
     * @return string The MessageId sent with the message on a 2xx response, '' otherwise.
     */
    public function enqueue(object $data): string
    {
        $queue = $data->queue ?? 'default';
        $delay = $this->calculateDelay($data);

        // Build the headers on a per-message copy with a fresh MessageId. Mutating the shared
        // builder would reuse one MessageId for every message sent by this instance and would
        // carry a schedule/label set for one message over to all later ones.
        $messageHeaders = (clone $this->headersBuilder)->generateMessageId();

        if (! $delay->isImmediate() && $delay->scheduledAt instanceof DateTimeInterface) {
            try {
                $messageHeaders->schedule($delay->scheduledAt);
            } catch (Throwable $e) {
                // Best effort: the message is still sent, just without the schedule header.
                log_message('warning', 'ServiceBusQueue::enqueue could not apply schedule header: ' . $e->getMessage());
            }
        }

        if (isset($data->label)) {
            try {
                $messageHeaders->setLabel((string) $data->label);
            } catch (Throwable $e) {
                // Best effort: the message is still sent, just without the label.
                log_message('warning', 'ServiceBusQueue::enqueue could not apply label header: ' . $e->getMessage());
            }
        }

        $headers = array_merge(['Content-Type' => 'application/json'], $messageHeaders->getHeaders());
        $resp    = $this->client()->post($this->baseUrl . $queue . '/messages', [
            'headers' => $headers,
            'body'    => $this->getSerializer()->serialize($data),
        ]);

        $status = $resp->getStatusCode();

        return ($status >= 200 && $status < 300)
            ? $messageHeaders->getMessageId()
            : '';
    }

    /**
     * Destructively reads the head message of $queue (HTTP DELETE on messages/head).
     *
     * @param string $queue Queue name appended to the base URL.
     *
     * @return mixed A JobEnvelope when a message was received with status 200 and its body
     *               deserialized to a truthy value; null otherwise.
     */
    public function watch(string $queue): mixed
    {
        $resp = $this->client()->delete($this->baseUrl . $queue . '/messages/head', [
            'headers' => array_merge(['Content-Type' => 'application/json'], $this->headersBuilder->getHeaders()),
        ]);

        // Anything other than a 200 response is treated as "nothing to process".
        if (! method_exists($resp, 'getStatusCode') || $resp->getStatusCode() !== 200) {
            return null;
        }

        $rawBody = (string) $resp->getBody();
        $body    = $this->getSerializer()->deserialize($rawBody);

        if (! $body) {
            // Destructive read succeeded but deserialization failed — log to prevent silent data loss
            log_message('error', 'ServiceBusQueue::watch deserialization failed after destructive read. Raw body: ' . mb_substr($rawBody, 0, 500));

            return null;
        }

        $this->job = ['body' => $body, 'status' => 200, 'headers' => []];

        // NOTE(review): the envelope id is this instance's locally generated MessageId, not an
        // id read from the received message — confirm whether callers rely on it.
        return JobEnvelope::fromBackend(
            backend: 'servicebus',
            id: $this->headersBuilder->getMessageId(),
            queue: $queue,
            payload: $body,
            extraMeta: ['status' => 200],
            raw: $this->job,
        );
    }

    /**
     * Forgets the currently held message and optionally re-enqueues the job.
     *
     * @param QueuesJob $job      Job to re-enqueue when $recreate is true.
     * @param bool      $recreate Whether to send the job back to the queue.
     *
     * @return bool Always true; a failed re-enqueue is logged, not reported to the caller.
     */
    public function removeJob(QueuesJob $job, bool $recreate = false): bool
    {
        // watch() already deleted the message, so a failed re-enqueue means the job is gone:
        // make that visible in the log instead of dropping it silently.
        if ($recreate && $this->enqueue($job->toObject()) === '') {
            log_message('error', 'ServiceBusQueue::removeJob failed to re-enqueue job.');
        }

        $this->job = null;

        return true;
    }

    /**
     * HTTP client accessor (overridable for tests).
     * Returns an object implementing post(string $url, array $options = []) and
     * delete(string $url, array $options = []) with ->getStatusCode() / ->getBody().
     */
    protected function client(): object
    {
        // Default service; global options should be set in Config\CURLRequest if required.
        return service('curlrequest');
    }

    // buildHeaders was removed; header construction now lives in ServiceBusHeaders.
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc