• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

azjezz / psl / 22519606807

28 Feb 2026 11:11AM UTC coverage: 97.532% (-1.2%) from 98.733%
22519606807

push

github

web-flow
feat(network): rewrite networking stack with TLS, UDP, SOCKS5, CIDR, and IO utilities (#585)

860 of 937 new or added lines in 31 files covered. (91.78%)

15 existing lines in 6 files now uncovered.

7470 of 7659 relevant lines covered (97.53%)

42.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.0
/src/Psl/UDP/Socket.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Psl\UDP;
6

7
use Override;
8
use Psl\DateTime\Duration;
9
use Psl\IO;
10
use Psl\Network;
11
use Revolt\EventLoop;
12

13
use function fclose;
14
use function is_resource;
15
use function str_starts_with;
16
use function stream_context_create;
17
use function stream_set_blocking;
18
use function stream_socket_client;
19
use function stream_socket_get_name;
20
use function stream_socket_recvfrom;
21
use function stream_socket_sendto;
22
use function stream_socket_server;
23
use function strlen;
24
use function strpos;
25
use function strrpos;
26
use function substr;
27

28
use const STREAM_CLIENT_CONNECT;
29
use const STREAM_PEEK;
30
use const STREAM_SERVER_BIND;
31

32
/**
33
 * A UDP socket for sending and receiving datagrams.
34
 *
35
 * Supports both connected and unconnected modes and peek.
36
 */
37
final class Socket implements IO\CloseHandleInterface, IO\StreamHandleInterface
38
{
39
    /**
40
     * Maximum IPv4 UDP payload size (65535 - 20 IP header - 8 UDP header).
41
     */
42
    private const int MAX_DATAGRAM_SIZE = 65_507;
43

44
    /**
45
     * @var resource|closed-resource|null
46
     */
47
    private mixed $stream;
48
    private bool $connected = false;
49
    private null|Network\Address $peerAddress = null;
50

51
    /**
52
     * @param resource $stream
53
     */
54
    private function __construct(mixed $stream)
55
    {
56
        $this->stream = $stream;
14✔
57
        stream_set_blocking($stream, false);
14✔
58
    }
59

60
    /**
61
     * Create a UDP socket bound to the given address.
62
     *
63
     * @param non-empty-string $host
64
     * @param int<0, 65535> $port
65
     *
66
     * @throws Network\Exception\RuntimeException If failed to create or bind the socket.
67
     */
68
    public static function bind(
69
        string $host = '0.0.0.0',
70
        int $port = 0,
71
        bool $reuse_address = false,
72
        bool $reuse_port = false,
73
        bool $broadcast = false,
74
    ): self {
75
        $context = ['socket' => [
14✔
76
            'so_reuseaddr' => $reuse_address,
14✔
77
            'so_reuseport' => $reuse_port,
14✔
78
            'so_broadcast' => $broadcast,
14✔
79
        ]];
14✔
80

81
        $ctx = stream_context_create($context);
14✔
82
        $errno = 0;
14✔
83
        $errstr = '';
14✔
84
        $socket = @stream_socket_server("udp://{$host}:{$port}", $errno, $errstr, STREAM_SERVER_BIND, $ctx);
14✔
85

86
        if ($socket === false) {
14✔
NEW
87
            throw new Network\Exception\RuntimeException("Failed to bind UDP socket: {$errstr}", (int) $errno);
×
88
        }
89

90
        return new self($socket);
14✔
91
    }
92

93
    /**
94
     * Connect the socket to a remote address.
95
     *
96
     * After connecting, you must use send()/receive() instead of sendTo()/receiveFrom().
97
     *
98
     * @param non-empty-string $host
99
     * @param int<0, 65535> $port
100
     *
101
     * @throws Network\Exception\RuntimeException If the connect fails.
102
     */
103
    public function connect(string $host, int $port): void
104
    {
105
        $old_stream = $this->getResource();
4✔
106

107
        // Get the local address to rebind
108
        $local_name = @stream_socket_get_name($old_stream, false);
4✔
109
        $bindto = $local_name !== false ? $local_name : '0.0.0.0:0';
4✔
110

111
        // Close old stream
112
        fclose($old_stream);
4✔
113

114
        // Create new connected UDP stream
115
        $context = stream_context_create(['socket' => [
4✔
116
            'bindto' => $bindto,
4✔
117
        ]]);
4✔
118

119
        $errno = 0;
4✔
120
        $errstr = '';
4✔
121
        $new_stream = @stream_socket_client(
4✔
122
            "udp://{$host}:{$port}",
4✔
123
            $errno,
4✔
124
            $errstr,
4✔
125
            null,
4✔
126
            STREAM_CLIENT_CONNECT,
4✔
127
            $context,
4✔
128
        );
4✔
129

130
        if ($new_stream === false) {
4✔
NEW
131
            $this->stream = null;
×
NEW
132
            throw new Network\Exception\RuntimeException("Failed to connect UDP socket to {$host}:{$port}: {$errstr}");
×
133
        }
134

135
        stream_set_blocking($new_stream, false);
4✔
136
        $this->stream = $new_stream;
4✔
137
        $this->connected = true;
4✔
138
        $this->peerAddress = Network\Address::udp($host, $port);
4✔
139
    }
140

141
    /**
142
     * Send data to a specific address (unconnected mode).
143
     *
144
     * @return int<0, max> Number of bytes sent.
145
     *
146
     * @throws Network\Exception\RuntimeException If the send fails or the socket is connected.
147
     * @throws Network\Exception\InvalidArgumentException If the datagram exceeds the maximum size.
148
     * @throws IO\Exception\TimeoutException If the operation times out.
149
     */
150
    public function sendTo(string $data, Network\Address $address, null|Duration $timeout = null): int
151
    {
152
        if ($this->connected) {
6✔
153
            throw new Network\Exception\RuntimeException(
1✔
154
                'Cannot use sendTo() on a connected socket. Use send() instead.',
1✔
155
            );
1✔
156
        }
157

158
        $this->validatePayloadSize($data);
5✔
159
        $stream = $this->getResource();
4✔
160

161
        $target = "{$address->host}:{$address->port}";
4✔
162

163
        if ($timeout !== null) {
4✔
NEW
164
            $this->waitWritable($stream, $timeout);
×
165
        }
166

167
        $result = @stream_socket_sendto($stream, $data, 0, $target);
4✔
168
        if ($result === false || $result === -1) {
4✔
NEW
169
            throw new Network\Exception\RuntimeException('Failed to send UDP datagram.');
×
170
        }
171

172
        /** @var int<0, max> */
173
        return $result;
4✔
174
    }
175

176
    /**
177
     * Receive data and the sender's address (unconnected mode).
178
     *
179
     * @param positive-int $max_bytes
180
     *
181
     * @return array{string, Network\Address} [data, sender_address]
182
     *
183
     * @throws Network\Exception\RuntimeException If the receive fails or the socket is connected.
184
     * @throws IO\Exception\TimeoutException If the operation times out.
185
     */
186
    public function receiveFrom(int $max_bytes, null|Duration $timeout = null): array
187
    {
188
        if ($this->connected) {
6✔
189
            throw new Network\Exception\RuntimeException(
1✔
190
                'Cannot use receiveFrom() on a connected socket. Use receive() instead.',
1✔
191
            );
1✔
192
        }
193

194
        $stream = $this->getResource();
5✔
195

196
        $this->awaitReadable($stream, $timeout);
5✔
197

198
        $address = '';
4✔
199
        $data = @stream_socket_recvfrom($stream, $max_bytes, 0, $address);
4✔
200
        if ($data === false) {
4✔
NEW
201
            throw new Network\Exception\RuntimeException('Failed to receive UDP datagram.');
×
202
        }
203

204
        return [$data, $this->parseAddress($address)];
4✔
205
    }
206

207
    /**
208
     * Send data on a connected socket.
209
     *
210
     * @return int<0, max> Number of bytes sent.
211
     *
212
     * @throws Network\Exception\RuntimeException If the send fails or the socket is not connected.
213
     * @throws Network\Exception\InvalidArgumentException If the datagram exceeds the maximum size.
214
     * @throws IO\Exception\TimeoutException If the operation times out.
215
     */
216
    public function send(string $data, null|Duration $timeout = null): int
217
    {
218
        if (!$this->connected) {
2✔
219
            throw new Network\Exception\RuntimeException(
1✔
220
                'Cannot send on an unconnected socket. Use sendTo() or call connect() first.',
1✔
221
            );
1✔
222
        }
223

224
        $this->validatePayloadSize($data);
1✔
225
        $stream = $this->getResource();
1✔
226

227
        if ($timeout !== null) {
1✔
NEW
228
            $this->waitWritable($stream, $timeout);
×
229
        }
230

231
        $result = @stream_socket_sendto($stream, $data);
1✔
232
        if ($result === false || $result === -1) {
1✔
NEW
233
            throw new Network\Exception\RuntimeException('Failed to send UDP datagram.');
×
234
        }
235

236
        /** @var int<0, max> */
237
        return $result;
1✔
238
    }
239

240
    /**
241
     * Receive data on a connected socket.
242
     *
243
     * @param positive-int $max_bytes
244
     *
245
     * @throws Network\Exception\RuntimeException If the receive fails or the socket is not connected.
246
     * @throws IO\Exception\TimeoutException If the operation times out.
247
     */
248
    public function receive(int $max_bytes, null|Duration $timeout = null): string
249
    {
250
        if (!$this->connected) {
2✔
251
            throw new Network\Exception\RuntimeException(
1✔
252
                'Cannot receive on an unconnected socket. Use receiveFrom() or call connect() first.',
1✔
253
            );
1✔
254
        }
255

256
        $stream = $this->getResource();
1✔
257

258
        $this->awaitReadable($stream, $timeout);
1✔
259

260
        $data = @stream_socket_recvfrom($stream, $max_bytes, 0);
1✔
261
        if ($data === false) {
1✔
NEW
262
            throw new Network\Exception\RuntimeException('Failed to receive UDP datagram.');
×
263
        }
264

265
        return $data;
1✔
266
    }
267

268
    /**
269
     * Peek at incoming data without consuming it.
270
     *
271
     * @param positive-int $max_bytes
272
     *
273
     * @throws Network\Exception\RuntimeException If the peek fails.
274
     * @throws IO\Exception\TimeoutException If the operation times out.
275
     */
276
    public function peek(int $max_bytes, null|Duration $timeout = null): string
277
    {
278
        $stream = $this->getResource();
1✔
279

280
        $this->awaitReadable($stream, $timeout);
1✔
281

282
        $data = @stream_socket_recvfrom($stream, $max_bytes, STREAM_PEEK);
1✔
283
        if ($data === false) {
1✔
NEW
284
            throw new Network\Exception\RuntimeException('Failed to peek UDP datagram.');
×
285
        }
286

287
        return $data;
1✔
288
    }
289

290
    /**
291
     * Peek at incoming data and get the sender's address.
292
     *
293
     * @param positive-int $max_bytes
294
     *
295
     * @return array{string, Network\Address} [data, sender_address]
296
     *
297
     * @throws Network\Exception\RuntimeException If the peek fails.
298
     * @throws IO\Exception\TimeoutException If the operation times out.
299
     */
300
    public function peekFrom(int $max_bytes, null|Duration $timeout = null): array
301
    {
302
        $stream = $this->getResource();
1✔
303

304
        $this->awaitReadable($stream, $timeout);
1✔
305

306
        $address = '';
1✔
307
        $data = @stream_socket_recvfrom($stream, $max_bytes, STREAM_PEEK, $address);
1✔
308
        if ($data === false) {
1✔
NEW
309
            throw new Network\Exception\RuntimeException('Failed to peek UDP datagram.');
×
310
        }
311

312
        return [$data, $this->parseAddress($address)];
1✔
313
    }
314

315
    /**
316
     * Get the local address this socket is bound to.
317
     *
318
     * @throws Network\Exception\RuntimeException If unable to retrieve local address.
319
     */
320
    public function getLocalAddress(): Network\Address
321
    {
322
        $stream = $this->getResource();
9✔
323
        $name = @stream_socket_get_name($stream, false);
8✔
324
        if ($name === false) {
8✔
NEW
325
            throw new Network\Exception\RuntimeException('Failed to get local address.');
×
326
        }
327

328
        return $this->parseAddress($name);
8✔
329
    }
330

331
    /**
332
     * Get the peer address this socket is connected to, or null if unconnected.
333
     */
334
    public function getPeerAddress(): null|Network\Address
335
    {
336
        return $this->peerAddress;
2✔
337
    }
338

339
    /**
340
     * @return resource|object|null
341
     */
342
    #[Override]
343
    public function getStream(): mixed
344
    {
NEW
345
        if (!is_resource($this->stream)) {
×
NEW
346
            return null;
×
347
        }
348

NEW
349
        return $this->stream;
×
350
    }
351

352
    #[Override]
353
    public function close(): void
354
    {
355
        if (is_resource($this->stream)) {
14✔
356
            fclose($this->stream);
14✔
357
        }
358

359
        $this->stream = null;
14✔
360
    }
361

362
    public function __destruct()
363
    {
364
        $this->close();
14✔
365
    }
366

367
    /**
368
     * @return resource
369
     */
370
    private function getResource(): mixed
371
    {
372
        if (!is_resource($this->stream)) {
10✔
373
            throw new IO\Exception\AlreadyClosedException('UDP socket has already been closed.');
1✔
374
        }
375

376
        return $this->stream;
9✔
377
    }
378

379
    /**
380
     * Validate that the payload does not exceed the maximum UDP datagram size.
381
     *
382
     * @throws Network\Exception\InvalidArgumentException If the payload is too large.
383
     */
384
    private function validatePayloadSize(string $data): void
385
    {
386
        if (strlen($data) > self::MAX_DATAGRAM_SIZE) {
5✔
387
            throw new Network\Exception\InvalidArgumentException('UDP datagram payload exceeds maximum size of '
1✔
388
            . self::MAX_DATAGRAM_SIZE
1✔
389
            . ' bytes.');
1✔
390
        }
391
    }
392

393
    /**
394
     * Wait for the stream to become readable.
395
     *
396
     * @param resource $stream
397
     */
398
    private function awaitReadable(mixed $stream, null|Duration $timeout): void
399
    {
400
        $suspension = EventLoop::getSuspension();
5✔
401
        $timeout_watcher = null;
5✔
402

403
        if ($timeout !== null) {
5✔
404
            $timeout_watcher = EventLoop::delay($timeout->getTotalSeconds(), static function () use (
1✔
405
                $suspension,
1✔
406
            ): void {
1✔
407
                $suspension->resume(true);
1✔
408
            });
1✔
409
        }
410

411
        $read_watcher = EventLoop::onReadable($stream, static function (string $watcher) use ($suspension): void {
5✔
412
            EventLoop::cancel($watcher);
4✔
413
            $suspension->resume(false);
4✔
414
        });
5✔
415

416
        /** @var bool $timed_out */
417
        $timed_out = $suspension->suspend();
5✔
418
        if ($timeout_watcher !== null) {
5✔
419
            EventLoop::cancel($timeout_watcher);
1✔
420
        }
421

422
        EventLoop::cancel($read_watcher);
5✔
423

424
        if ($timed_out) {
5✔
425
            throw new IO\Exception\TimeoutException('UDP receive operation timed out.');
1✔
426
        }
427
    }
428

429
    /**
430
     * Wait for the stream to be writable with a timeout.
431
     *
432
     * @param resource $stream
433
     */
434
    private function waitWritable(mixed $stream, Duration $timeout): void
435
    {
NEW
436
        $suspension = EventLoop::getSuspension();
×
NEW
437
        $timeout_watcher = EventLoop::delay($timeout->getTotalSeconds(), static function () use ($suspension): void {
×
NEW
438
            $suspension->resume(true);
×
NEW
439
        });
×
440

NEW
441
        $write_watcher = EventLoop::onWritable($stream, static function (string $watcher) use ($suspension): void {
×
NEW
442
            EventLoop::cancel($watcher);
×
NEW
443
            $suspension->resume(false);
×
NEW
444
        });
×
445

446
        /** @var bool $timed_out */
NEW
447
        $timed_out = $suspension->suspend();
×
NEW
448
        EventLoop::cancel($timeout_watcher);
×
NEW
449
        EventLoop::cancel($write_watcher);
×
450

NEW
451
        if ($timed_out) {
×
NEW
452
            throw new IO\Exception\TimeoutException('UDP send operation timed out.');
×
453
        }
454
    }
455

456
    /**
457
     * Parse a "host:port" or "[host]:port" string into an Address.
458
     *
459
     * Handles both IPv4 ("127.0.0.1:8080") and IPv6 ("[::1]:8080") formats.
460
     *
461
     * @throws Network\Exception\RuntimeException If the address is invalid.
462
     */
463
    private function parseAddress(string $address): Network\Address
464
    {
465
        if ($address === '') {
8✔
NEW
466
            return Network\Address::udp('0.0.0.0', 0);
×
467
        }
468

469
        // IPv6 bracket notation: [host]:port
470
        if (str_starts_with($address, '[')) {
8✔
NEW
471
            $close_bracket = strpos($address, ']');
×
NEW
472
            if ($close_bracket === false) {
×
NEW
473
                return Network\Address::udp($address, 0);
×
474
            }
475

NEW
476
            $host = substr($address, 1, $close_bracket - 1);
×
NEW
477
            if ($host === '') {
×
NEW
478
                $host = '::';
×
479
            }
480

481
            // Check for :port after the closing bracket
NEW
482
            $port = 0;
×
NEW
483
            if (($close_bracket + 1) < strlen($address) && $address[$close_bracket + 1] === ':') {
×
NEW
484
                $port = (int) substr($address, $close_bracket + 2);
×
485
            }
486

NEW
487
            if ($port < 0 || $port > 65_535) {
×
NEW
488
                throw new Network\Exception\RuntimeException("Invalid port number in address: {$port}");
×
489
            }
490

NEW
491
            return Network\Address::udp($host, $port);
×
492
        }
493

494
        // IPv4: host:port
495
        $last_colon = strrpos($address, ':');
8✔
496
        if ($last_colon === false) {
8✔
NEW
497
            return Network\Address::udp($address, 0);
×
498
        }
499

500
        $host = substr($address, 0, $last_colon);
8✔
501
        $port = (int) substr($address, $last_colon + 1);
8✔
502

503
        $host = $host !== '' ? $host : '0.0.0.0';
8✔
504
        if ($port < 0 || $port > 65_535) {
8✔
NEW
505
            throw new Network\Exception\RuntimeException("Invalid port number in address: {$port}");
×
506
        }
507

508
        return Network\Address::udp($host, $port);
8✔
509
    }
510
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc