• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

azjezz / psl / 23094701668

14 Mar 2026 07:24PM UTC coverage: 98.527% (+0.07%) from 98.459%
23094701668

Pull #626

github

azjezz
feat(async): introduce `TaskGroup` and `WaitGroup`

Signed-off-by: azjezz <azjezz@protonmail.com>
Pull Request #626: feat(async): introduce `TaskGroup` and `WaitGroup`

53 of 53 new or added lines in 2 files covered. (100.0%)

11 existing lines in 2 files now uncovered.

9830 of 9977 relevant lines covered (98.53%)

34.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.78
/src/Psl/UDP/Socket.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Psl\UDP;
6

7
use Override;
8
use Psl\Async\CancellationTokenInterface;
9
use Psl\Async\Exception\CancelledException;
10
use Psl\Async\NullCancellationToken;
11
use Psl\IO;
12
use Psl\Network;
13

14
use function fclose;
15
use function is_resource;
16
use function stream_context_create;
17
use function stream_set_blocking;
18
use function stream_socket_client;
19
use function stream_socket_get_name;
20
use function stream_socket_recvfrom;
21
use function stream_socket_sendto;
22
use function stream_socket_server;
23

24
use const STREAM_CLIENT_CONNECT;
25
use const STREAM_PEEK;
26
use const STREAM_SERVER_BIND;
27

28
/**
29
 * An unconnected UDP socket for sending and receiving datagrams.
30
 *
31
 * Use {@see sendTo()} and {@see receiveFrom()} to communicate with arbitrary addresses.
32
 *
33
 * To switch to connected mode, call {@see connect()} which returns a {@see ConnectedSocket}.
34
 */
35
final class Socket implements Network\SocketInterface, IO\StreamHandleInterface
36
{
37
    /**
38
     * @var resource|closed-resource|null
39
     */
40
    private mixed $stream;
41

42
    /**
43
     * @param resource $stream
44
     */
45
    private function __construct(mixed $stream)
46
    {
47
        $this->stream = $stream;
39✔
48
        stream_set_blocking($stream, false);
39✔
49
    }
50

51
    /**
52
     * Create a UDP socket bound to the given address.
53
     *
54
     * @param non-empty-string $host
55
     * @param int<0, 65535> $port
56
     *
57
     * @throws Network\Exception\RuntimeException If failed to create or bind the socket.
58
     */
59
    public static function bind(
60
        string $host = '0.0.0.0',
61
        int $port = 0,
62
        bool $reuse_address = false,
63
        bool $reuse_port = false,
64
        bool $broadcast = false,
65
    ): self {
66
        $context = ['socket' => [
40✔
67
            'so_reuseaddr' => $reuse_address,
40✔
68
            'so_reuseport' => $reuse_port,
40✔
69
            'so_broadcast' => $broadcast,
40✔
70
        ]];
40✔
71

72
        $ctx = stream_context_create($context);
40✔
73
        $errno = 0;
40✔
74
        $errstr = '';
40✔
75
        $socket = @stream_socket_server("udp://{$host}:{$port}", $errno, $errstr, STREAM_SERVER_BIND, $ctx);
40✔
76

77
        if ($socket === false) {
40✔
78
            throw new Network\Exception\RuntimeException("Failed to bind UDP socket: {$errstr}", (int) $errno);
1✔
79
        }
80

81
        return new self($socket);
39✔
82
    }
83

84
    /**
85
     * Connect to a remote address, returning a {@see ConnectedSocket}.
86
     *
87
     * This socket is closed after connecting. Use the returned {@see ConnectedSocket} for further communication.
88
     *
89
     * @param non-empty-string $host
90
     * @param int<0, 65535> $port
91
     *
92
     * @throws Network\Exception\RuntimeException If the connect fails.
93
     * @throws IO\Exception\AlreadyClosedException If the socket has already been closed.
94
     */
95
    public function connect(string $host, int $port): ConnectedSocket
96
    {
97
        $old_stream = $this->getResource();
21✔
98

99
        $local_name = @stream_socket_get_name($old_stream, false);
20✔
100
        $bindto = $local_name !== false ? $local_name : '0.0.0.0:0';
20✔
101

102
        fclose($old_stream);
20✔
103
        $this->stream = null;
20✔
104

105
        $context = stream_context_create(['socket' => [
20✔
106
            'bindto' => $bindto,
20✔
107
        ]]);
20✔
108

109
        $errno = 0;
20✔
110
        $errstr = '';
20✔
111
        $new_stream = @stream_socket_client(
20✔
112
            "udp://{$host}:{$port}",
20✔
113
            $errno,
20✔
114
            $errstr,
20✔
115
            null,
20✔
116
            STREAM_CLIENT_CONNECT,
20✔
117
            $context,
20✔
118
        );
20✔
119

120
        if ($new_stream === false) {
20✔
UNCOV
121
            throw new Network\Exception\RuntimeException("Failed to connect UDP socket to {$host}:{$port}: {$errstr}");
×
122
        }
123

124
        return new ConnectedSocket($new_stream, Network\Address::udp($host, $port));
20✔
125
    }
126

127
    /**
128
     * Send a datagram to a specific address.
129
     *
130
     * @return int<0, max> Number of bytes sent.
131
     *
132
     * @throws Network\Exception\RuntimeException If the send fails.
133
     * @throws Network\Exception\InvalidArgumentException If the datagram exceeds the maximum size.
134
     * @throws CancelledException If the operation is cancelled.
135
     * @throws IO\Exception\AlreadyClosedException If the socket has already been closed.
136
     */
137
    public function sendTo(
138
        string $data,
139
        Network\Address $address,
140
        CancellationTokenInterface $cancellation = new NullCancellationToken(),
141
    ): int {
142
        Internal\validate_payload_size($data);
10✔
143
        $stream = $this->getResource();
9✔
144

145
        $target = "{$address->host}:{$address->port}";
8✔
146

147
        Internal\wait_writable($stream, $cancellation);
8✔
148

149
        $result = @stream_socket_sendto($stream, $data, 0, $target);
8✔
150
        if ($result === false || $result === -1) {
8✔
UNCOV
151
            throw new Network\Exception\RuntimeException('Failed to send UDP datagram.');
×
152
        }
153

154
        /** @var int<0, max> */
155
        return $result;
8✔
156
    }
157

158
    /**
159
     * Receive a datagram and the sender's address.
160
     *
161
     * @param positive-int $max_bytes
162
     *
163
     * @return array{string, Network\Address} [data, sender_address]
164
     *
165
     * @throws Network\Exception\RuntimeException If the receive fails.
166
     * @throws CancelledException If the operation is cancelled.
167
     * @throws IO\Exception\AlreadyClosedException If the socket has already been closed.
168
     */
169
    public function receiveFrom(
170
        int $max_bytes,
171
        CancellationTokenInterface $cancellation = new NullCancellationToken(),
172
    ): array {
173
        $stream = $this->getResource();
11✔
174

175
        Internal\await_readable($stream, $cancellation);
10✔
176

177
        $address = '';
9✔
178
        $data = @stream_socket_recvfrom($stream, $max_bytes, 0, $address);
9✔
179
        if ($data === false) {
9✔
UNCOV
180
            throw new Network\Exception\RuntimeException('Failed to receive UDP datagram.');
×
181
        }
182

183
        return [$data, Internal\parse_address($address)];
9✔
184
    }
185

186
    /**
187
     * Peek at an incoming datagram and get the sender's address, without consuming it.
188
     *
189
     * @param positive-int $max_bytes
190
     *
191
     * @return array{string, Network\Address} [data, sender_address]
192
     *
193
     * @throws Network\Exception\RuntimeException If the peek fails.
194
     * @throws CancelledException If the operation is cancelled.
195
     * @throws IO\Exception\AlreadyClosedException If the socket has already been closed.
196
     */
197
    public function peekFrom(
198
        int $max_bytes,
199
        CancellationTokenInterface $cancellation = new NullCancellationToken(),
200
    ): array {
201
        $stream = $this->getResource();
3✔
202

203
        Internal\await_readable($stream, $cancellation);
2✔
204

205
        $address = '';
1✔
206
        $data = @stream_socket_recvfrom($stream, $max_bytes, STREAM_PEEK, $address);
1✔
207
        if ($data === false) {
1✔
UNCOV
208
            throw new Network\Exception\RuntimeException('Failed to peek UDP datagram.');
×
209
        }
210

211
        return [$data, Internal\parse_address($address)];
1✔
212
    }
213

214
    /**
215
     * Get the local address this socket is bound to.
216
     *
217
     * @throws Network\Exception\RuntimeException If unable to retrieve local address.
218
     * @throws IO\Exception\AlreadyClosedException If the socket has already been closed.
219
     */
220
    #[Override]
221
    public function getLocalAddress(): Network\Address
222
    {
223
        $stream = $this->getResource();
28✔
224
        $name = @stream_socket_get_name($stream, false);
27✔
225
        if ($name === false) {
27✔
UNCOV
226
            throw new Network\Exception\RuntimeException('Failed to get local address.');
×
227
        }
228

229
        return Internal\parse_address($name);
27✔
230
    }
231

232
    /**
233
     * @return resource|object|null
234
     */
235
    #[Override]
236
    public function getStream(): mixed
237
    {
238
        if (!is_resource($this->stream)) {
3✔
239
            return null;
2✔
240
        }
241

242
        return $this->stream;
1✔
243
    }
244

245
    #[Override]
246
    public function isClosed(): bool
247
    {
UNCOV
248
        return !is_resource($this->stream);
×
249
    }
250

251
    #[Override]
252
    public function close(): void
253
    {
254
        if (is_resource($this->stream)) {
39✔
255
            fclose($this->stream);
39✔
256
        }
257

258
        $this->stream = null;
39✔
259
    }
260

261
    public function __destruct()
262
    {
263
        $this->close();
39✔
264
    }
265

266
    /**
267
     * @return resource
268
     *
269
     * @throws IO\Exception\AlreadyClosedException If the socket has already been closed.
270
     */
271
    private function getResource(): mixed
272
    {
273
        if (!is_resource($this->stream)) {
35✔
274
            throw new IO\Exception\AlreadyClosedException('UDP socket has already been closed.');
6✔
275
        }
276

277
        return $this->stream;
30✔
278
    }
279
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc