• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

azjezz / psl / 23094701668

14 Mar 2026 07:24PM UTC coverage: 98.527% (+0.07%) from 98.459%
23094701668

Pull #626

github

azjezz
feat(async): introduce `TaskGroup` and `WaitGroup`

Signed-off-by: azjezz <azjezz@protonmail.com>
Pull Request #626: feat(async): introduce `TaskGroup` and `WaitGroup`

53 of 53 new or added lines in 2 files covered. (100.0%)

11 existing lines in 2 files now uncovered.

9830 of 9977 relevant lines covered (98.53%)

34.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.84
/src/Psl/UDP/ConnectedSocket.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Psl\UDP;
6

7
use Override;
8
use Psl\Async\CancellationTokenInterface;
9
use Psl\Async\Exception\CancelledException;
10
use Psl\Async\NullCancellationToken;
11
use Psl\IO;
12
use Psl\Network;
13

14
use function fclose;
15
use function is_resource;
16
use function stream_set_blocking;
17
use function stream_socket_get_name;
18
use function stream_socket_recvfrom;
19
use function stream_socket_sendto;
20

21
use const STREAM_PEEK;
22

23
/**
24
 * A connected UDP socket for communicating with a single peer.
25
 *
26
 * Obtained via {@see Socket::connect()} or {@see connect()}.
27
 */
28
final class ConnectedSocket implements Network\SocketInterface, IO\StreamHandleInterface
29
{
30
    /**
31
     * @var resource|closed-resource|null
32
     */
33
    private mixed $stream;
34

35
    /**
36
     * @internal Use {@see Socket::connect()} or {@see connect()} to obtain a ConnectedSocket.
37
     *
38
     * @param resource $stream
39
     */
40
    public function __construct(
41
        mixed $stream,
42
        private readonly Network\Address $peerAddress,
43
    ) {
44
        $this->stream = $stream;
20✔
45
        stream_set_blocking($stream, false);
20✔
46
    }
47

48
    /**
49
     * Send a datagram to the connected peer.
50
     *
51
     * @return int<0, max> Number of bytes sent.
52
     *
53
     * @throws Network\Exception\RuntimeException If the send fails.
54
     * @throws Network\Exception\InvalidArgumentException If the datagram exceeds the maximum size.
55
     * @throws CancelledException If the operation is cancelled.
56
     * @throws IO\Exception\AlreadyClosedException If the socket has already been closed.
57
     */
58
    public function send(string $data, CancellationTokenInterface $cancellation = new NullCancellationToken()): int
59
    {
60
        Internal\validate_payload_size($data);
7✔
61
        $stream = $this->getResource();
6✔
62

63
        Internal\wait_writable($stream, $cancellation);
5✔
64

65
        $result = @stream_socket_sendto($stream, $data);
5✔
66
        if ($result === false || $result === -1) {
5✔
UNCOV
67
            throw new Network\Exception\RuntimeException('Failed to send UDP datagram.');
×
68
        }
69

70
        /** @var int<0, max> */
71
        return $result;
5✔
72
    }
73

74
    /**
75
     * Receive a datagram from the connected peer.
76
     *
77
     * @param positive-int $max_bytes
78
     *
79
     * @throws Network\Exception\RuntimeException If the receive fails.
80
     * @throws CancelledException If the operation is cancelled.
81
     * @throws IO\Exception\AlreadyClosedException If the socket has already been closed.
82
     */
83
    public function receive(
84
        int $max_bytes,
85
        CancellationTokenInterface $cancellation = new NullCancellationToken(),
86
    ): string {
87
        $stream = $this->getResource();
5✔
88

89
        Internal\await_readable($stream, $cancellation);
4✔
90

91
        $data = @stream_socket_recvfrom($stream, $max_bytes, 0);
3✔
92
        if ($data === false) {
3✔
UNCOV
93
            throw new Network\Exception\RuntimeException('Failed to receive UDP datagram.');
×
94
        }
95

96
        return $data;
3✔
97
    }
98

99
    /**
100
     * Peek at an incoming datagram without consuming it.
101
     *
102
     * @param positive-int $max_bytes
103
     *
104
     * @throws Network\Exception\RuntimeException If the peek fails.
105
     * @throws CancelledException If the operation is cancelled.
106
     * @throws IO\Exception\AlreadyClosedException If the socket has already been closed.
107
     */
108
    public function peek(int $max_bytes, CancellationTokenInterface $cancellation = new NullCancellationToken()): string
109
    {
110
        $stream = $this->getResource();
3✔
111

112
        Internal\await_readable($stream, $cancellation);
2✔
113

114
        $data = @stream_socket_recvfrom($stream, $max_bytes, STREAM_PEEK);
1✔
115
        if ($data === false) {
1✔
UNCOV
116
            throw new Network\Exception\RuntimeException('Failed to peek UDP datagram.');
×
117
        }
118

119
        return $data;
1✔
120
    }
121

122
    /**
123
     * Get the local address this socket is bound to.
124
     *
125
     * @throws Network\Exception\RuntimeException If unable to retrieve local address.
126
     * @throws IO\Exception\AlreadyClosedException If the socket has already been closed.
127
     */
128
    #[Override]
129
    public function getLocalAddress(): Network\Address
130
    {
131
        $stream = $this->getResource();
2✔
132
        $name = @stream_socket_get_name($stream, false);
1✔
133
        if ($name === false) {
1✔
UNCOV
134
            throw new Network\Exception\RuntimeException('Failed to get local address.');
×
135
        }
136

137
        return Internal\parse_address($name);
1✔
138
    }
139

140
    /**
141
     * Get the peer address this socket is connected to.
142
     */
143
    public function getPeerAddress(): Network\Address
144
    {
145
        return $this->peerAddress;
3✔
146
    }
147

148
    /**
149
     * @return resource|object|null
150
     */
151
    #[Override]
152
    public function getStream(): mixed
153
    {
154
        if (!is_resource($this->stream)) {
3✔
155
            return null;
2✔
156
        }
157

158
        return $this->stream;
1✔
159
    }
160

161
    #[Override]
162
    public function isClosed(): bool
163
    {
UNCOV
164
        return !is_resource($this->stream);
×
165
    }
166

167
    #[Override]
168
    public function close(): void
169
    {
170
        if (is_resource($this->stream)) {
20✔
171
            fclose($this->stream);
20✔
172
        }
173

174
        $this->stream = null;
20✔
175
    }
176

177
    public function __destruct()
178
    {
179
        $this->close();
20✔
180
    }
181

182
    /**
183
     * @return resource
184
     *
185
     * @throws IO\Exception\AlreadyClosedException If the socket has already been closed.
186
     */
187
    private function getResource(): mixed
188
    {
189
        if (!is_resource($this->stream)) {
12✔
190
            throw new IO\Exception\AlreadyClosedException('UDP socket has already been closed.');
4✔
191
        }
192

193
        return $this->stream;
8✔
194
    }
195
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc