• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

prooph / event-store-client / 11262629555

09 Oct 2024 08:33PM UTC coverage: 70.107% (-0.2%) from 70.262%
11262629555

push

github

prolic
cs-fixer corrections

2 of 2 new or added lines in 1 file covered. (100.0%)

9 existing lines in 2 files now uncovered.

3457 of 4931 relevant lines covered (70.11%)

67.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.26
/src/Internal/EventStoreConnectionLogicHandler.php
1
<?php
2

3
/**
4
 * This file is part of `prooph/event-store-client`.
5
 * (c) 2018-2024 Alexander Miertsch <kontakt@codeliner.ws>
6
 * (c) 2018-2024 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace Prooph\EventStoreClient\Internal;
15

16
use Amp\DeferredFuture;
17
use Closure;
18
use Exception;
19
use Prooph\EventStore\ClientAuthenticationFailedEventArgs;
20
use Prooph\EventStore\ClientClosedEventArgs;
21
use Prooph\EventStore\ClientConnectionEventArgs;
22
use Prooph\EventStore\ClientErrorEventArgs;
23
use Prooph\EventStore\ClientReconnectingEventArgs;
24
use Prooph\EventStore\EndPoint;
25
use Prooph\EventStore\EventStoreConnection;
26
use Prooph\EventStore\EventStoreSubscription;
27
use Prooph\EventStore\Exception\CannotEstablishConnection;
28
use Prooph\EventStore\Exception\EventStoreConnectionException;
29
use Prooph\EventStore\Exception\InvalidOperationException;
30
use Prooph\EventStore\Exception\ObjectDisposed;
31
use Prooph\EventStore\Internal\Consts;
32
use Prooph\EventStore\Internal\EventHandler;
33
use Prooph\EventStore\ListenerHandler;
34
use Prooph\EventStore\ResolvedEvent;
35
use Prooph\EventStore\SubscriptionDropReason;
36
use Prooph\EventStore\Util\Guid;
37
use Prooph\EventStoreClient\ClientOperations\ClientOperation;
38
use Prooph\EventStoreClient\ClientOperations\ConnectToPersistentSubscriptionOperation;
39
use Prooph\EventStoreClient\ClientOperations\VolatileSubscriptionOperation;
40
use Prooph\EventStoreClient\ConnectionSettings;
41
use Prooph\EventStoreClient\Internal\Message\CloseConnectionMessage;
42
use Prooph\EventStoreClient\Internal\Message\EstablishTcpConnectionMessage;
43
use Prooph\EventStoreClient\Internal\Message\HandleTcpPackageMessage;
44
use Prooph\EventStoreClient\Internal\Message\Message;
45
use Prooph\EventStoreClient\Internal\Message\StartConnectionMessage;
46
use Prooph\EventStoreClient\Internal\Message\StartOperationMessage;
47
use Prooph\EventStoreClient\Internal\Message\StartPersistentSubscriptionMessage;
48
use Prooph\EventStoreClient\Internal\Message\StartSubscriptionMessage;
49
use Prooph\EventStoreClient\Internal\Message\TcpConnectionClosedMessage;
50
use Prooph\EventStoreClient\Internal\Message\TcpConnectionErrorMessage;
51
use Prooph\EventStoreClient\Internal\Message\TcpConnectionEstablishedMessage;
52
use Prooph\EventStoreClient\Messages\ClientMessages\IdentifyClient;
53
use Prooph\EventStoreClient\SystemData\InspectionDecision;
54
use Prooph\EventStoreClient\SystemData\TcpCommand;
55
use Prooph\EventStoreClient\SystemData\TcpFlags;
56
use Prooph\EventStoreClient\SystemData\TcpPackage;
57
use Prooph\EventStoreClient\Transport\Tcp\TcpPackageConnection;
58
use Revolt\EventLoop;
59
use Throwable;
60

61
/** @internal */
62
class EventStoreConnectionLogicHandler
63
{
64
    private const ClientVersion = 1;
65

66
    private readonly EventStoreConnection $esConnection;
67

68
    private ?TcpPackageConnection $connection = null;
69

70
    private readonly ConnectionSettings $settings;
71

72
    private ConnectionState $state;
73

74
    private ConnectingPhase $connectingPhase;
75

76
    /** @psalm-suppress PropertyNotSetInConstructor */
77
    private EndPointDiscoverer $endPointDiscoverer;
78

79
    private readonly MessageHandler $handler;
80

81
    private readonly OperationsManager $operations;
82

83
    private readonly SubscriptionsManager $subscriptions;
84

85
    private readonly EventHandler $eventHandler;
86

87
    private StopWatch $stopWatch;
88

89
    private string $timerTickWatcherId = '';
90

91
    private ?ReconnectionInfo $reconnInfo = null;
92

93
    /** @psalm-suppress PropertyNotSetInConstructor */
94
    private HeartbeatInfo $heartbeatInfo;
95

96
    /** @psalm-suppress PropertyNotSetInConstructor */
97
    private AuthInfo $authInfo;
98

99
    /** @psalm-suppress PropertyNotSetInConstructor */
100
    private IdentifyInfo $identityInfo;
101

102
    private bool $wasConnected = false;
103

104
    private int $packageNumber = 0;
105

106
    private float $lastTimeoutsTimeStamp;
107

108
    public function __construct(EventStoreConnection $connection, ConnectionSettings $settings)
109
    {
110
        $this->esConnection = $connection;
420✔
111
        $this->settings = $settings;
420✔
112
        $this->state = ConnectionState::Init;
420✔
113
        $this->connectingPhase = ConnectingPhase::Invalid;
420✔
114
        $this->handler = new MessageHandler();
420✔
115
        $this->operations = new OperationsManager($connection->connectionName(), $settings);
420✔
116
        $this->subscriptions = new SubscriptionsManager($connection->connectionName(), $settings);
420✔
117
        $this->eventHandler = new EventHandler();
420✔
118
        $this->stopWatch = StopWatch::startNew();
420✔
119
        // this allows first connection to connect quick
120
        $this->lastTimeoutsTimeStamp = -$this->settings->operationTimeoutCheckPeriod();
420✔
121

122
        $this->handler->registerHandler(
420✔
123
            StartConnectionMessage::class,
420✔
124
            function (StartConnectionMessage $message): void {
420✔
125
                $this->startConnection($message->deferred(), $message->endPointDiscoverer());
405✔
126
            }
420✔
127
        );
420✔
128
        $this->handler->registerHandler(
420✔
129
            CloseConnectionMessage::class,
420✔
130
            function (CloseConnectionMessage $message): void {
420✔
131
                $this->closeConnection($message->reason(), $message->exception());
408✔
132
            }
420✔
133
        );
420✔
134

135
        $this->handler->registerHandler(
420✔
136
            StartOperationMessage::class,
420✔
137
            function (StartOperationMessage $message): void {
420✔
138
                $this->startOperation($message->operation(), $message->maxRetries(), $message->timeout());
395✔
139
            }
420✔
140
        );
420✔
141
        $this->handler->registerHandler(
420✔
142
            StartSubscriptionMessage::class,
420✔
143
            function (StartSubscriptionMessage $message): void {
420✔
144
                $this->startSubscription($message);
54✔
145
            }
420✔
146
        );
420✔
147
        $this->handler->registerHandler(
420✔
148
            StartPersistentSubscriptionMessage::class,
420✔
149
            function (StartPersistentSubscriptionMessage $message): void {
420✔
150
                $this->startPersistentSubscription($message);
34✔
151
            }
420✔
152
        );
420✔
153

154
        $this->handler->registerHandler(
420✔
155
            EstablishTcpConnectionMessage::class,
420✔
156
            function (EstablishTcpConnectionMessage $message): void {
420✔
157
                $this->establishTcpConnection($message->deferred(), $message->nodeEndPoints());
405✔
158
            }
420✔
159
        );
420✔
160
        $this->handler->registerHandler(
420✔
161
            TcpConnectionEstablishedMessage::class,
420✔
162
            function (TcpConnectionEstablishedMessage $message): void {
420✔
163
                $this->tcpConnectionEstablished($message->tcpPackageConnection());
405✔
164
            }
420✔
165
        );
420✔
166
        $this->handler->registerHandler(
420✔
167
            TcpConnectionErrorMessage::class,
420✔
168
            function (TcpConnectionErrorMessage $message): void {
420✔
169
                $this->tcpConnectionError($message->tcpPackageConnection(), $message->exception());
×
170
            }
420✔
171
        );
420✔
172
        $this->handler->registerHandler(
420✔
173
            TcpConnectionClosedMessage::class,
420✔
174
            function (TcpConnectionClosedMessage $message): void {
420✔
175
                $this->tcpConnectionClosed($message->tcpPackageConnection(), $message->exception());
19✔
176
            }
420✔
177
        );
420✔
178
        $this->handler->registerHandler(
420✔
179
            HandleTcpPackageMessage::class,
420✔
180
            function (HandleTcpPackageMessage $message): void {
420✔
181
                $this->handleTcpPackage($message->tcpPackageConnection(), $message->tcpPackage());
399✔
182
            }
420✔
183
        );
420✔
184
    }
185

186
    public function totalOperationCount(): int
187
    {
188
        return $this->operations->totalOperationCount();
395✔
189
    }
190

191
    public function enqueueMessage(Message $message): void
192
    {
193
        $this->logDebug(\sprintf('enqueuing message %s', (string) $message));
408✔
194

195
        $this->handler->handle($message);
408✔
196
    }
197

198
    private function startConnection(DeferredFuture $deferred, EndPointDiscoverer $endPointDiscoverer): void
199
    {
200
        $this->logDebug('startConnection');
405✔
201

202
        switch ($this->state) {
405✔
203
            case ConnectionState::Init:
405✔
204
                $this->timerTickWatcherId = EventLoop::repeat(Consts::TimerPeriod, function (): void {
405✔
205
                    $this->timerTick();
16✔
206
                });
405✔
207
                EventLoop::unreference($this->timerTickWatcherId);
405✔
208

209
                $this->endPointDiscoverer = $endPointDiscoverer;
405✔
210
                $this->state = ConnectionState::Connecting;
405✔
211
                $this->connectingPhase = ConnectingPhase::Reconnecting;
405✔
212
                $this->discoverEndPoint($deferred);
405✔
213

214
                break;
405✔
215
            case ConnectionState::Connecting:
×
216
            case ConnectionState::Connected:
×
217
                $deferred->error(new InvalidOperationException(\sprintf(
×
218
                    'EventStoreNodeConnection \'%s\' is already active',
×
219
                    $this->esConnection->connectionName()
×
220
                )));
×
221

222
                break;
×
223
            case ConnectionState::Closed:
×
224
                $deferred->error(new ObjectDisposed(\sprintf(
×
225
                    'EventStoreNodeConnection \'%s\' is closed',
×
226
                    $this->esConnection->connectionName()
×
227
                )));
×
228

229
                break;
×
230
        }
231
    }
232

233
    private function discoverEndPoint(?DeferredFuture $deferred): void
234
    {
235
        $this->logDebug('discoverEndPoint');
405✔
236

237
        if ($this->state !== ConnectionState::Connecting
405✔
238
            || $this->connectingPhase !== ConnectingPhase::Reconnecting
405✔
239
        ) {
240
            return;
×
241
        }
242

243
        $this->connectingPhase = ConnectingPhase::EndPointDiscovery;
405✔
244

245
        try {
246
            $endpoints = $this->endPointDiscoverer->discover($this->connection?->remoteEndPoint());
405✔
247
        } catch (Throwable $e) {
×
248
            $this->enqueueMessage(new CloseConnectionMessage(
×
249
                'Failed to resolve TCP end point to which to connect',
×
250
                $e
×
251
            ));
×
252

253
            $deferred?->error(new CannotEstablishConnection('Cannot resolve target end point'));
×
254

255
            return;
×
256
        }
257

258
        $this->enqueueMessage(new EstablishTcpConnectionMessage($deferred, $endpoints));
405✔
259
    }
260

261
    /** @throws Exception */
262
    private function closeConnection(string $reason, ?Throwable $exception = null): void
263
    {
264
        if ($this->timerTickWatcherId) {
408✔
265
            EventLoop::cancel($this->timerTickWatcherId);
405✔
266
        }
267

268
        if ($this->state === ConnectionState::Closed) {
408✔
269
            if ($exception) {
2✔
270
                $this->logDebug('CloseConnection IGNORED because is ESConnection is CLOSED, reason %s, exception %s', $reason, $exception->getMessage());
×
271
            } else {
272
                $this->logDebug('CloseConnection IGNORED because is ESConnection is CLOSED, reason %s', $reason);
2✔
273
            }
274

275
            return;
2✔
276
        }
277

278
        $this->logDebug('CloseConnection, reason %s, exception %s', $reason, $exception ? $exception->getMessage() : '<none>');
408✔
279

280
        $this->state = ConnectionState::Closed;
408✔
281

282
        $this->operations->cleanUp();
408✔
283
        $this->subscriptions->cleanUp();
408✔
284
        $this->closeTcpConnection();
408✔
285

286
        $this->logInfo('Closed. Reason: %s', $reason);
408✔
287

288
        if (null !== $exception) {
408✔
289
            $this->raiseErrorOccurred($exception);
×
290
        }
291

292
        $this->raiseClosed($reason);
408✔
293
    }
294

295
    /** @throws \Exception */
296
    private function establishTcpConnection(?DeferredFuture $deferred, NodeEndPoints $endPoints): void
297
    {
298
        $endPoint = $this->settings->useSslConnection()
405✔
299
            ? $endPoints->secureTcpEndPoint() ?? $endPoints->tcpEndPoint()
×
300
            : $endPoints->tcpEndPoint();
405✔
301

302
        if (null === $endPoint) {
405✔
303
            $this->closeConnection('No end point to node specified');
×
304

305
            $deferred?->complete();
×
306

307
            return;
×
308
        }
309

310
        $this->logDebug('EstablishTcpConnection to [%s]', (string) $endPoint);
405✔
311

312
        if ($this->state !== ConnectionState::Connecting
405✔
313
            || $this->connectingPhase !== ConnectingPhase::EndPointDiscovery
405✔
314
        ) {
315
            $deferred?->complete();
×
316

317
            return;
×
318
        }
319

320
        $this->connectingPhase = ConnectingPhase::ConnectionEstablishing;
405✔
321

322
        $this->connection = new TcpPackageConnection(
405✔
323
            $this->settings->log(),
405✔
324
            $endPoint,
405✔
325
            Guid::generateAsHex(),
405✔
326
            $this->settings->useSslConnection(),
405✔
327
            $this->settings->targetHost(),
405✔
328
            $this->settings->validateServer(),
405✔
329
            $this->settings->clientConnectionTimeout(),
405✔
330
            function (TcpPackageConnection $connection, TcpPackage $package): void {
405✔
331
                $this->enqueueMessage(new HandleTcpPackageMessage($connection, $package));
399✔
332
            },
405✔
333
            function (TcpPackageConnection $connection, Throwable $exception): void {
405✔
334
                $this->enqueueMessage(new TcpConnectionErrorMessage($connection, $exception));
×
335
            },
405✔
336
            function (TcpPackageConnection $connection): void {
405✔
337
                $this->enqueueMessage(new TcpConnectionEstablishedMessage($connection));
405✔
338
            },
405✔
339
            function (TcpPackageConnection $connection, Throwable $exception): void {
405✔
340
                $this->enqueueMessage(new TcpConnectionClosedMessage($connection, $exception));
19✔
341
            }
405✔
342
        );
405✔
343

344
        try {
345
            $this->connection->connect();
405✔
346
        } catch (Throwable $e) {
×
347
            $deferred?->error($e);
×
348

349
            return;
×
350
        }
351

352
        if (! $this->connection->isClosed()) {
405✔
353
            $this->connection->startReceiving();
405✔
354
        }
355

356
        if (! $deferred?->isComplete()) {
405✔
357
            $deferred?->complete();
405✔
358
        }
359
    }
360

361
    /** @throws \Exception */
362
    public function tcpConnectionError(TcpPackageConnection $tcpPackageConnection, Throwable $exception): void
363
    {
364
        if ($this->connection !== $tcpPackageConnection
×
365
            || $this->state === ConnectionState::Closed
×
366
        ) {
367
            return;
×
368
        }
369

370
        /** @psalm-suppress PossiblyNullReference */
371
        $this->logDebug('TcpConnectionError connId %s, exception %s', $this->connection->connectionId(), $exception->getMessage());
×
372
        $this->closeConnection('TCP connection error occurred', $exception);
×
373
    }
374

375
    /** @throws \Exception */
376
    private function closeTcpConnection(): void
377
    {
378
        if (null === $this->connection) {
408✔
379
            $this->logDebug('CloseTcpConnection IGNORED because connection === null');
3✔
380

381
            return;
3✔
382
        }
383

384
        $this->logDebug('CloseTcpConnection');
405✔
385
        $this->connection->close();
405✔
386

387
        $this->tcpConnectionClosed($this->connection);
405✔
388

389
        $this->connection = null;
405✔
390
    }
391

392
    /** @throws \Exception */
393
    private function tcpConnectionClosed(TcpPackageConnection $connection): void
394
    {
395
        if ($this->state === ConnectionState::Init) {
405✔
396
            throw new \Exception();
×
397
        }
398

399
        if ($this->connection !== $connection
405✔
400
            || $this->state === ConnectionState::Closed
405✔
401
        ) {
402
            $this->logDebug(
405✔
403
                'IGNORED (state: %s, internal conn.ID: {1:B}, conn.ID: %s): TCP connection to [%s] closed',
405✔
404
                $this->state->name,
405✔
405
                null === $this->connection ? Guid::empty() : $this->connection->connectionId(),
405✔
406
                $connection->connectionId(),
405✔
407
                (string) $connection->remoteEndPoint()
405✔
408
            );
405✔
409

410
            return;
405✔
411
        }
412

413
        $this->state = ConnectionState::Connecting;
×
414
        $this->connectingPhase = ConnectingPhase::Reconnecting;
×
415

416
        $this->logDebug(
×
417
            'TCP connection to [%s, %s] closed',
×
418
            (string) $connection->remoteEndPoint(),
×
419
            $connection->connectionId()
×
420
        );
×
421

422
        /** @psalm-suppress PossiblyNullReference */
423
        $this->subscriptions->purgeSubscribedAndDroppedSubscriptions($this->connection->connectionId());
×
424

425
        if (null === $this->reconnInfo) {
×
426
            $this->reconnInfo = new ReconnectionInfo(0, $this->stopWatch->elapsed());
×
427
        } else {
428
            $this->reconnInfo = new ReconnectionInfo($this->reconnInfo->reconnectionAttempt(), $this->stopWatch->elapsed());
×
429
        }
430

431
        if ($this->wasConnected) {
×
432
            $this->wasConnected = false;
×
433
            $this->raiseDisconnected($connection->remoteEndPoint());
×
434
        }
435
    }
436

437
    private function tcpConnectionEstablished(TcpPackageConnection $connection): void
438
    {
439
        /** @psalm-suppress PossiblyNullReference */
440
        if ($this->state !== ConnectionState::Connecting
405✔
441
            || $this->connection !== $connection
405✔
442
            || $this->connection->isClosed()
405✔
443
        ) {
444
            $this->logDebug(
×
445
                'IGNORED (state %s, internal conn.Id %s, conn.Id %s, conn.closed %s): TCP connection to [%s] established',
×
446
                $this->state->name,
×
447
                null === $this->connection ? Guid::empty() : $this->connection->connectionId(),
×
448
                $connection->connectionId(),
×
449
                $connection->isClosed() ? 'yes' : 'no',
×
450
                (string) $connection->remoteEndPoint()
×
451
            );
×
452

453
            return;
×
454
        }
455

456
        $this->logDebug(
405✔
457
            'TCP connection to [%s, %s] established',
405✔
458
            (string) $connection->remoteEndPoint(),
405✔
459
            $connection->connectionId()
405✔
460
        );
405✔
461
        $elapsed = $this->stopWatch->elapsed();
405✔
462

463
        $this->heartbeatInfo = new HeartbeatInfo($this->packageNumber, true, $elapsed);
405✔
464

465
        if ($this->settings->defaultUserCredentials() !== null) {
405✔
466
            $this->connectingPhase = ConnectingPhase::Authentication;
135✔
467

468
            $this->authInfo = new AuthInfo(Guid::generateAsHex(), $elapsed);
135✔
469

470
            $login = null;
135✔
471
            $pass = null;
135✔
472

473
            if ($this->settings->defaultUserCredentials()) {
135✔
474
                $login = $this->settings->defaultUserCredentials()->username();
135✔
475
                $pass = $this->settings->defaultUserCredentials()->password();
135✔
476
            }
477

478
            /** @psalm-suppress PossiblyNullReference */
479
            $this->connection->enqueueSend(new TcpPackage(
135✔
480
                TcpCommand::Authenticate,
135✔
481
                TcpFlags::Authenticated,
135✔
482
                $this->authInfo->correlationId(),
135✔
483
                '',
135✔
484
                $login,
135✔
485
                $pass
135✔
486
            ));
135✔
487
        } else {
488
            $this->goToIdentifyState();
402✔
489
        }
490
    }
491

492
    private function goToIdentifyState(): void
493
    {
494
        $this->connectingPhase = ConnectingPhase::Identification;
405✔
495
        $this->identityInfo = new IdentifyInfo(Guid::generateAsHex(), $this->stopWatch->elapsed());
405✔
496

497
        $message = new IdentifyClient();
405✔
498
        $message->setVersion(self::ClientVersion);
405✔
499
        $message->setConnectionName($this->esConnection->connectionName());
405✔
500

501
        /** @psalm-suppress PossiblyNullReference */
502
        $this->connection->enqueueSend(new TcpPackage(
405✔
503
            TcpCommand::IdentifyClient,
405✔
504
            TcpFlags::None,
405✔
505
            $this->identityInfo->correlationId(),
405✔
506
            $message->serializeToString()
405✔
507
        ));
405✔
508
    }
509

510
    private function goToConnectedState(): void
511
    {
512
        $this->state = ConnectionState::Connected;
399✔
513
        $this->connectingPhase = ConnectingPhase::Connected;
399✔
514
        $this->wasConnected = true;
399✔
515

516
        /** @psalm-suppress PossiblyNullReference */
517
        $this->raiseConnectedEvent($this->connection->remoteEndPoint());
399✔
518

519
        if ($this->stopWatch->elapsed() - $this->lastTimeoutsTimeStamp >= $this->settings->operationTimeoutCheckPeriod()) {
399✔
520
            $this->operations->checkTimeoutsAndRetry($this->connection);
399✔
521
            $this->subscriptions->checkTimeoutsAndRetry($this->connection);
399✔
522
            $this->lastTimeoutsTimeStamp = $this->stopWatch->elapsed();
399✔
523
        }
524
    }
525

526
    /** @throws Exception */
527
    private function timerTick(): void
528
    {
529
        $elapsed = $this->stopWatch->elapsed();
16✔
530

531
        switch ($this->state) {
16✔
532
            case ConnectionState::Init:
16✔
533
            case ConnectionState::Closed:
16✔
534
                break;
×
535
            case ConnectionState::Connecting:
16✔
536
                /** @psalm-suppress PossiblyNullReference */
537
                if ($this->connectingPhase === ConnectingPhase::Reconnecting
×
538
                    && $elapsed - $this->reconnInfo->timestamp() >= $this->settings->reconnectionDelay()
×
539
                ) {
540
                    $this->logDebug('TimerTick checking reconnection...');
×
541

542
                    $this->reconnInfo = new ReconnectionInfo($this->reconnInfo->reconnectionAttempt() + 1, $this->stopWatch->elapsed());
×
543

544
                    $maxReconnections = $this->settings->maxReconnections();
×
545

546
                    if ($maxReconnections >= 0 && $this->reconnInfo->reconnectionAttempt() > $maxReconnections) {
×
547
                        $this->closeConnection('Reconnection limit reached');
×
548
                    } else {
549
                        $this->raiseReconnecting();
×
550
                        $this->discoverEndPoint(null);
×
551
                    }
552
                }
553

554
                if ($this->connectingPhase === ConnectingPhase::Authentication
×
555
                    && $elapsed - $this->authInfo->timestamp() >= $this->settings->operationTimeout()
×
556
                ) {
557
                    $this->raiseAuthenticationFailed('Authentication timed out');
×
558
                    $this->goToIdentifyState();
×
559
                }
560

561
                if ($this->connectingPhase === ConnectingPhase::Identification
×
562
                    && $elapsed - $this->identityInfo->timestamp() >= $this->settings->operationTimeout()
×
563
                ) {
564
                    $this->logDebug('Timed out waiting for client to be identified');
×
565
                    $this->closeTcpConnection();
×
566
                }
567

568
                if ($this->connectingPhase->value > ConnectingPhase::ConnectionEstablishing->value) {
×
569
                    $this->manageHeartbeats();
×
570
                }
571

572
                break;
×
573
            case ConnectionState::Connected:
16✔
574
                /** @psalm-suppress PossiblyNullArgument */
575
                if ($elapsed - $this->lastTimeoutsTimeStamp >= $this->settings->operationTimeoutCheckPeriod()) {
16✔
576
                    $this->reconnInfo = new ReconnectionInfo(0, $elapsed);
7✔
577
                    $this->operations->checkTimeoutsAndRetry($this->connection);
7✔
578
                    $this->subscriptions->checkTimeoutsAndRetry($this->connection);
7✔
579
                    $this->lastTimeoutsTimeStamp = $elapsed;
7✔
580
                }
581

582
                $this->manageHeartbeats();
16✔
583

584
                break;
16✔
585
        }
586
    }
587

588
    /** @throws Exception */
589
    private function manageHeartbeats(): void
590
    {
591
        if (null === $this->connection) {
16✔
592
            throw new \Exception('Cannot manage heartbeats when no connection available');
×
593
        }
594

595
        $timeout = $this->heartbeatInfo->isIntervalStage() ? $this->settings->heartbeatInterval() : $this->settings->heartbeatTimeout();
16✔
596

597
        $elapsed = $this->stopWatch->elapsed();
16✔
598

599
        if ($elapsed - $this->heartbeatInfo->timestamp() < $timeout) {
16✔
600
            return;
16✔
601
        }
602

603
        $packageNumber = $this->packageNumber;
7✔
604

605
        if ($this->heartbeatInfo->lastPackageNumber() !== $packageNumber) {
7✔
606
            $this->heartbeatInfo = new HeartbeatInfo($packageNumber, true, $elapsed);
7✔
607

608
            return;
7✔
609
        }
610

UNCOV
611
        if ($this->heartbeatInfo->isIntervalStage()) {
×
UNCOV
612
            $this->connection->enqueueSend(new TcpPackage(
×
UNCOV
613
                TcpCommand::HeartbeatRequestCommand,
×
UNCOV
614
                TcpFlags::None,
×
UNCOV
615
                Guid::generateAsHex()
×
UNCOV
616
            ));
×
617

UNCOV
618
            $this->heartbeatInfo = new HeartbeatInfo($this->heartbeatInfo->lastPackageNumber(), false, $elapsed);
×
619
        } else {
620
            $msg = \sprintf(
×
621
                'EventStoreNodeConnection \'%s\': closing TCP connection [%s, %s] due to HEARTBEAT TIMEOUT at pkgNum %s',
×
622
                $this->esConnection->connectionName(),
×
623
                (string) $this->connection->remoteEndPoint(),
×
624
                $this->connection->connectionId(),
×
625
                $this->packageNumber
×
626
            );
×
627

628
            $this->settings->log()->info($msg);
×
629
            $this->closeTcpConnection();
×
630
        }
631
    }
632

633
    private function startOperation(ClientOperation $operation, int $maxRetries, float $timeout): void
634
    {
635
        switch ($this->state) {
395✔
636
            case ConnectionState::Init:
395✔
637
                $operation->fail(new InvalidOperationException(
1✔
638
                    \sprintf(
1✔
639
                        'EventStoreNodeConnection \'%s\' is not active',
1✔
640
                        $this->esConnection->connectionName()
1✔
641
                    )
1✔
642
                ));
1✔
643

644
                break;
1✔
645
            case ConnectionState::Connecting:
394✔
646
                $this->logDebug(
390✔
647
                    'StartOperation enqueue %s, %s, %s, %s',
390✔
648
                    $operation->name(),
390✔
649
                    (string) $operation,
390✔
650
                    (string) $maxRetries,
390✔
651
                    (string) $timeout
390✔
652
                );
390✔
653
                $this->operations->enqueueOperation(new OperationItem($operation, $maxRetries, $timeout));
390✔
654

655
                break;
390✔
656
            case ConnectionState::Connected:
364✔
657
                $this->logDebug(
364✔
658
                    'StartOperation schedule %s, %s, %s, %s',
364✔
659
                    $operation->name(),
364✔
660
                    (string) $operation,
364✔
661
                    (string) $maxRetries,
364✔
662
                    (string) $timeout
364✔
663
                );
364✔
664
                /** @psalm-suppress PossiblyNullArgument */
665
                $this->operations->scheduleOperation(new OperationItem($operation, $maxRetries, $timeout), $this->connection);
364✔
666

667
                break;
364✔
668
            case ConnectionState::Closed:
×
669
                $operation->fail(new ObjectDisposed(\sprintf(
×
670
                    'EventStoreNodeConnection \'%s\' is closed',
×
671
                    $this->esConnection->connectionName()
×
672
                )));
×
673

674
                break;
×
675
        }
676
    }
677

678
    private function startSubscription(StartSubscriptionMessage $message): void
679
    {
680
        switch ($this->state) {
54✔
681
            case ConnectionState::Init:
54✔
682
                $message->deferred()->error(new InvalidOperationException(\sprintf(
1✔
683
                    'EventStoreNodeConnection \'%s\' is not active',
1✔
684
                    $this->esConnection->connectionName()
1✔
685
                )));
1✔
686

687
                break;
1✔
688
            case ConnectionState::Connecting:
53✔
689
            case ConnectionState::Connected:
34✔
690
                $operation = new VolatileSubscriptionOperation(
54✔
691
                    $this->settings->log(),
54✔
692
                    $message->deferred(),
54✔
693
                    $message->streamId(),
54✔
694
                    $message->resolveTo(),
54✔
695
                    $message->userCredentials(),
54✔
696
                    function (EventStoreSubscription $subscription, ResolvedEvent $resolvedEvent) use ($message): void {
54✔
697
                        ($message->eventAppeared())($subscription, $resolvedEvent);
17✔
698
                    },
54✔
699
                    function (EventStoreSubscription $subscription, SubscriptionDropReason $reason, ?Throwable $exception = null) use ($message): void {
54✔
700
                        $subscriptionDroppedHandler = $message->subscriptionDropped();
40✔
701

702
                        if (null !== $subscriptionDroppedHandler) {
40✔
703
                            $subscriptionDroppedHandler($subscription, $reason, $exception);
13✔
704
                        }
705
                    },
54✔
706
                    $this->settings->verboseLogging(),
54✔
707
                    fn (): ?TcpPackageConnection => $this->connection
54✔
708
                );
54✔
709

710
                $this->logDebug(
53✔
711
                    'StartSubscription %s %s, %s, MaxRetries: %d, Timeout: %d',
53✔
712
                    $this->state === ConnectionState::Connected ? 'fire' : 'enqueue',
53✔
713
                    $operation->name(),
53✔
714
                    (string) $operation,
53✔
715
                    (string) $message->maxRetries(),
53✔
716
                    (string) $message->timeout()
53✔
717
                );
53✔
718

719
                $subscription = new SubscriptionItem($operation, $message->maxRetries(), $message->timeout());
53✔
720

721
                if ($this->state === ConnectionState::Connecting) {
53✔
722
                    $this->subscriptions->enqueueSubscription($subscription);
22✔
723
                } else {
724
                    /** @psalm-suppress PossiblyNullArgument */
725
                    $this->subscriptions->startSubscription($subscription, $this->connection);
34✔
726
                }
727

728
                break;
53✔
729
            case ConnectionState::Closed:
×
730
                $message->deferred()->error(new ObjectDisposed(\sprintf(
×
731
                    'EventStoreNodeConnection \'%s\' is closed',
×
732
                    $this->esConnection->connectionName()
×
733
                )));
×
734

735
                break;
×
736
        }
737
    }
738

739
    private function startPersistentSubscription(StartPersistentSubscriptionMessage $message): void
740
    {
741
        switch ($this->state) {
34✔
742
            case ConnectionState::Init:
34✔
743
                $message->deferred()->error(new InvalidOperationException(\sprintf(
×
744
                    'EventStoreNodeConnection \'%s\' is not active',
×
745
                    $this->esConnection->connectionName()
×
746
                )));
×
747

748
                break;
×
749
            case ConnectionState::Connecting:
34✔
750
            case ConnectionState::Connected:
33✔
751
                $operation = new ConnectToPersistentSubscriptionOperation(
37✔
752
                    $this->settings->log(),
37✔
753
                    $message->deferred(),
37✔
754
                    $message->subscriptionId(),
37✔
755
                    $message->bufferSize(),
37✔
756
                    $message->streamId(),
37✔
757
                    $message->userCredentials(),
37✔
758
                    $message->eventAppeared(),
37✔
759
                    $message->subscriptionDropped(),
37✔
760
                    $this->settings->verboseLogging(),
37✔
761
                    fn (): ?TcpPackageConnection => $this->connection
37✔
762
                );
37✔
763

764
                $this->logDebug(
34✔
765
                    'StartSubscription %s %s, %s, MaxRetries: %d, Timeout: %d',
34✔
766
                    $this->state === ConnectionState::Connected ? 'fire' : 'enqueue',
34✔
767
                    $operation->name(),
34✔
768
                    (string) $operation,
34✔
769
                    (string) $message->maxRetries(),
34✔
770
                    (string) $message->timeout()
34✔
771
                );
34✔
772

773
                $subscription = new SubscriptionItem($operation, $message->maxRetries(), $message->timeout());
34✔
774

775
                if ($this->state === ConnectionState::Connecting) {
34✔
776
                    $this->subscriptions->enqueueSubscription($subscription);
1✔
777
                } else {
778
                    /** @psalm-suppress PossiblyNullArgument */
779
                    $this->subscriptions->startSubscription($subscription, $this->connection);
33✔
780
                }
781

782
                break;
34✔
783
            case ConnectionState::Closed:
×
784
                $message->deferred()->error(new ObjectDisposed(\sprintf(
×
785
                    'EventStoreNodeConnection \'%s\' is closed',
×
786
                    $this->esConnection->connectionName()
×
787
                )));
×
788

789
                break;
×
790
        }
791
    }
792

793
    /** @throws Exception */
794
    private function handleTcpPackage(TcpPackageConnection $connection, TcpPackage $package): void
795
    {
796
        if ($this->connection !== $connection
399✔
797
            || $this->state === ConnectionState::Closed
399✔
798
            || $this->state === ConnectionState::Init
399✔
799
        ) {
800
            $this->logDebug(
×
801
                'IGNORED: HandleTcpPackage connId %s, package %s, %s',
×
802
                $connection->connectionId(),
×
803
                $package->command()->name,
×
804
                $package->correlationId()
×
805
            );
×
806

807
            return;
×
808
        }
809

810
        /** @psalm-suppress PossiblyNullReference */
811
        $this->logDebug(
399✔
812
            'HandleTcpPackage connId %s, package %s, %s',
399✔
813
            $this->connection->connectionId(),
399✔
814
            $package->command()->name,
399✔
815
            $package->correlationId()
399✔
816
        );
399✔
817

818
        ++$this->packageNumber;
399✔
819

820
        if ($package->command() === TcpCommand::HeartbeatResponseCommand) {
399✔
UNCOV
821
            return;
×
822
        }
823

824
        if ($package->command() === TcpCommand::HeartbeatRequestCommand) {
399✔
825
            $this->connection->enqueueSend(new TcpPackage(
×
826
                TcpCommand::HeartbeatResponseCommand,
×
827
                TcpFlags::None,
×
828
                $package->correlationId()
×
829
            ));
×
830

831
            return;
×
832
        }
833

834
        if ($package->command() === TcpCommand::Authenticated
399✔
835
            || $package->command() === TcpCommand::NotAuthenticatedException
399✔
836
        ) {
837
            if ($this->state === ConnectionState::Connecting
135✔
838
                && $this->connectingPhase === ConnectingPhase::Authentication
135✔
839
                && $this->authInfo->correlationId() === $package->correlationId()
135✔
840
            ) {
841
                if ($package->command() === TcpCommand::NotAuthenticatedException) {
135✔
842
                    $this->raiseAuthenticationFailed('Not authenticated');
×
843
                }
844

845
                $this->goToIdentifyState();
135✔
846

847
                return;
135✔
848
            }
849
        }
850

851
        if ($package->command() === TcpCommand::ClientIdentified
399✔
852
            && $this->state === ConnectionState::Connecting
399✔
853
            && $this->identityInfo->correlationId() === $package->correlationId()
399✔
854
        ) {
855
            $this->goToConnectedState();
399✔
856

857
            return;
399✔
858
        }
859

860
        if ($package->command() === TcpCommand::BadRequest
396✔
861
            && $package->correlationId() === ''
396✔
862
        ) {
863
            $exception = new EventStoreConnectionException('Bad request received from server');
×
864
            $this->closeConnection('Connection-wide BadRequest received. Too dangerous to continue', $exception);
×
865

866
            return;
×
867
        }
868

869
        if ($operation = $this->operations->getActiveOperation($package->correlationId())) {
396✔
870
            $result = $operation->operation()->inspectPackage($package);
394✔
871

872
            $this->logDebug(
394✔
873
                'HandleTcpPackage OPERATION DECISION %s (%s), %s',
394✔
874
                $result->decision()->name,
394✔
875
                $result->description(),
394✔
876
                (string) $operation
394✔
877
            );
394✔
878

879
            switch ($result->decision()) {
394✔
880
                case InspectionDecision::DoNothing:
394✔
881
                    break;
×
882
                case InspectionDecision::EndOperation:
394✔
883
                    $this->operations->removeOperation($operation);
394✔
884

885
                    break;
394✔
886
                case InspectionDecision::Retry:
×
887
                    $this->operations->scheduleOperationRetry($operation);
×
888

889
                    break;
×
890
                case InspectionDecision::Reconnect:
×
891
                    $this->reconnectTo(new NodeEndPoints($result->tcpEndPoint(), $result->secureTcpEndPoint()));
×
892
                    $this->operations->scheduleOperationRetry($operation);
×
893

894
                    break;
×
895
            }
896

897
            if ($this->state === ConnectionState::Connected) {
394✔
898
                $this->operations->tryScheduleWaitingOperations($connection);
394✔
899
            }
900
        } elseif ($subscription = $this->subscriptions->getActiveSubscription($package->correlationId())) {
87✔
901
            $result = $subscription->operation()->inspectPackage($package);
87✔
902

903
            $this->logDebug(
87✔
904
                'HandleTcpPackage %s SUBSCRIPTION DECISION %s (%s), %s',
87✔
905
                $package->correlationId(),
87✔
906
                $result->decision()->name,
87✔
907
                $result->description(),
87✔
908
                (string) $operation
87✔
909
            );
87✔
910

911
            switch ($result->decision()) {
87✔
912
                case InspectionDecision::DoNothing:
87✔
913
                    break;
43✔
914
                case InspectionDecision::EndOperation:
87✔
915
                    $this->subscriptions->removeSubscription($subscription);
19✔
916

917
                    break;
19✔
918
                case InspectionDecision::Retry:
77✔
919
                    $this->subscriptions->scheduleSubscriptionRetry($subscription);
×
920

921
                    break;
×
922
                case InspectionDecision::Reconnect:
77✔
923
                    $this->reconnectTo(new NodeEndPoints($result->tcpEndPoint(), $result->secureTcpEndPoint()));
×
924
                    $this->subscriptions->scheduleSubscriptionRetry($subscription);
×
925

926
                    break;
×
927
                case InspectionDecision::Subscribed:
77✔
928
                    $subscription->setIsSubscribed(true);
77✔
929

930
                    break;
77✔
931
            }
932
        } else {
933
            $this->logDebug(
×
934
                'HandleTcpPackage UNMAPPED PACKAGE with CorrelationId %s, Command: %s',
×
935
                $package->correlationId(),
×
936
                $package->command()->name
×
937
            );
×
938
        }
939
    }
940

941
    /** @throws Exception */
942
    private function reconnectTo(NodeEndPoints $endPoints): void
943
    {
944
        $endPoint = $this->settings->useSslConnection()
×
945
            ? $endPoints->secureTcpEndPoint() ?? $endPoints->tcpEndPoint()
×
946
            : $endPoints->tcpEndPoint();
×
947

948
        if (null === $endPoint) {
×
949
            $this->closeConnection('No end point is specified while trying to reconnect');
×
950

951
            return;
×
952
        }
953

954
        /** @psalm-suppress PossiblyNullReference */
955
        if ($this->state !== ConnectionState::Connected
×
956
            || $this->connection->remoteEndPoint()->equals($endPoint)
×
957
        ) {
958
            return;
×
959
        }
960

961
        /** @psalm-suppress PossiblyNullReference */
962
        $msg = \sprintf(
×
963
            'EventStoreNodeConnection \'%s\': going to reconnect to [%s]. Current end point: [%s]',
×
964
            $this->esConnection->connectionName(),
×
965
            (string) $endPoint,
×
966
            (string) $this->connection->remoteEndPoint()
×
967
        );
×
968

969
        if ($this->settings->verboseLogging()) {
×
970
            $this->settings->log()->info($msg);
×
971
        }
972

973
        $this->closeTcpConnection();
×
974

975
        $this->state = ConnectionState::Connecting;
×
976
        $this->connectingPhase = ConnectingPhase::EndPointDiscovery;
×
977

978
        $this->establishTcpConnection(null, $endPoints);
×
979
    }
980

981
    private function logDebug(string $message, string ...$parameters): void
982
    {
983
        if ($this->settings->verboseLogging()) {
408✔
984
            $message = empty($parameters)
×
985
                ? $message
×
986
                : \sprintf($message, ...$parameters);
×
987

988
            $this->settings->log()->debug(\sprintf(
×
989
                'EventStoreNodeConnection \'%s\': %s',
×
990
                $this->esConnection->connectionName(),
×
991
                $message
×
992
            ));
×
993
        }
994
    }
995

996
    private function logInfo(string $message, string ...$parameters): void
997
    {
998
        if ($this->settings->verboseLogging()) {
408✔
999
            $message = empty($parameters)
×
1000
                ? $message
×
1001
                : \sprintf($message, ...$parameters);
×
1002

1003
            $this->settings->log()->info(\sprintf(
×
1004
                'EventStoreNodeConnection \'%s\': %s',
×
1005
                $this->esConnection->connectionName(),
×
1006
                $message
×
1007
            ));
×
1008
        }
1009
    }
1010

1011
    private function raiseConnectedEvent(EndPoint $remoteEndPoint): void
1012
    {
1013
        $this->eventHandler->connected(new ClientConnectionEventArgs($this->esConnection, $remoteEndPoint));
399✔
1014
    }
1015

1016
    private function raiseDisconnected(EndPoint $remoteEndPoint): void
1017
    {
1018
        $this->eventHandler->disconnected(new ClientConnectionEventArgs($this->esConnection, $remoteEndPoint));
×
1019
    }
1020

1021
    private function raiseErrorOccurred(Throwable $e): void
1022
    {
1023
        $this->eventHandler->errorOccurred(new ClientErrorEventArgs($this->esConnection, $e));
×
1024
    }
1025

1026
    private function raiseClosed(string $reason): void
1027
    {
1028
        $this->eventHandler->closed(new ClientClosedEventArgs($this->esConnection, $reason));
408✔
1029
    }
1030

1031
    private function raiseReconnecting(): void
1032
    {
1033
        $this->eventHandler->reconnecting(new ClientReconnectingEventArgs($this->esConnection));
×
1034
    }
1035

1036
    private function raiseAuthenticationFailed(string $reason): void
1037
    {
1038
        $this->eventHandler->authenticationFailed(new ClientAuthenticationFailedEventArgs($this->esConnection, $reason));
×
1039
    }
1040

1041
    public function onConnected(Closure $handler): ListenerHandler
1042
    {
1043
        return $this->eventHandler->whenConnected($handler);
13✔
1044
    }
1045

1046
    public function onDisconnected(Closure $handler): ListenerHandler
1047
    {
1048
        return $this->eventHandler->whenDisconnected($handler);
×
1049
    }
1050

1051
    public function onReconnecting(Closure $handler): ListenerHandler
1052
    {
1053
        return $this->eventHandler->whenReconnecting($handler);
×
1054
    }
1055

1056
    public function onClosed(Closure $handler): ListenerHandler
1057
    {
1058
        return $this->eventHandler->whenClosed($handler);
×
1059
    }
1060

1061
    public function onErrorOccurred(Closure $handler): ListenerHandler
1062
    {
1063
        return $this->eventHandler->whenErrorOccurred($handler);
×
1064
    }
1065

1066
    public function onAuthenticationFailed(Closure $handler): ListenerHandler
1067
    {
1068
        return $this->eventHandler->whenAuthenticationFailed($handler);
×
1069
    }
1070

1071
    public function detach(ListenerHandler $handler): void
1072
    {
1073
        $this->eventHandler->detach($handler);
10✔
1074
    }
1075
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc