• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sirn-se / websocket-php / 7557012541

17 Jan 2024 02:15PM UTC coverage: 99.466% (-0.5%) from 100.0%
7557012541

Pull #34

github

web-flow
Merge 75fa38531 into b59cf5de3
Pull Request #34: Add supported protocols to server and client

11 of 16 new or added lines in 2 files covered. (68.75%)

4 existing lines in 1 file now uncovered.

931 of 936 relevant lines covered (99.47%)

42.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.92
/src/Server.php
1
<?php
2

3
/**
4
 * Copyright (C) 2014-2023 Textalk and contributors.
5
 *
6
 * This file is part of Websocket PHP and is free software under the ISC License.
7
 * License text: https://raw.githubusercontent.com/sirn-se/websocket-php/master/COPYING.md
8
 */
9

10
namespace WebSocket;
11

12
use InvalidArgumentException;
13
use Phrity\Net\{
14
    SocketServer,
15
    StreamFactory,
16
    Uri
17
};
18
use Psr\Log\{
19
    LoggerAwareInterface,
20
    LoggerInterface,
21
    NullLogger
22
};
23
use Stringable;
24
use Throwable;
25
use WebSocket\Exception\{
26
    CloseException,
27
    ConnectionLevelInterface,
28
    Exception,
29
    HandshakeException,
30
    MessageLevelInterface,
31
    ServerException
32
};
33
use WebSocket\Http\{
34
    Response,
35
    ServerRequest
36
};
37
use WebSocket\Message\Message;
38
use WebSocket\Middleware\MiddlewareInterface;
39
use WebSocket\Trait\{
40
    ListenerTrait,
41
    SendMethodsTrait,
42
    StringableTrait
43
};
44

45
/**
46
 * WebSocket\Server class.
47
 * Entry class for WebSocket server.
48
 */
49
class Server implements LoggerAwareInterface, Stringable
50
{
51
    use ListenerTrait;
52
    use SendMethodsTrait;
53
    use StringableTrait;
54

55
    private const GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
56

57
    // Settings
58
    private $port;
59
    private $scheme;
60
    private $logger;
61
    private $timeout = 60;
62
    private $frameSize = 4096;
63

64
    // Internal resources
65
    private $streamFactory;
66
    private $server;
67
    private $streams;
68
    private $running = false;
69
    private $connections = [];
70
    private $middlewares = [];
71
    private $supportedSubProtocols = [];
72

73

74
    /* ---------- Magic methods ------------------------------------------------------------------------------------ */
75

76
    /**
77
     * @param int $port Socket port to listen to
78
     * @param string $scheme Scheme (tcp or ssl)
79
     * @throws InvalidArgumentException If invalid port provided
80
     */
81
    public function __construct(int $port = 80, bool $ssl = false)
82
    {
83
        if ($port < 0 || $port > 65535) {
52✔
84
            throw new InvalidArgumentException("Invalid port '{$port}' provided");
4✔
85
        }
86
        $this->port = $port;
48✔
87
        $this->scheme = $ssl ? 'ssl' : 'tcp';
48✔
88
        $this->logger = new NullLogger();
48✔
89
        $this->setStreamFactory(new StreamFactory());
48✔
90
    }
91

92
    /**
93
     * Get string representation of instance.
94
     * @return string String representation
95
     */
96
    public function __toString(): string
97
    {
98
        return $this->stringable('%s', $this->server ? "{$this->scheme}://0.0.0.0:{$this->port}" : 'closed');
6✔
99
    }
100

101

102
    /* ---------- Configuration ------------------------------------------------------------------------------------ */
103

104
    /**
105
     * Set stream factory to use.
106
     * @param Phrity\Net\StreamFactory $streamFactory
107
     * @return self
108
     */
109
    public function setStreamFactory(StreamFactory $streamFactory): self
110
    {
111
        $this->streamFactory = $streamFactory;
48✔
112
        return $this;
48✔
113
    }
114

115
    /**
116
     * Set logger.
117
     * @param Psr\Log\LoggerInterface $logger Logger implementation
118
     * @return self
119
     */
120
    public function setLogger(LoggerInterface $logger): self
121
    {
122
        $this->logger = $logger;
2✔
123
        foreach ($this->connections as $connection) {
2✔
124
            $connection->setLogger($this->logger);
2✔
125
        }
126
        return $this;
2✔
127
    }
128

129
    /**
130
     * Set timeout.
131
     * @param int $timeout Timeout in seconds
132
     * @return self
133
     * @throws InvalidArgumentException If invalid timeout provided
134
     */
135
    public function setTimeout(int $timeout): self
136
    {
137
        if ($timeout < 0) {
4✔
138
            throw new InvalidArgumentException("Invalid timeout '{$timeout}' provided");
2✔
139
        }
140
        $this->timeout = $timeout;
2✔
141
        foreach ($this->connections as $connection) {
2✔
142
            $connection->setTimeout($timeout);
2✔
143
        }
144
        return $this;
2✔
145
    }
146

147
    /**
148
     * Get timeout.
149
     * @return int Timeout in seconds
150
     */
151
    public function getTimeout(): int
152
    {
153
        return $this->timeout;
4✔
154
    }
155

156
    /**
157
     * Set frame size.
158
     * @param int $frameSize Frame size in bytes
159
     * @return self
160
     * @throws InvalidArgumentException If invalid frameSize provided
161
     */
162
    public function setFrameSize(int $frameSize): self
163
    {
164
        if ($frameSize < 1) {
4✔
165
            throw new InvalidArgumentException("Invalid frameSize '{$frameSize}' provided");
2✔
166
        }
167
        $this->frameSize = $frameSize;
2✔
168
        foreach ($this->connections as $connection) {
2✔
169
            $connection->setFrameSize($frameSize);
2✔
170
        }
171
        return $this;
2✔
172
    }
173

174
    /**
175
     * Get frame size.
176
     * @return int Frame size in bytes
177
     */
178
    public function getFrameSize(): int
179
    {
180
        return $this->frameSize;
4✔
181
    }
182

183
    /**
184
     * Get socket port number.
185
     * @return int port
186
     */
187
    public function getPort(): int
188
    {
189
        return $this->port;
4✔
190
    }
191

192
    /**
193
     * Get connection scheme.
194
     * @return string scheme
195
     */
196
    public function getScheme(): string
197
    {
198
        return $this->scheme;
4✔
199
    }
200

201
    /**
202
     * Number of currently connected clients.
203
     * @return int Connection count
204
     */
205
    public function getConnectionCount(): int
206
    {
207
        return count($this->connections);
10✔
208
    }
209

210
    /**
211
     * Add a middleware.
212
     * @param WebSocket\Middleware\MiddlewareInterface $middleware
213
     * @return self
214
     */
215
    public function addMiddleware(MiddlewareInterface $middleware): self
216
    {
217
        $this->middlewares[] = $middleware;
6✔
218
        foreach ($this->connections as $connection) {
6✔
219
            $connection->addMiddleware($middleware);
4✔
220
        }
221
        return $this;
6✔
222
    }
223

224
    public function getSupportedSubProtocols(): array
225
    {
226
        return $this->supportedSubProtocols;
4✔
227
    }
228

229
    public function setSupportedSubProtocols(array $supportedSubProtocols): self
230
    {
231
        $this->supportedSubProtocols = $supportedSubProtocols;
2✔
232

233
        return $this;
2✔
234
    }
235

236
    /* ---------- Messaging operations ----------------------------------------------------------------------------- */
237

238
    /**
239
     * Send message (broadcast to all connected clients).
240
     * @param WebSocket\Message\Message $message Message to send
241
     */
242
    public function send(Message $message): Message
243
    {
244
        foreach ($this->connections as $connection) {
2✔
245
            if ($connection->isWritable()) {
2✔
246
                $connection->send($message);
2✔
247
            }
248
        }
249
        return $message;
2✔
250
    }
251

252

253
    /* ---------- Listener operations ------------------------------------------------------------------------------ */
254

255
    /**
256
     * Start server listener.
257
     * @throws Throwable On low level error
258
     */
259
    public function start(): void
260
    {
261
        // Create socket server
262
        if (empty($this->server)) {
44✔
263
            $this->createSocketServer();
44✔
264
        }
265

266
        // Check if running
267
        if ($this->running) {
42✔
268
            $this->logger->warning("[server] Server is already running");
2✔
269
            return;
2✔
270
        }
271
        $this->running = true;
42✔
272
        $this->logger->info("[server] Server is running");
42✔
273

274
        // Run handler
275
        while ($this->running) {
42✔
276
            try {
277
                // Clear closed connections
278
                $this->detachUnconnected();
42✔
279

280
                // Get streams with readable content
281
                $readables = $this->streams->waitRead($this->timeout);
42✔
282
                foreach ($readables as $key => $readable) {
42✔
283
                    try {
284
                        $connection = null;
40✔
285
                        // Accept new client connection
286
                        if ($key == '@server') {
40✔
287
                            $this->acceptSocket($readable);
40✔
288
                            continue;
24✔
289
                        }
290
                        // Read from connection
291
                        $connection = $this->connections[$key];
10✔
292
                        if ($message = $connection->pullMessage()) {
10✔
293
                            $this->dispatch($message->getOpcode(), [$this, $connection, $message]);
2✔
294
                        }
295
                    } catch (MessageLevelInterface $e) {
26✔
296
                        // Error, but keep connection open
297
                        $this->logger->error("[server] {$e->getMessage()}");
4✔
298
                        $this->dispatch('error', [$this, $connection, $e]);
4✔
299
                    } catch (ConnectionLevelInterface $e) {
22✔
300
                        // Error, disconnect connection
301
                        if ($connection) {
18✔
302
                            $this->streams->detach($key);
2✔
303
                            unset($this->connections[$key]);
2✔
304
                            $connection->disconnect();
2✔
305
                        }
306
                        $this->logger->error("[server] {$e->getMessage()}");
18✔
307
                        $this->dispatch('error', [$this, $connection, $e]);
18✔
308
                    } catch (CloseException $e) {
4✔
309
                        // Should close
310
                        if ($connection) {
2✔
311
                            $connection->close($e->getCloseStatus(), $e->getMessage());
2✔
312
                        }
313
                        $this->logger->error("[server] sss {$e->getMessage()}");
2✔
314
                        $this->dispatch('error', [$this, $connection, $e]);
2✔
315
                    }
316
                }
317
                foreach ($this->connections as $connection) {
42✔
318
                    $connection->tick();
24✔
319
                }
320
                $this->dispatch('tick', [$this]);
42✔
321
            } catch (Exception $e) {
4✔
322
                // Low-level error
323
                $this->logger->error("[server] {$e->getMessage()}");
2✔
324
                $this->dispatch('error', [$this, null, $e]);
2✔
325
            } catch (Throwable $e) {
2✔
326
                // Crash it
327
                $this->logger->error("[server] {$e->getMessage()}");
2✔
328
                $this->dispatch('error', [$this, null, $e]);
2✔
329
                $this->disconnect();
2✔
330
                throw $e;
2✔
331
            }
332
            gc_collect_cycles(); // Collect garbage
42✔
333
        }
334
    }
335

336
    /**
337
     * Stop server listener (resumable).
338
     */
339
    public function stop(): void
340
    {
341
        $this->running = false;
42✔
342
        $this->logger->info("[server] Server is stopped");
42✔
343
    }
344

345
    /**
346
     * If server is running (accepting connections and messages).
347
     * @return bool
348
     */
349
    public function isRunning(): bool
350
    {
351
        return $this->running;
4✔
352
    }
353

354

355
    /* ---------- Connection management ---------------------------------------------------------------------------- */
356

357
    /**
358
     * Disconnect all connections and stop server.
359
     */
360
    public function disconnect(): void
361
    {
362
        $this->running = false;
10✔
363
        foreach ($this->connections as $connection) {
10✔
364
            $connection->disconnect();
8✔
365
            $this->dispatch('disconnect', [$this, $connection]);
8✔
366
        }
367
        $this->connections = [];
10✔
368
        $this->server->close();
10✔
369
        $this->server = $this->streams = null;
10✔
370
        $this->logger->info('[server] Server disconnected');
10✔
371
    }
372

373

374
    /* ---------- Internal helper methods -------------------------------------------------------------------------- */
375

376
    // Create socket server
377
    protected function createSocketServer(): void
378
    {
379
        try {
380
            $uri = new Uri("{$this->scheme}://0.0.0.0:{$this->port}");
44✔
381
            $this->server = $this->streamFactory->createSocketServer($uri);
44✔
382
            $this->streams = $this->streamFactory->createStreamCollection();
42✔
383
            $this->streams->attach($this->server, '@server');
42✔
384
            $this->logger->info("[server] Starting server on {$uri}.");
42✔
385
        } catch (Throwable $e) {
2✔
386
            $error = "Server failed to start: {$e->getMessage()}";
2✔
387
            throw new ServerException($error);
2✔
388
        }
389
    }
390

391
    // Accept connection on socket server
392
    protected function acceptSocket(SocketServer $socket): void
393
    {
394
        try {
395
            $stream = $socket->accept();
40✔
396
            $name = $stream->getRemoteName();
40✔
397
            $this->streams->attach($stream, $name);
40✔
398
            $connection = new Connection($stream, false, true);
40✔
399
            $connection
40✔
400
                ->setLogger($this->logger)
40✔
401
                ->setFrameSize($this->frameSize)
40✔
402
                ->setTimeout($this->timeout)
40✔
403
                ;
40✔
404
            foreach ($this->middlewares as $middleware) {
40✔
405
                $connection->addMiddleware($middleware);
2✔
406
            }
407
            $request = $this->performHandshake($connection);
40✔
408
            $this->connections[$name] = $connection;
24✔
409
            $this->logger->info("[server] Accepted connection from {$name}.");
24✔
410
            $this->dispatch('connect', [$this, $connection, $request]);
24✔
411
        } catch (Exception $e) {
16✔
412
            if ($connection) {
16✔
413
                $connection->disconnect();
16✔
414
            }
415
            $error = "Server failed to accept: {$e->getMessage()}";
16✔
416
            throw $e;
16✔
417
        }
418
    }
419

420
    // Detach connections no longer available
421
    protected function detachUnconnected(): void
422
    {
423
        foreach ($this->connections as $key => $connection) {
42✔
424
            if (!$connection->isConnected()) {
14✔
425
                $this->streams->detach($key);
2✔
426
                unset($this->connections[$key]);
2✔
427
                $this->logger->info("[server] Disconnected {$key}.");
2✔
428
                $this->dispatch('disconnect', [$this, $connection]);
2✔
429
            }
430
        }
431
    }
432

433
    // Perform upgrade handshake on new connections.
434
    protected function performHandshake(Connection $connection): ServerRequest
435
    {
436
        $response = new Response(101);
40✔
437
        $exception = null;
40✔
438

439
        // Read handshake request
440
        $request = $connection->pullHttp();
40✔
441

442
        // Verify handshake request
443
        try {
444
            if ($request->getMethod() != 'GET') {
38✔
445
                throw new HandshakeException(
2✔
446
                    "Handshake request with invalid method: '{$request->getMethod()}'",
2✔
447
                    $response->withStatus(405)
2✔
448
                );
2✔
449
            }
450
            $connectionHeader = trim($request->getHeaderLine('Connection'));
36✔
451
            if (strtolower($connectionHeader) != 'upgrade') {
36✔
452
                throw new HandshakeException(
2✔
453
                    "Handshake request with invalid Connection header: '{$connectionHeader}'",
2✔
454
                    $response->withStatus(426)
2✔
455
                );
2✔
456
            }
457
            $upgradeHeader = trim($request->getHeaderLine('Upgrade'));
34✔
458
            if (strtolower($upgradeHeader) != 'websocket') {
34✔
459
                throw new HandshakeException(
2✔
460
                    "Handshake request with invalid Upgrade header: '{$upgradeHeader}'",
2✔
461
                    $response->withStatus(426)
2✔
462
                );
2✔
463
            }
464
            $versionHeader = trim($request->getHeaderLine('Sec-WebSocket-Version'));
32✔
465
            if ($versionHeader != '13') {
32✔
466
                throw new HandshakeException(
2✔
467
                    "Handshake request with invalid Sec-WebSocket-Version header: '{$versionHeader}'",
2✔
468
                    $response->withStatus(426)->withHeader('Sec-WebSocket-Version', '13')
2✔
469
                );
2✔
470
            }
471
            $keyHeader = trim($request->getHeaderLine('Sec-WebSocket-Key'));
30✔
472
            if (empty($keyHeader)) {
30✔
473
                throw new HandshakeException(
2✔
474
                    "Handshake request with invalid Sec-WebSocket-Key header: '{$keyHeader}'",
2✔
475
                    $response->withStatus(426)
2✔
476
                );
2✔
477
            }
478
            if (strlen(base64_decode($keyHeader)) != 16) {
28✔
479
                throw new HandshakeException(
2✔
480
                    "Handshake request with invalid Sec-WebSocket-Key header: '{$keyHeader}'",
2✔
481
                    $response->withStatus(426)
2✔
482
                );
2✔
483
            }
484
            $protocolHeader = trim($request->getHeaderLine('Sec-WebSocket-protocol'));
26✔
485
            if ($this->supportedSubProtocols && false === in_array($protocolHeader, $this->supportedSubProtocols)) {
26✔
NEW
UNCOV
486
                throw new HandshakeException(
×
NEW
UNCOV
487
                    "Handshake request with unsupported Sec-WebSocket-Protocol header: '{$keyHeader}'",
×
NEW
UNCOV
488
                    $response->withStatus(426)
×
NEW
UNCOV
489
                );
×
490
            }
491

492
            $responseKey = base64_encode(pack('H*', sha1($keyHeader . self::GUID)));
26✔
493
            $response = $response
26✔
494
                ->withHeader('Upgrade', 'websocket')
26✔
495
                ->withHeader('Connection', 'Upgrade')
26✔
496
                ->withHeader('Sec-WebSocket-Accept', $responseKey);
26✔
497

498
            if ($this->supportedSubProtocols) {
26✔
499
                $response = $response->withHeader('Sec-WebSocket-Protocol', $protocolHeader);
26✔
500
            }
501
        } catch (HandshakeException $e) {
12✔
502
            $this->logger->warning("[server] {$e->getMessage()}");
12✔
503
            $response = $e->getResponse();
12✔
504
            $exception = $e;
12✔
505
        }
506

507
        // Respond to handshake
508
        $connection->pushHttp($response);
38✔
509
        if ($exception) {
36✔
510
            throw $exception;
12✔
511
        }
512

513
        $this->logger->debug("[server] Handshake on {$request->getUri()->getPath()}");
24✔
514
        $connection->setHandshakeRequest($request);
24✔
515
        $connection->setHandshakeResponse($response);
24✔
516

517
        return $request;
24✔
518
    }
519
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc