• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sirn-se / websocket-php / 8346487915

19 Mar 2024 04:15PM UTC coverage: 22.584% (-77.4%) from 100.0%
8346487915

push

github

sirn-se
Temp test verification

2 of 2 new or added lines in 1 file covered. (100.0%)

742 existing lines in 32 files now uncovered.

222 of 983 relevant lines covered (22.58%)

0.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.26
/src/Client.php
1
<?php
2

3
/**
4
 * Copyright (C) 2014-2024 Textalk and contributors.
5
 * This file is part of Websocket PHP and is free software under the ISC License.
6
 */
7

8
namespace WebSocket;
9

10
use InvalidArgumentException;
11
use Phrity\Net\{
12
    StreamFactory,
13
    Uri
14
};
15
use Psr\Http\Message\UriInterface;
16
use Psr\Log\{
17
    LoggerAwareInterface,
18
    LoggerInterface,
19
    NullLogger
20
};
21
use Stringable;
22
use Throwable;
23
use WebSocket\Exception\{
24
    BadUriException,
25
    ClientException,
26
    ConnectionLevelInterface,
27
    Exception,
28
    HandshakeException,
29
    MessageLevelInterface,
30
    ReconnectException
31
};
32
use WebSocket\Http\{
33
    Request,
34
    Response
35
};
36
use WebSocket\Message\Message;
37
use WebSocket\Middleware\MiddlewareInterface;
38
use WebSocket\Trait\{
39
    ListenerTrait,
40
    SendMethodsTrait,
41
    StringableTrait
42
};
43

44
/**
45
 * WebSocket\Client class.
46
 * Entry class for WebSocket client.
47
 */
48
class Client implements LoggerAwareInterface, Stringable
49
{
50
    use ListenerTrait;
51
    use SendMethodsTrait;
52
    use StringableTrait;
53

54
    // Settings
55
    private $logger;
56
    private $timeout = 60;
57
    private $frameSize = 4096;
58
    private $persistent = false;
59
    private $context = [];
60
    private $headers = [];
61

62
    // Internal resources
63
    private $streamFactory;
64
    private $socketUri;
65
    private $connection;
66
    private $middlewares = [];
67
    private $streams;
68
    private $running = false;
69

70

71
    /* ---------- Magic methods ------------------------------------------------------------------------------------ */
72

73
    /**
     * Create a client for a WebSocket endpoint.
     *
     * The given URI is converted to a Uri instance and validated by parseUri().
     * A NullLogger and a default StreamFactory are installed, so the client is
     * usable without further configuration.
     *
     * @param Psr\Http\Message\UriInterface|string $uri A ws/wss-URI
     */
    public function __construct(UriInterface|string $uri)
    {
        // Validate first: parseUri() throws on an unusable URI
        $parsedUri = $this->parseUri($uri);
        $this->socketUri = $parsedUri;

        $this->logger = new NullLogger();

        $defaultFactory = new StreamFactory();
        $this->setStreamFactory($defaultFactory);
    }
82

83
    /**
     * Describe this instance as a string.
     *
     * Shows the socket URI once a connection object exists, otherwise "closed".
     * Only the presence of the connection object is checked here, not whether
     * it is currently connected.
     *
     * @return string String representation
     */
    public function __toString(): string
    {
        $target = 'closed';
        if ($this->connection) {
            $target = $this->socketUri->__toString();
        }
        return $this->stringable('%s', $target);
    }
91

92

93
    /* ---------- Configuration ------------------------------------------------------------------------------------ */
94

95
    /**
     * Replace the stream factory.
     *
     * The factory is used by connect() to create the stream collection and the
     * socket client.
     *
     * @param Phrity\Net\StreamFactory $streamFactory Factory to use from now on
     * @return self
     */
    public function setStreamFactory(StreamFactory $streamFactory): self
    {
        $this->streamFactory = $streamFactory;

        return $this;
    }
105

106
    /**
     * Replace the logger.
     *
     * The logger is stored on the client and, when a connection object already
     * exists, handed on to that connection as well.
     *
     * @param Psr\Log\LoggerInterface $logger Logger implementation
     * @return self
     */
    public function setLogger(LoggerInterface $logger): self
    {
        $this->logger = $logger;
        // No-op until a connection has been created
        $this->connection?->setLogger($logger);

        return $this;
    }
119

120
    /**
     * Change the timeout.
     *
     * Negative values are rejected. The value is stored on the client and, when
     * a connection object already exists, applied to it immediately.
     *
     * @param int $timeout Timeout in seconds
     * @return self
     * @throws InvalidArgumentException If invalid timeout provided
     */
    public function setTimeout(int $timeout): self
    {
        $isNegative = $timeout < 0;
        if ($isNegative) {
            throw new InvalidArgumentException("Invalid timeout '{$timeout}' provided");
        }

        $this->timeout = $timeout;
        // No-op until a connection has been created
        $this->connection?->setTimeout($timeout);

        return $this;
    }
137

138
    /**
     * Read the configured timeout.
     *
     * @return int Timeout in seconds
     */
    public function getTimeout(): int
    {
        return $this->timeout;
    }
146

147
    /**
     * Change the frame size.
     *
     * Values below 1 are rejected. The value is stored on the client and, when a
     * connection object already exists, applied to it immediately.
     *
     * @param int $frameSize Frame size in bytes
     * @return self
     * @throws InvalidArgumentException If invalid frameSize provided
     */
    public function setFrameSize(int $frameSize): self
    {
        $isTooSmall = $frameSize < 1;
        if ($isTooSmall) {
            throw new InvalidArgumentException("Invalid frameSize '{$frameSize}' provided");
        }

        $this->frameSize = $frameSize;
        // No-op until a connection has been created
        $this->connection?->setFrameSize($frameSize);

        return $this;
    }
164

165
    /**
     * Read the configured frame size.
     *
     * @return int Frame size in bytes
     */
    public function getFrameSize(): int
    {
        return $this->frameSize;
    }
173

174
    /**
     * Choose whether connections should be persistent.
     *
     * The flag is only stored here; connect() reads it when it opens the socket,
     * so an already open connection is not affected.
     *
     * @param bool $persistent True for persistent connection
     * @return self
     */
    public function setPersistent(bool $persistent): self
    {
        $this->persistent = $persistent;

        return $this;
    }
184

185
    /**
     * Replace the stream context options.
     *
     * The array is only stored here; connect() passes it to the socket client
     * when it opens the socket.
     *
     * @param array $context Context as array, see https://www.php.net/manual/en/context.php
     * @return self
     */
    public function setContext(array $context): self
    {
        $this->context = $context;

        return $this;
    }
195

196
    /**
     * Register a header to send with the handshake request.
     *
     * Headers are keyed by name exactly as given, so registering the same name
     * again replaces the earlier content.
     *
     * @param string $name Header name
     * @param string $content Header content
     * @return self
     */
    public function addHeader(string $name, string $content): self
    {
        $this->headers[$name] = $content;

        return $this;
    }
207

208
    /**
     * Register a middleware.
     *
     * The middleware is remembered so connect() can attach it to every new
     * connection, and is also attached right away when a connection object
     * already exists.
     *
     * @param WebSocket\Middleware\MiddlewareInterface $middleware Middleware to add
     * @return self
     */
    public function addMiddleware(MiddlewareInterface $middleware): self
    {
        $this->middlewares[] = $middleware;
        // No-op until a connection has been created
        $this->connection?->addMiddleware($middleware);

        return $this;
    }
221

222

223
    /* ---------- Messaging operations ----------------------------------------------------------------------------- */
224

225
    /**
     * Send a message, connecting first when there is no active connection.
     *
     * @param Message $message Message to send
     * @return Message Sent message, as returned by the connection
     */
    public function send(Message $message): Message
    {
        $needsConnect = !$this->isConnected();
        if ($needsConnect) {
            $this->connect();
        }

        $sent = $this->connection->pushMessage($message);

        return $sent;
    }
237

238
    /**
     * Receive a message, connecting first when there is no active connection.
     * Note that this operation will block reading.
     *
     * @return Message|null Whatever the connection's pullMessage() yields
     */
    public function receive(): Message|null
    {
        $needsConnect = !$this->isConnected();
        if ($needsConnect) {
            $this->connect();
        }

        $received = $this->connection->pullMessage();

        return $received;
    }
250

251

252
    /* ---------- Listener operations ------------------------------------------------------------------------------ */
253

254
    /**
     * Start client listener.
     *
     * Connects first when there is no active connection, then loops for as long
     * as the client is flagged as running. Each pass:
     *  - waits up to the configured timeout for readable streams,
     *  - pulls a message per readable stream and dispatches it by opcode,
     *  - stops the loop when the connection is no longer connected,
     *  - ticks the connection and dispatches 'tick'.
     *
     * Error handling:
     *  - MessageLevelInterface: logged and dispatched as 'error', connection kept.
     *  - ConnectionLevelInterface: connection is disconnected, then logged and dispatched.
     *  - WebSocket Exception: disconnects and stops the loop without rethrowing.
     *  - Any other Throwable: disconnects, stops the loop and is rethrown.
     *
     * Calling start() while already running only logs a warning.
     *
     * @throws Throwable On low level error, or when the initial connect fails
     */
    public function start(): void
    {
        // Check if running
        if ($this->running) {
            $this->logger->warning("[client] Client is already running");
            return;
        }
        $this->running = true;
        $this->logger->info("[client] Client is running");

        if (!$this->isConnected()) {
            try {
                $this->connect();
            } catch (Throwable $e) {
                // The loop was never entered; clear the flag so isRunning() is truthful
                // and a later start() is not rejected as "already running"
                $this->running = false;
                throw $e;
            }
        }

        // Run handler
        while ($this->running) {
            try {
                // Get streams with readable content
                $readables = $this->streams->waitRead($this->timeout);
                foreach ($readables as $readable) {
                    try {
                        // Read from connection
                        if ($message = $this->connection->pullMessage()) {
                            $this->dispatch($message->getOpcode(), [$this, $this->connection, $message]);
                        }
                    } catch (MessageLevelInterface $e) {
                        // Error, but keep connection open
                        $this->logger->error("[client] {$e->getMessage()}");
                        $this->dispatch('error', [$this, $this->connection, $e]);
                    } catch (ConnectionLevelInterface $e) {
                        // Error, disconnect connection
                        $this->disconnect();
                        $this->logger->error("[client] {$e->getMessage()}");
                        $this->dispatch('error', [$this, $this->connection, $e]);
                    }
                }
                if (!$this->connection->isConnected()) {
                    $this->running = false;
                }
                $this->connection->tick();
                $this->dispatch('tick', [$this]);
            } catch (Exception $e) {
                $this->disconnect();
                $this->running = false;

                // Low-level error
                $this->logger->error("[client] {$e->getMessage()}");
                $this->dispatch('error', [$this, null, $e]);
            } catch (Throwable $e) {
                $this->disconnect();
                $this->running = false;

                // Crash it
                $this->logger->error("[client] {$e->getMessage()}");
                $this->dispatch('error', [$this, null, $e]);
                throw $e;
            }
            gc_collect_cycles(); // Collect garbage
        }
    }
318

319
    /**
     * Stop client listener (resumable).
     *
     * Only clears the running flag and logs; the connection itself is left as it is.
     */
    public function stop(): void
    {
        $this->running = false;

        $this->logger->info("[client] Client is stopped");
    }
327

328
    /**
     * Tell whether the listener is flagged as running (accepting messages).
     *
     * @return bool Current value of the running flag
     */
    public function isRunning(): bool
    {
        return $this->running;
    }
336

337

338
    /* ---------- Connection management ---------------------------------------------------------------------------- */
339

340
    /**
     * Tell whether the client has an active connection.
     *
     * @return bool True when a connection object exists and reports itself connected
     */
    public function isConnected(): bool
    {
        if (!$this->connection) {
            return false;
        }

        return (bool) $this->connection->isConnected();
    }
348

349
    /**
     * Tell whether the client is readable.
     *
     * @return bool True when a connection object exists and reports itself readable
     */
    public function isReadable(): bool
    {
        if (!$this->connection) {
            return false;
        }

        return (bool) $this->connection->isReadable();
    }
357

358
    /**
     * Tell whether the client is writable.
     *
     * @return bool True when a connection object exists and reports itself writable
     */
    public function isWritable(): bool
    {
        if (!$this->connection) {
            return false;
        }

        return (bool) $this->connection->isWritable();
    }
366

367

368
    /**
     * Connect to server and perform upgrade.
     *
     * Steps:
     *  - Disconnects any active connection and creates a fresh stream collection.
     *  - Opens a socket to the host ('ssl' scheme for wss, otherwise 'tcp') using the
     *    configured persistence, timeout and context.
     *  - Wraps the stream in a Connection and applies frame size, timeout, logger and
     *    all registered middlewares.
     *  - Performs the upgrade handshake, unless the connection is persistent and the
     *    stream position is non-zero (the stream is being reused).
     *  - On a ReconnectException the new URI (if any) is adopted and connect() is
     *    called again.
     *  - On success a 'connect' event is dispatched with the handshake response,
     *    or null when the handshake was skipped.
     *
     * @throws ClientException On failed connection
     * @throws HandshakeException On failed handshake
     */
    public function connect(): void
    {
        $this->disconnect();
        $this->streams = $this->streamFactory->createStreamCollection();

        $host_uri = (new Uri())
            ->withScheme($this->socketUri->getScheme() == 'wss' ? 'ssl' : 'tcp')
            ->withHost($this->socketUri->getHost(Uri::IDN_ENCODE))
            ->withPort($this->socketUri->getPort(Uri::REQUIRE_PORT));

        $stream = null;
        // Stays null when a persistent stream is reused and no handshake takes place;
        // without this the 'connect' dispatch below would read an undefined variable
        $response = null;

        try {
            $client = $this->streamFactory->createSocketClient($host_uri);
            $client->setPersistent($this->persistent);
            $client->setTimeout($this->timeout);
            $client->setContext($this->context);
            $stream = $client->connect();
        } catch (Throwable $e) {
            $error = "Could not open socket to \"{$host_uri}\": {$e->getMessage()}";
            $this->logger->error("[client] {$error}", []);
            throw new ClientException($error);
        }
        $name = $stream->getRemoteName();
        $this->streams->attach($stream, $name);
        $this->connection = new Connection($stream, true, false, $host_uri->getScheme() === 'ssl');
        $this->connection->setFrameSize($this->frameSize);
        $this->connection->setTimeout($this->timeout);
        $this->connection->setLogger($this->logger);
        foreach ($this->middlewares as $middleware) {
            $this->connection->addMiddleware($middleware);
        }

        if (!$this->isConnected()) {
            $error = "Invalid stream on \"{$host_uri}\".";
            $this->logger->error("[client] {$error}");
            throw new ClientException($error);
        }
        try {
            // A reused persistent stream has already been upgraded; only handshake on fresh streams
            if (!$this->persistent || $stream->tell() == 0) {
                $response = $this->performHandshake($this->socketUri);
            }
        } catch (ReconnectException $e) {
            $this->logger->info("[client] {$e->getMessage()}");
            if ($uri = $e->getUri()) {
                $this->socketUri = $uri;
            }
            $this->connect();
            return;
        }
        $this->logger->info("[client] Client connected to {$this->socketUri}");
        $this->dispatch('connect', [$this, $this->connection, $response]);
    }
425

426
    /**
     * Disconnect from server.
     *
     * Does nothing when there is no active connection. Otherwise the connection
     * is disconnected, the event is logged and a 'disconnect' event is dispatched.
     */
    public function disconnect(): void
    {
        if (!$this->isConnected()) {
            return;
        }

        $this->connection->disconnect();
        $this->logger->info('[client] Client disconnected');
        $this->dispatch('disconnect', [$this, $this->connection]);
    }
437

438

439
    /* ---------- Connection wrapper methods ----------------------------------------------------------------------- */
440

441
    /**
     * Get name of local socket, or null if not connected.
     *
     * @return string|null Name reported by the connection, null without an active connection
     */
    public function getName(): string|null
    {
        if (!$this->isConnected()) {
            return null;
        }

        return $this->connection->getName();
    }
449

450
    /**
     * Get name of remote socket, or null if not connected.
     *
     * @return string|null Remote name reported by the connection, null without an active connection
     */
    public function getRemoteName(): string|null
    {
        if (!$this->isConnected()) {
            return null;
        }

        return $this->connection->getRemoteName();
    }
458

459
    /**
     * Get meta value on connection.
     *
     * @param string $key Meta key
     * @return mixed Meta value from the connection, null without an active connection
     */
    public function getMeta(string $key): mixed
    {
        if (!$this->isConnected()) {
            return null;
        }

        return $this->connection->getMeta($key);
    }
468

469
    /**
     * Get Response for handshake procedure.
     *
     * Only the presence of a connection object is required here, not an active
     * connection.
     *
     * @return Response|null Handshake response held by the connection, null when no connection object exists
     */
    public function getHandshakeResponse(): Response|null
    {
        return $this->connection?->getHandshakeResponse();
    }
477

478

479
    /* ---------- Internal helper methods -------------------------------------------------------------------------- */
480

481
    /**
     * Perform upgrade handshake on new connections.
     *
     * Builds a GET request with the WebSocket upgrade headers and a fresh key,
     * adds Basic authorization when the URI carries user info, then applies the
     * headers registered through addHeader() (which override the defaults).
     * The response must have status 101 and a Sec-WebSocket-Accept header equal to
     * base64(sha1(key + GUID)). On success both request and response are stored on
     * the connection.
     *
     * @param Uri $uri URI to send the upgrade request for
     * @return Response The accepted handshake response
     * @throws HandshakeException On failed handshake
     */
    protected function performHandshake(Uri $uri): Response
    {
        // Generate the WebSocket key.
        $key = $this->generateKey();

        $request = (new Request('GET', $uri))
            ->withHeader('User-Agent', 'websocket-client-php')
            ->withHeader('Connection', 'Upgrade')
            ->withHeader('Upgrade', 'websocket')
            ->withHeader('Sec-WebSocket-Key', $key)
            ->withHeader('Sec-WebSocket-Version', '13');

        // Handle basic authentication.
        $userinfo = $uri->getUserInfo();
        if ($userinfo) {
            $request = $request->withHeader('Authorization', 'Basic ' . base64_encode($userinfo));
        }

        // Add and override with headers.
        foreach ($this->headers as $name => $content) {
            $request = $request->withHeader($name, $content);
        }

        try {
            $request = $this->connection->pushHttp($request);
            $response = $this->connection->pullHttp();

            $status = $response->getStatusCode();
            if ($status != 101) {
                throw new HandshakeException("Invalid status code {$response->getStatusCode()}.", $response);
            }

            $accept = $response->getHeaderLine('Sec-WebSocket-Accept');
            if (empty($accept)) {
                throw new HandshakeException(
                    "Connection to '{$uri}' failed: Server sent invalid upgrade response.",
                    $response
                );
            }

            // The server must echo the key combined with the protocol GUID, SHA-1 hashed and base64 encoded
            $response_key = trim($accept);
            $expected_key = base64_encode(sha1($key . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true));

            if ($response_key !== $expected_key) {
                throw new HandshakeException("Server sent bad upgrade response.", $response);
            }
        } catch (HandshakeException $e) {
            // Log, then let the caller deal with it
            $this->logger->error("[client] {$e->getMessage()}");
            throw $e;
        }

        $this->logger->debug("[client] Handshake on {$uri->getPath()}");
        $this->connection->setHandshakeRequest($request);
        $this->connection->setHandshakeResponse($response);

        return $response;
    }
543

544
    /**
     * Generate a random value for the Sec-WebSocket-Key header.
     *
     * The key is 16 bytes from the system CSPRNG, base64 encoded, i.e. the
     * "randomly selected 16-byte value" the WebSocket protocol asks for. This
     * replaces a rand()-based loop that was limited to printable ASCII and was
     * not cryptographically secure.
     *
     * @return string Base64 encoded 16-byte nonce (24 characters)
     * @throws \Exception If no suitable source of randomness is available
     */
    protected function generateKey(): string
    {
        return base64_encode(random_bytes(16));
    }
556

557
    /**
     * Ensure URI instance to use in client.
     *
     * A Uri instance is used as is, any other UriInterface is re-created from its
     * string form, and a string is parsed into a new Uri. The result must use the
     * ws or wss scheme and have a host.
     *
     * @param UriInterface|string $uri A ws/wss-URI
     * @return Uri The validated URI instance
     * @throws BadUriException On invalid URI
     */
    protected function parseUri(UriInterface|string $uri): Uri
    {
        if (is_string($uri)) {
            try {
                $uri_instance = new Uri($uri);
            } catch (InvalidArgumentException $e) {
                throw new BadUriException("Invalid URI '{$uri}' provided.");
            }
        } elseif ($uri instanceof Uri) {
            // Already the concrete type, no conversion needed
            $uri_instance = $uri;
        } else {
            // Some other UriInterface implementation; rebuild from its string form
            $uri_instance = new Uri((string) $uri);
        }

        $scheme = $uri_instance->getScheme();
        $schemeAllowed = in_array($scheme, ['ws', 'wss']);
        if (!$schemeAllowed) {
            throw new BadUriException("Invalid URI scheme, must be 'ws' or 'wss'.");
        }

        $host = $uri_instance->getHost();
        if (!$host) {
            throw new BadUriException("Invalid URI host.");
        }

        return $uri_instance;
    }
584
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc