• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sirn-se / websocket-php / 6517331921

14 Oct 2023 11:18AM UTC coverage: 94.946% (+12.4%) from 82.538%
6517331921

push

github

sirn-se
Client listener

120 of 120 new or added lines in 5 files covered. (100.0%)

789 of 831 relevant lines covered (94.95%)

18.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.3
/src/Client.php
1
<?php
2

3
/**
4
 * Copyright (C) 2014-2023 Textalk and contributors.
5
 *
6
 * This file is part of Websocket PHP and is free software under the ISC License.
7
 * License text: https://raw.githubusercontent.com/sirn-se/websocket-php/master/COPYING.md
8
 */
9

10
namespace WebSocket;
11

12
use InvalidArgumentException;
use Phrity\Net\{
    StreamFactory,
    Uri
};
use Psr\Http\Message\UriInterface;
use Psr\Log\{
    LoggerAwareInterface,
    LoggerInterface,
    NullLogger
};
use Stringable;
use Throwable;
use WebSocket\Exception\{
    BadUriException,
    ClientException,
    ConnectionLevelInterface,
    Exception,
    HandshakeException,
    MessageLevelInterface
};
use WebSocket\Http\{
    Request,
    Response
};
use WebSocket\Message\Message;
use WebSocket\Middleware\MiddlewareInterface;
use WebSocket\Trait\{
    ListenerTrait,
    OpcodeTrait,
    SendMethodsTrait
};
41

42
/**
43
 * WebSocket\Client class.
44
 * Entry class for WebSocket client.
45
 */
46
class Client implements LoggerAwareInterface, Stringable
47
{
48
    use ListenerTrait;
49
    use OpcodeTrait;
50
    use SendMethodsTrait;
51

52
    // Settings
53
    private $logger;
54
    private $timeout = 60;
55
    private $frameSize = 4096;
56
    private $persistent = false;
57
    private $context = [];
58
    private $headers = [];
59

60
    // Internal resources
61
    private $streamFactory;
62
    private $socketUri;
63
    private $connection;
64
    private $middlewares = [];
65
    private $streams;
66
    private $running = false;
67

68

69
    /* ---------- Magic methods ------------------------------------------------------------------------------------ */
70

71
    /**
     * Create a client for a WebSocket endpoint.
     * The URI is validated here; the constructor itself does not open a connection.
     * @param UriInterface|string $uri A ws/wss-URI
     * @throws BadUriException If the URI is rejected by parseUri()
     */
    public function __construct(UriInterface|string $uri)
    {
        // Validate first, so a bad URI aborts before any other state is initialised.
        $validatedUri = $this->parseUri($uri);
        $this->socketUri = $validatedUri;
        $this->logger = new NullLogger();
        $this->setStreamFactory(new StreamFactory());
    }
80

81
    /**
     * Describe this client as a string.
     * Shows the socket URI once a connection object exists, otherwise "closed".
     * @return string String representation.
     */
    public function __toString(): string
    {
        $label = $this->connection ? $this->socketUri->__toString() : 'closed';
        return sprintf("Client(%s)", $label);
    }
89

90

91
    /* ---------- Configuration ------------------------------------------------------------------------------------ */
92

93
    /**
     * Replace the stream factory used by connect() to create stream collections and socket clients.
     * @param StreamFactory $streamFactory Factory instance to use.
     * @return self This instance, for chaining.
     */
    public function setStreamFactory(StreamFactory $streamFactory): self
    {
        $this->streamFactory = $streamFactory;

        return $this;
    }
102

103
    /**
     * Set the logger used by the client.
     * If a connection object already exists, the logger is forwarded to it as well.
     * @param LoggerInterface $logger Logger implementation
     * @return self This instance, for chaining.
     */
    public function setLogger(LoggerInterface $logger): self
    {
        $this->logger = $logger;
        // No-op when no connection has been created yet.
        $this->connection?->setLogger($this->logger);

        return $this;
    }
116

117
    /**
     * Set the timeout.
     * The value is stored and, if a connection object exists, applied to it immediately.
     * @param int $timeout Timeout in seconds; must not be negative.
     * @return self This instance, for chaining.
     * @throws InvalidArgumentException If $timeout is below zero.
     */
    public function setTimeout(int $timeout): self
    {
        $isNegative = $timeout < 0;
        if ($isNegative) {
            throw new InvalidArgumentException("Invalid timeout '{$timeout}' provided");
        }

        $this->timeout = $timeout;
        // No-op when no connection has been created yet.
        $this->connection?->setTimeout($timeout);

        return $this;
    }
132

133
    /**
     * Read the currently configured timeout.
     * @return int Timeout in seconds.
     */
    public function getTimeout(): int
    {
        $seconds = $this->timeout;

        return $seconds;
    }
141

142
    /**
     * Set the frame size.
     * The value is stored and, if a connection object exists, applied to it immediately.
     * @param int $frameSize Frame size in bytes; must be at least 1.
     * @return self This instance, for chaining.
     * @throws InvalidArgumentException If $frameSize is below 1.
     */
    public function setFrameSize(int $frameSize): self
    {
        $isTooSmall = $frameSize < 1;
        if ($isTooSmall) {
            throw new InvalidArgumentException("Invalid frameSize '{$frameSize}' provided");
        }

        $this->frameSize = $frameSize;
        // No-op when no connection has been created yet.
        $this->connection?->setFrameSize($frameSize);

        return $this;
    }
158

159
    /**
     * Read the currently configured frame size.
     * @return int Frame size in bytes.
     */
    public function getFrameSize(): int
    {
        $bytes = $this->frameSize;

        return $bytes;
    }
167

168
    /**
     * Choose whether connect() should request a persistent socket.
     * Only stored here; it is handed to the socket client on the next connect().
     * @param bool $persistent True for persistent connection.
     * @return self This instance, for chaining.
     */
    public function setPersistent(bool $persistent): self
    {
        $this->persistent = $persistent;

        return $this;
    }
178

179
    /**
     * Set the stream context options used when connecting.
     * Only stored here; it is handed to the socket client on the next connect().
     * @param array $context Context as array, see https://www.php.net/manual/en/context.php.
     * @return self This instance, for chaining.
     */
    public function setContext(array $context): self
    {
        $this->context = $context;

        return $this;
    }
189

190
    /**
     * Register a header to send with the handshake request.
     * A header added twice under the same name keeps only the latest content.
     * @param string $name Header name
     * @param string $content Header content
     * @return self This instance, for chaining.
     */
    public function addHeader(string $name, string $content): self
    {
        $this->headers[$name] = $content;

        return $this;
    }
201

202
    /**
     * Register a middleware.
     * It is remembered for connections created later and, if a connection object
     * already exists, added to that connection right away.
     * @param MiddlewareInterface $middleware Middleware to add.
     * @return self This instance, for chaining.
     */
    public function addMiddleware(MiddlewareInterface $middleware): self
    {
        $this->middlewares[] = $middleware;
        // No-op when no connection has been created yet.
        $this->connection?->addMiddleware($middleware);

        return $this;
    }
215

216

217
    /* ---------- Messaging operations ----------------------------------------------------------------------------- */
218

219
    /**
     * Send a message, connecting first if there is no active connection.
     * @param Message $message Message to send.
     * @return Message Sent message
     */
    public function send(Message $message): Message
    {
        $connected = $this->isConnected();
        if (!$connected) {
            $this->connect();
        }

        $sent = $this->connection->pushMessage($message);

        return $sent;
    }
231

232
    /**
     * Receive a message, connecting first if there is no active connection.
     * Note that this operation will block reading.
     * @return Message|null
     */
    public function receive(): Message|null
    {
        $connected = $this->isConnected();
        if (!$connected) {
            $this->connect();
        }

        $received = $this->connection->pullMessage();

        return $received;
    }
244

245

246
    /* ---------- Listener operations ------------------------------------------------------------------------------ */
247

248
    /**
     * Start client listener.
     *
     * Connects if there is no active connection, then loops until stop() is called or the
     * connection is no longer connected. Each round waits for readable streams, pulls a
     * message from the connection, dispatches it to listeners keyed by its opcode, and
     * finally dispatches a 'tick' event.
     *
     * If the listener is already running, only a warning is logged.
     *
     * The exception types caught below are resolved through the file's
     * `use WebSocket\Exception\{...}` import. Without that import the unqualified names
     * resolve to `WebSocket\...` and never match, so every error would fall through to the
     * final Throwable branch.
     * NOTE(review): assumes MessageLevelInterface, ConnectionLevelInterface and Exception
     * live in the WebSocket\Exception namespace next to the other library exceptions - confirm.
     *
     * @throws Throwable Any error not handled by the more specific catches; the client is
     *                   disconnected before it is rethrown.
     */
    public function start(): void
    {
        if (!$this->isConnected()) {
            $this->connect();
        }

        // Check if running
        if ($this->running) {
            $this->logger->warning("[client] Client is already running");
            return;
        }
        $this->running = true;
        $this->logger->info("[client] Client is running");

        // Run handler
        while ($this->running) {
            try {
                // Get streams with readable content
                $readables = $this->streams->waitRead($this->timeout);
                // One pull per readable stream; the stream itself is not needed here
                // because all reads go through the client's connection.
                foreach ($readables as $readable) {
                    try {
                        // Read from connection
                        if ($message = $this->connection->pullMessage()) {
                            $this->dispatch($message->getOpcode(), [$this, $this->connection, $message]);
                        }
                    } catch (MessageLevelInterface $e) {
                        // Error, but keep connection open
                        $this->logger->error("[client] {$e->getMessage()}");
                        $this->dispatch('error', [$this, $this->connection, $e]);
                    } catch (ConnectionLevelInterface $e) {
                        // Error, disconnect connection
                        if ($this->connection) {
                            $this->connection->disconnect();
                        }
                        $this->logger->error("[client] {$e->getMessage()}");
                        $this->dispatch('error', [$this, $this->connection, $e]);
                    }
                }
                // Leave the loop once the connection has gone away.
                if (!$this->connection->isConnected()) {
                    $this->running = false;
                }
                $this->dispatch('tick', [$this]);
            } catch (Exception $e) {
                // Low-level error
                $this->logger->error("[client] {$e->getMessage()}");
                $this->dispatch('error', [$this, null, $e]);
            } catch (Throwable $e) {
                // Crash it
                $this->logger->error("[client] {$e->getMessage()}");
                $this->dispatch('error', [$this, null, $e]);
                $this->disconnect();
                throw $e;
            }
            gc_collect_cycles(); // Collect garbage
        }
    }
307

308
    /**
     * Stop client listener (resumable).
     * Clears the running flag that the start() loop checks, and logs the stop.
     * The connection itself is left untouched.
     */
    public function stop(): void
    {
        $this->running = false;

        $this->logger->info("[client] Client is stopped");
    }
316

317
    /**
     * Tell whether the listener loop is flagged as running (accepting messages).
     * @return bool True while running.
     */
    public function isRunning(): bool
    {
        $running = $this->running;

        return $running;
    }
325

326

327
    /* ---------- Connection management ---------------------------------------------------------------------------- */
328

329
    /**
     * Tell whether the client has an active connection.
     * @return bool True if a connection object exists and reports itself connected.
     */
    public function isConnected(): bool
    {
        if (!$this->connection) {
            return false;
        }

        return (bool) $this->connection->isConnected();
    }
337

338
    /**
     * Tell whether the client is readable.
     * @return bool True if a connection object exists and reports itself readable.
     */
    public function isReadable(): bool
    {
        if (!$this->connection) {
            return false;
        }

        return (bool) $this->connection->isReadable();
    }
346

347
    /**
     * Tell whether the client is writable.
     * @return bool True if a connection object exists and reports itself writable.
     */
    public function isWritable(): bool
    {
        if (!$this->connection) {
            return false;
        }

        return (bool) $this->connection->isWritable();
    }
355

356

357
    /**
     * Connect to server and perform upgrade.
     *
     * Any active connection is disconnected first. A socket is then opened to the host
     * (ssl for wss, tcp otherwise), wrapped in a Connection configured with the client's
     * frame size, timeout, logger and middlewares, and the upgrade handshake is performed.
     * Finally a 'connect' event is dispatched with the handshake response.
     *
     * The handshake is skipped for a persistent stream whose position is not zero. In that
     * case there is no response, and listeners receive null as the response argument.
     *
     * @throws ClientException On failed connection or invalid stream
     * @throws HandshakeException On failed handshake
     */
    public function connect(): void
    {
        $this->disconnect();
        $this->streams = $this->streamFactory->createStreamCollection();

        $host_uri = (new Uri())
            ->withScheme($this->socketUri->getScheme() === 'wss' ? 'ssl' : 'tcp')
            ->withHost($this->socketUri->getHost(Uri::IDNA))
            ->withPort($this->socketUri->getPort(Uri::REQUIRE_PORT));

        $stream = null;

        try {
            $client = $this->streamFactory->createSocketClient($host_uri);
            $client->setPersistent($this->persistent);
            $client->setTimeout($this->timeout);
            $client->setContext($this->context);
            $stream = $client->connect();
        } catch (Throwable $e) {
            $error = "Could not open socket to \"{$host_uri}\": {$e->getMessage()}";
            $this->logger->error("[client] {$error}", []);
            throw new ClientException($error);
        }
        $name = $stream->getRemoteName();
        $this->streams->attach($stream, $name);
        $this->connection = new Connection($stream, true, false);
        $this->connection->setFrameSize($this->frameSize);
        $this->connection->setTimeout($this->timeout);
        $this->connection->setLogger($this->logger);
        foreach ($this->middlewares as $middleware) {
            $this->connection->addMiddleware($middleware);
        }

        if (!$this->isConnected()) {
            $error = "Invalid stream on \"{$host_uri}\".";
            $this->logger->error("[client] {$error}");
            throw new ClientException($error);
        }

        // Initialise explicitly: when the handshake is skipped below, $response would
        // otherwise be an undefined variable in the dispatch call.
        $response = null;
        if (!$this->persistent || $stream->tell() == 0) {
            $response = $this->performHandshake($host_uri);
        }

        $this->dispatch('connect', [$this, $this->connection, $response]);
        $this->logger->info("[client] Client connected to {$this->socketUri}");
    }
407

408
    /**
     * Disconnect from server.
     * Does nothing when there is no active connection; otherwise disconnects the
     * connection, dispatches a 'disconnect' event and logs it.
     */
    public function disconnect(): void
    {
        if (!$this->isConnected()) {
            return;
        }

        $this->connection->disconnect();
        $this->dispatch('disconnect', [$this, $this->connection]);
        $this->logger->info('[client] Client disconnected');
    }
419

420

421
    /* ---------- Connection wrapper methods ----------------------------------------------------------------------- */
422

423
    /**
     * Get name of local socket.
     * @return string|null The connection's name, or null if not connected.
     */
    public function getName(): ?string
    {
        if (!$this->isConnected()) {
            return null;
        }

        return $this->connection->getName();
    }
431

432
    /**
     * Get name of remote socket.
     * @return string|null The connection's remote name, or null if not connected.
     */
    public function getRemoteName(): ?string
    {
        if (!$this->isConnected()) {
            return null;
        }

        return $this->connection->getRemoteName();
    }
440

441
    /**
     * Get the Response recorded on the connection for the handshake procedure.
     * @return Response|null Handshake response, or null when no connection object exists.
     */
    public function getHandshakeResponse(): ?Response
    {
        // Nullsafe call: yields null when no connection has been created.
        return $this->connection?->getHandshakeResponse();
    }
449

450

451
    /* ---------- Internal helper methods -------------------------------------------------------------------------- */
452

453
    /**
     * Perform upgrade handshake on new connections.
     *
     * Builds a GET request for the socket URI's path and query, adds the upgrade headers,
     * optional basic authentication and any headers registered with addHeader(), sends it
     * over the connection and validates the response:
     *  - the status code must be 101,
     *  - a Sec-WebSocket-Accept header must be present,
     *  - its value must equal base64(sha1(key + GUID)) for the key that was sent.
     * On success the request and response are stored on the connection.
     *
     * @param Uri $host_uri Host URI the socket was opened to; its authority is used as Host header.
     * @return Response The validated handshake response.
     * @throws HandshakeException On failed handshake
     */
    protected function performHandshake(Uri $host_uri): Response
    {
        $http_uri = (new Uri())
            ->withPath($this->socketUri->getPath(), Uri::ABSOLUTE_PATH)
            ->withQuery($this->socketUri->getQuery());

        // Generate the WebSocket key.
        $key = $this->generateKey();

        $request = new Request('GET', $http_uri);

        $request = $request
            ->withHeader('Host', $host_uri->getAuthority())
            ->withHeader('User-Agent', 'websocket-client-php')
            ->withHeader('Connection', 'Upgrade')
            ->withHeader('Upgrade', 'websocket')
            ->withHeader('Sec-WebSocket-Key', $key)
            ->withHeader('Sec-WebSocket-Version', '13');

        // Handle basic authentication.
        if ($userinfo = $this->socketUri->getUserInfo()) {
            $request = $request->withHeader('authorization', 'Basic ' . base64_encode($userinfo));
        }

        // Add and override with headers.
        foreach ($this->headers as $name => $content) {
            $request = $request->withHeader($name, $content);
        }

        $this->connection->pushHttp($request);
        $response = $this->connection->pullHttp();

        try {
            if ($response->getStatusCode() != 101) {
                throw new HandshakeException("Invalid status code {$response->getStatusCode()}.", $response);
            }

            if (empty($response->getHeaderLine('Sec-WebSocket-Accept'))) {
                throw new HandshakeException(
                    "Connection to '{$this->socketUri}' failed: Server sent invalid upgrade response.",
                    $response
                );
            }

            $response_key = trim($response->getHeaderLine('Sec-WebSocket-Accept'));
            // Raw (binary) SHA-1 digest of key + protocol GUID, base64-encoded.
            $expected_key = base64_encode(
                sha1($key . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true)
            );

            if ($response_key !== $expected_key) {
                throw new HandshakeException("Server sent bad upgrade response.", $response);
            }
        } catch (HandshakeException $e) {
            // Log every handshake failure once, then let the caller handle it.
            $this->logger->error("[client] {$e->getMessage()}");
            throw $e;
        }

        // Tagged "[client]" like every other log line in this class.
        $this->logger->debug("[client] Handshake on {$http_uri->getPath()}");
        $this->connection->setHandshakeRequest($request);
        $this->connection->setHandshakeResponse($response);

        return $response;
    }
520

521
    /**
     * Generate the random Sec-WebSocket-Key value for the handshake.
     *
     * The key is 16 bytes from the system CSPRNG, base64-encoded, which gives a
     * 24-character string. This replaces a rand()-based loop that only produced
     * printable ASCII bytes from a non-cryptographic generator.
     *
     * @return string Base64-encoded 16-byte random nonce.
     */
    protected function generateKey(): string
    {
        return base64_encode(random_bytes(16));
    }
533

534
    /**
     * Ensure URI instance to use in client.
     *
     * A Uri instance is used as is, any other UriInterface is converted through its string
     * form, and a string is parsed. The result must use the 'ws' or 'wss' scheme and have
     * a host.
     *
     * @param UriInterface|string $uri A ws/wss-URI
     * @return UriInterface The validated URI (a Uri instance).
     * @throws BadUriException On invalid URI
     */
    protected function parseUri($uri): UriInterface
    {
        if ($uri instanceof Uri) {
            $uri_instance = $uri;
        } elseif ($uri instanceof UriInterface) {
            $uri_instance = new Uri("{$uri}");
        } elseif (is_string($uri)) {
            try {
                $uri_instance = new Uri($uri);
            } catch (InvalidArgumentException $e) {
                throw new BadUriException("Invalid URI '{$uri}' provided.");
            }
        } else {
            throw new BadUriException("Provided URI must be a UriInterface or string.");
        }
        // Strict comparison: the scheme must be exactly one of the two strings.
        if (!in_array($uri_instance->getScheme(), ['ws', 'wss'], true)) {
            throw new BadUriException("Invalid URI scheme, must be 'ws' or 'wss'.");
        }
        if (!$uri_instance->getHost()) {
            throw new BadUriException("Invalid URI host.");
        }
        return $uri_instance;
    }
563
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc