• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

prooph / event-store-client / 9555551525

17 Jun 2024 10:16PM UTC coverage: 70.262% (-1.1%) from 71.395%
9555551525

push

github

prolic
update coveralls repo token

3466 of 4933 relevant lines covered (70.26%)

67.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.18
/src/ClientOperations/AbstractOperation.php
1
<?php
2

3
/**
4
 * This file is part of `prooph/event-store-client`.
5
 * (c) 2018-2024 Alexander Miertsch <kontakt@codeliner.ws>
6
 * (c) 2018-2024 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace Prooph\EventStoreClient\ClientOperations;
15

16
use Amp\DeferredFuture;
17
use Amp\Future;
18
use Exception;
19
use Google\Protobuf\Internal\Message;
20
use Prooph\EventStore\EndPoint;
21
use Prooph\EventStore\Exception\NotAuthenticated;
22
use Prooph\EventStore\Exception\ServerError;
23
use Prooph\EventStore\Exception\UnexpectedCommand;
24
use Prooph\EventStore\UserCredentials;
25
use Prooph\EventStoreClient\Messages\ClientMessages\NotHandled;
26
use Prooph\EventStoreClient\Messages\ClientMessages\NotHandled\MasterInfo;
27
use Prooph\EventStoreClient\Messages\ClientMessages\NotHandled\NotHandledReason;
28
use Prooph\EventStoreClient\SystemData\InspectionDecision;
29
use Prooph\EventStoreClient\SystemData\InspectionResult;
30
use Prooph\EventStoreClient\SystemData\TcpCommand;
31
use Prooph\EventStoreClient\SystemData\TcpFlags;
32
use Prooph\EventStoreClient\SystemData\TcpPackage;
33
use Psr\Log\LoggerInterface as Logger;
34
use Throwable;
35

36
/**
37
 * @internal
38
 * @template TResponse of Message
39
 * @template TResult
40
 */
41
abstract class AbstractOperation implements ClientOperation
42
{
43
    /**
44
     * @param class-string<TResponse> $responseClassName
45
     */
46
    public function __construct(
47
        private readonly Logger $log,
48
        protected readonly DeferredFuture $deferred,
49
        protected readonly ?UserCredentials $credentials,
50
        private readonly TcpCommand $requestCommand, // we need generics
51
        private readonly TcpCommand $responseCommand,
52
        /** @var class-string<TResponse> */
53
        private readonly string $responseClassName
54
    ) {
55
    }
395✔
56

57
    abstract protected function createRequestDto(): Message;
58

59
    /** @param TResponse $response */
60
    abstract protected function inspectResponse(Message $response): InspectionResult;
61

62
    /**
63
     * @param TResponse $response
64
     * @return TResult
65
     */
66
    abstract protected function transformResponse(Message $response);
67

68
    public function future(): Future
69
    {
70
        return $this->deferred->getFuture();
×
71
    }
72

73
    public function createNetworkPackage(string $correlationId): TcpPackage
74
    {
75
        $login = null;
394✔
76
        $pass = null;
394✔
77

78
        if ($this->credentials) {
394✔
79
            $login = $this->credentials->username();
238✔
80
            $pass = $this->credentials->password();
238✔
81
        }
82

83
        return new TcpPackage(
394✔
84
            $this->requestCommand,
394✔
85
            $this->credentials ? TcpFlags::Authenticated : TcpFlags::None,
394✔
86
            $correlationId,
394✔
87
            $this->createRequestDto()->serializeToString(),
394✔
88
            $login,
394✔
89
            $pass
394✔
90
        );
394✔
91
    }
92

93
    public function inspectPackage(TcpPackage $package): InspectionResult
94
    {
95
        if ($package->command() === $this->responseCommand) {
394✔
96
            $responseMessage = new $this->responseClassName();
394✔
97
            $responseMessage->mergeFromString($package->data());
394✔
98

99
            return $this->inspectResponse($responseMessage);
394✔
100
        }
101

102
        return match ($package->command()) {
13✔
103
            TcpCommand::NotAuthenticatedException => $this->inspectNotAuthenticated($package),
13✔
104
            TcpCommand::BadRequest => $this->inspectBadRequest($package),
13✔
105
            TcpCommand::NotHandled => $this->inspectNotHandled($package),
13✔
106
            default => $this->inspectUnexpectedCommand($package, $this->responseCommand),
13✔
107
        };
13✔
108
    }
109

110
    protected function succeed(Message $response): void
111
    {
112
        try {
113
            $result = $this->transformResponse($response);
384✔
114
        } catch (Exception $e) {
×
115
            $this->deferred->error($e);
×
116

117
            return;
×
118
        }
119

120
        $this->deferred->complete($result);
384✔
121
    }
122

123
    public function fail(Throwable $exception): void
124
    {
125
        $this->deferred->error($exception);
84✔
126
    }
127

128
    private function inspectNotAuthenticated(TcpPackage $package): InspectionResult
129
    {
130
        $this->fail(new NotAuthenticated());
13✔
131

132
        return new InspectionResult(InspectionDecision::EndOperation, 'Not authenticated');
13✔
133
    }
134

135
    private function inspectBadRequest(TcpPackage $package): InspectionResult
136
    {
137
        $this->fail(new ServerError());
×
138

139
        return new InspectionResult(InspectionDecision::EndOperation, 'Bad request');
×
140
    }
141

142
    private function inspectNotHandled(TcpPackage $package): InspectionResult
143
    {
144
        $message = new NotHandled();
×
145
        $message->mergeFromString($package->data());
×
146

147
        switch ($message->getReason()) {
×
148
            case NotHandledReason::NotReady:
×
149
                return new InspectionResult(InspectionDecision::Retry, 'Not handled: not ready');
×
150
            case NotHandledReason::TooBusy:
×
151
                return new InspectionResult(InspectionDecision::Retry, 'Not handled: too busy');
×
152
            case NotHandledReason::NotMaster:
×
153
                $masterInfo = new MasterInfo();
×
154
                $masterInfo->mergeFromString($message->getAdditionalInfo());
×
155

156
                return new InspectionResult(
×
157
                    InspectionDecision::Reconnect,
×
158
                    'Not handled: not master',
×
159
                    new EndPoint(
×
160
                        $masterInfo->getExternalTcpAddress(),
×
161
                        $masterInfo->getExternalTcpPort()
×
162
                    ),
×
163
                    new EndPoint(
×
164
                        $masterInfo->getExternalSecureTcpAddress(),
×
165
                        $masterInfo->getExternalSecureTcpPort()
×
166
                    )
×
167
                );
×
168
            default:
169
                $this->log->error('Unknown NotHandledReason: ' . $message->getReason());
×
170

171
                return new InspectionResult(InspectionDecision::Retry, 'Not handled: unknown');
×
172
        }
173
    }
174

175
    private function inspectUnexpectedCommand(TcpPackage $package, TcpCommand $expectedCommand): InspectionResult
176
    {
177
        $this->log->error('Unexpected TcpCommand received');
×
178
        $this->log->error(\sprintf(
×
179
            'Expected: %s, Actual: %s, Flags: %s, CorrelationId: %s',
×
180
            $expectedCommand->name,
×
181
            $package->command()->name,
×
182
            $package->flags()->name,
×
183
            $package->correlationId()
×
184
        ));
×
185
        $this->log->error(\sprintf(
×
186
            'Operation (%s)',
×
187
            \get_class($this)
×
188
        ));
×
189
        $this->log->error('TcpPackage Data Dump (base64):');
×
190

191
        if (empty($package->data())) {
×
192
            $this->log->error('--- NO DATA ---');
×
193
        } else {
194
            $this->log->error(\base64_encode($package->data()));
×
195
        }
196

197
        $exception = UnexpectedCommand::with($package->command(), $expectedCommand);
×
198
        $this->fail($exception);
×
199

200
        return new InspectionResult(InspectionDecision::EndOperation, $exception->getMessage());
×
201
    }
202
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc