• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 19498141349

19 Nov 2025 10:30AM UTC coverage: 62.5% (+0.7%) from 61.815%
19498141349

push

github

daycry
- Fixes

4 of 4 new or added lines in 4 files covered. (100.0%)

41 existing lines in 9 files now uncovered.

1145 of 1832 relevant lines covered (62.5%)

4.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.23
/src/Queues/ServiceBusQueue.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Queues;
15

16
use Daycry\Jobs\Interfaces\QueueInterface;
17
use Daycry\Jobs\Interfaces\WorkerInterface;
18
use Daycry\Jobs\Job as QueuesJob;
19
use Daycry\Jobs\Libraries\DateTimeHelper;
20
use Daycry\Jobs\Libraries\ServiceBusHeaders;
21
use Throwable;
22

23
/**
 * Azure Service Bus queue (simplified HTTP interface using CodeIgniter curlrequest).
 *
 * Contract notes:
 *  - enqueue(): sends a message, returns generated MessageId string or empty string on failure.
 *  - watch(): retrieves (and deletes) the head message (destructive read) and returns JobEnvelope or null.
 *  - removeJob(): optionally re-enqueues the original Job (no lock renewal / settlement implemented).
 *
 * Limitations / TODO:
 *  - Real SAS token lifetime / caching not implemented (token built per instance and per enqueue()).
 *  - Proper peek-lock flow (receive + settle) not implemented; current watch() is destructive (DELETE head).
 *  - Scheduling uses x-ms-scheduled-enqueue-time only if provided.
 */
class ServiceBusQueue extends BaseQueue implements QueueInterface, WorkerInterface
{
    /**
     * Namespace URL with exactly one trailing slash; queue paths are appended to it.
     */
    private string $baseUrl;

    /**
     * Service Bus settings exactly as resolved in the constructor.
     * The keys read by this class are 'url', 'issuer' and 'secret'.
     *
     * @var array<string, mixed>
     */
    private array $auth;

    /**
     * Last message fetched by watch(), or null when nothing is held.
     *
     * @var array{body: mixed, status: int, headers: array}|null
     */
    private ?array $job = null;

    /**
     * Builder for the headers sent with each request (message id, SAS token, ...).
     */
    private ServiceBusHeaders $headersBuilder;

    /**
     * Resolves the Service Bus settings and prepares the initial request headers.
     *
     * Settings come from config('Jobs')->serviceBus; when that is not set, the
     * SERVICEBUS_URL / SERVICEBUS_ISSUER / SERVICEBUS_SECRET environment
     * variables are used, each defaulting to an empty string.
     */
    public function __construct()
    {
        $cfg = config('Jobs')->serviceBus ?? [
            'url'    => getenv('SERVICEBUS_URL') ?: '',
            'issuer' => getenv('SERVICEBUS_ISSUER') ?: '',
            'secret' => getenv('SERVICEBUS_SECRET') ?: '',
        ];

        $this->baseUrl        = rtrim($cfg['url'] ?? '', '/') . '/';
        $this->auth           = $cfg;
        $this->headersBuilder = $this->newHeadersBuilder();
    }

    /**
     * Sends one message to the queue named by $data->queue ('default' when absent).
     *
     * A fresh headers builder is created for every call so that the message id,
     * scheduled time and label of one message never leak into the next one sent
     * through the same instance.
     *
     * @param object $data Job payload; the optional properties `queue` and `label` are read here.
     *
     * @return string The MessageId generated for this message, or '' when the
     *                response status is outside the 2xx range.
     */
    public function enqueue(object $data): string
    {
        $queue = $data->queue ?? 'default';
        $delay = $this->calculateDelay($data);

        // Per-message state (id, schedule, label) must start clean for each message.
        $this->headersBuilder = $this->newHeadersBuilder();

        if (! $delay->isImmediate() && $delay->scheduledAt !== null) {
            try {
                $this->headersBuilder->schedule($delay->scheduledAt);
            } catch (Throwable $e) {
                // Best effort: the message is still sent, just without a scheduled time.
                log_message('warning', 'ServiceBusQueue: unable to schedule message: ' . $e->getMessage());
            }
        }

        if (isset($data->label)) {
            try {
                $this->headersBuilder->setLabel((string) $data->label);
            } catch (Throwable $e) {
                // Best effort: the message is still sent, just without a label.
                log_message('warning', 'ServiceBusQueue: unable to set message label: ' . $e->getMessage());
            }
        }

        $headers = array_merge(['Content-Type' => 'application/json'], $this->headersBuilder->getHeaders());
        $resp    = $this->client()->post($this->baseUrl . $queue . '/messages', [
            'headers' => $headers,
            'body'    => $this->getSerializer()->serialize($data),
            // CodeIgniter's CURLRequest throws on >= 400 responses by default; disabling
            // that lets a rejected message follow the documented "empty string" contract.
            'http_errors' => false,
        ]);

        $status = $resp->getStatusCode();

        return ($status >= 200 && $status < 300)
            ? $this->headersBuilder->getMessageId()
            : '';
    }

    /**
     * Fetches and deletes the head message of $queue (destructive read).
     *
     * The envelope id is the MessageId reported in the response's BrokerProperties
     * header when the response object exposes it; otherwise it falls back to the
     * id held by the local headers builder.
     *
     * @param string $queue Queue name appended to the base URL.
     *
     * @return JobEnvelope|null Null when the status is not 200 or the body
     *                          deserializes to an empty value.
     */
    public function watch(string $queue)
    {
        $resp = $this->client()->delete($this->baseUrl . $queue . '/messages/head', [
            'headers' => array_merge(['Content-Type' => 'application/json'], $this->headersBuilder->getHeaders()),
        ]);

        if (! method_exists($resp, 'getStatusCode') || $resp->getStatusCode() !== 200) {
            return null;
        }

        $body = $this->getSerializer()->deserialize((string) $resp->getBody());
        if (! $body) {
            return null;
        }

        $broker    = $this->brokerProperties($resp);
        $messageId = $broker['MessageId'] ?? null;
        $id        = (is_scalar($messageId) && (string) $messageId !== '')
            ? (string) $messageId
            : $this->headersBuilder->getMessageId();

        $this->job = [
            'body'    => $body,
            'status'  => 200,
            'headers' => $broker === [] ? [] : ['BrokerProperties' => $broker],
        ];

        return JobEnvelope::fromBackend(
            backend: 'servicebus',
            id: $id,
            queue: $queue,
            payload: $body,
            extraMeta: ['status' => 200],
            raw: $this->job,
        );
    }

    /**
     * Forgets the currently held message and optionally sends the job again.
     *
     * watch() already deleted the message on the broker, so there is nothing to
     * settle remotely; "removing" only clears local state.
     *
     * @param QueuesJob $job      Job to re-enqueue when $recreate is true.
     * @param bool      $recreate Whether to send the job to the queue again.
     *
     * @return bool False only when a requested re-enqueue was not accepted.
     */
    public function removeJob(QueuesJob $job, bool $recreate = false): bool
    {
        $requeued = true;

        if ($recreate) {
            // Re-enqueue through this backend's own enqueue() and keep its outcome.
            $requeued = $this->enqueue($job->toObject()) !== '';
        }

        $this->job = null;

        return $requeued;
    }

    /**
     * HTTP client accessor (overridable for tests).
     * Returns an object implementing post(string $url, array $options = []) and
     * delete(string $url, array $options = []) with ->getStatusCode() / ->getBody().
     *
     * @return object
     */
    protected function client()
    {
        // Default service; global options should be configured in Config\CURLRequest if required.
        return service('curlrequest');
    }

    /**
     * Creates a headers builder carrying a newly generated message id and a SAS
     * token derived from the stored settings.
     */
    private function newHeadersBuilder(): ServiceBusHeaders
    {
        return (new ServiceBusHeaders())
            ->generateMessageId()
            ->generateSasToken(
                $this->auth['url'] ?? '',
                $this->auth['issuer'] ?? '',
                $this->auth['secret'] ?? '',
            );
    }

    /**
     * Decodes the JSON "BrokerProperties" response header, if available.
     *
     * @param object $resp Response returned by the HTTP client.
     *
     * @return array<string, mixed> Empty when the response has no getHeaderLine()
     *                              method, the header is missing, or it is not a JSON object/array.
     */
    private function brokerProperties(object $resp): array
    {
        // Test doubles may only implement getStatusCode()/getBody().
        if (! method_exists($resp, 'getHeaderLine')) {
            return [];
        }

        $raw = (string) $resp->getHeaderLine('BrokerProperties');
        if ($raw === '') {
            return [];
        }

        $decoded = json_decode($raw, true);

        return is_array($decoded) ? $decoded : [];
    }

    // buildHeaders was removed; ServiceBusHeaders is used instead.
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc