• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

prooph / event-store-client / 9555551525

17 Jun 2024 10:16PM UTC coverage: 70.262% (-1.1%) from 71.395%
9555551525

push

github

prolic
update coveralls repo token

3466 of 4933 relevant lines covered (70.26%)

67.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.59
/src/Internal/ClusterDnsEndPointDiscoverer.php
1
<?php
2

3
/**
4
 * This file is part of `prooph/event-store-client`.
5
 * (c) 2018-2024 Alexander Miertsch <kontakt@codeliner.ws>
6
 * (c) 2018-2024 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace Prooph\EventStoreClient\Internal;
15

16
use function Amp\delay;
17

18
use Amp\Http\Client\HttpClient;
19
use Amp\Http\Client\HttpClientBuilder;
20
use Amp\Http\Client\Interceptor\SetRequestTimeout;
21
use Amp\Http\Client\Request;
22
use Exception;
23
use Prooph\EventStore\EndPoint;
24
use Prooph\EventStore\Util\Json;
25
use Prooph\EventStoreClient\Exception\ClusterException;
26
use Prooph\EventStoreClient\GossipSeed;
27
use Prooph\EventStoreClient\Messages\ClusterMessages\ClusterInfoDto;
28
use Prooph\EventStoreClient\Messages\ClusterMessages\MemberInfoDto;
29
use Prooph\EventStoreClient\Messages\ClusterMessages\VNodeState;
30
use Psr\Log\LoggerInterface as Logger;
31

32
/** @internal */
33
final class ClusterDnsEndPointDiscoverer implements EndPointDiscoverer
34
{
35
    private HttpClient $httpClient;
36

37
    /** @var list<MemberInfoDto> */
38
    private array $oldGossip = [];
39

40
    /**
     * Stores the discovery settings and builds the HTTP client used to fetch gossip.
     *
     * The single `$gossipTimeout` value is passed as all three arguments of
     * `SetRequestTimeout`.
     *
     * @param list<GossipSeed> $gossipSeeds Explicit gossip seeds. When the list is empty,
     *                                      one seed is built from `$clusterDns` and
     *                                      `$managerExternalHttpPort` instead.
     */
    public function __construct(
        private readonly Logger $log,
        private readonly string $clusterDns,
        private readonly int $maxDiscoverAttempts,
        private readonly int $managerExternalHttpPort,
        private readonly array $gossipSeeds,
        float $gossipTimeout,
        private readonly bool $preferRandomNode
    ) {
        // HttpClientBuilder::intercept() returns the configured builder instead of
        // mutating the receiver, so build() has to be called on its return value.
        // Calling intercept() and then build() on the original instance silently
        // drops the timeout interceptor.
        $this->httpClient = (new HttpClientBuilder())
            ->intercept(new SetRequestTimeout($gossipTimeout, $gossipTimeout, $gossipTimeout))
            ->build();
    }
56

57
    /**
     * Runs up to `maxDiscoverAttempts` discovery rounds and returns the first
     * set of node end points found.
     *
     * Every round is logged, whether it succeeds, finds no candidate, or fails
     * with an exception. Rounds are separated by a 0.5 second pause.
     *
     * @param EndPoint|null $failedTcpEndPoint End point that previously failed, if any;
     *                                         forwarded to each discovery round.
     *
     * @throws ClusterException when no round produced a candidate
     */
    public function discover(?EndPoint $failedTcpEndPoint): NodeEndPoints
    {
        for ($attempt = 1; $attempt <= $this->maxDiscoverAttempts; ++$attempt) {
            try {
                $endPoints = $this->discoverEndPoint($failedTcpEndPoint);

                if (null !== $endPoints) {
                    $this->log->info(\sprintf(
                        'Discovering attempt %d/%d successful: best candidate is %s',
                        $attempt,
                        $this->maxDiscoverAttempts,
                        $endPoints
                    ));

                    return $endPoints;
                }

                // A round that completes without a usable node is a failure too;
                // log it so it is not invisible next to the other two outcomes.
                $this->log->info(\sprintf(
                    'Discovering attempt %d/%d failed: no candidate found',
                    $attempt,
                    $this->maxDiscoverAttempts
                ));
            } catch (Exception $e) {
                $this->log->info(\sprintf(
                    'Discovering attempt %d/%d failed with error: %s',
                    $attempt,
                    $this->maxDiscoverAttempts,
                    $e->getMessage()
                ));
            }

            // Pause only between rounds: waiting after the last one would merely
            // postpone the exception below.
            if ($attempt < $this->maxDiscoverAttempts) {
                delay(0.5);
            }
        }

        throw new ClusterException(\sprintf(
            'Failed to discover candidate in %d attempts',
            $this->maxDiscoverAttempts
        ));
    }
90

91
    /**
     * Performs a single discovery round.
     *
     * Gossip candidates come from the member list cached by the last successful
     * round when there is one, otherwise from the configured seeds / cluster DNS.
     * Each candidate is asked for gossip in turn; the first answer that yields a
     * usable node wins and its member list becomes the new cache.
     *
     * @param EndPoint|null $failedTcpEndPoint End point to leave out of the cached candidates
     *
     * @return NodeEndPoints|null The selected node, or null when no candidate produced one
     */
    private function discoverEndPoint(?EndPoint $failedTcpEndPoint): ?NodeEndPoints
    {
        // Consume the cache: it is only restored by a successful round. Without
        // this reset, a cached member list whose hosts have all become unreachable
        // would be retried on every attempt and the seeds / DNS name would never
        // be consulted again.
        $oldGossip = $this->oldGossip;
        $this->oldGossip = [];

        $gossipCandidates = [] !== $oldGossip
            ? $this->getGossipCandidatesFromOldGossip($oldGossip, $failedTcpEndPoint)
            : $this->getGossipCandidatesFromDns();

        foreach ($gossipCandidates as $candidate) {
            $members = $this->tryGetGossipFrom($candidate)?->members();

            // No response, or a response without members: try the next candidate.
            if (empty($members)) {
                continue;
            }

            $bestNode = $this->tryDetermineBestNode($members, $this->preferRandomNode);

            if (null !== $bestNode) {
                $this->oldGossip = $members;

                return $bestNode;
            }
        }

        return null;
    }
118

119
    /**
     * Returns the initial gossip candidates in random order.
     *
     * The configured gossip seeds are used when there are any; otherwise a single
     * seed pointing at the cluster DNS name and manager HTTP port is created.
     *
     * @return list<GossipSeed>
     */
    private function getGossipCandidatesFromDns(): array
    {
        $candidates = $this->gossipSeeds;

        if (0 === \count($candidates)) {
            $candidates = [new GossipSeed(new EndPoint($this->clusterDns, $this->managerExternalHttpPort))];
        }

        // Shuffles the local copy only; the readonly property is left untouched.
        \shuffle($candidates);

        return $candidates;
    }
132

133
    /**
     * Turns a previously received member list into gossip candidates, leaving
     * out the member whose TCP end point just failed.
     *
     * A member is treated as the failed one when its external TCP IP equals the
     * failed host and the failed port is either its plain or its secure external
     * TCP port. Comparing the host alone would also drop every other member that
     * shares that IP on a different port.
     *
     * @param list<MemberInfoDto> $oldGossip
     * @param EndPoint|null $failedTcpEndPoint
     * @return list<GossipSeed>
     */
    private function getGossipCandidatesFromOldGossip(array $oldGossip, ?EndPoint $failedTcpEndPoint): array
    {
        if (null === $failedTcpEndPoint) {
            return $this->arrangeGossipCandidates($oldGossip);
        }

        $failedHost = $failedTcpEndPoint->host();
        $failedPort = $failedTcpEndPoint->port();

        // array_filter() preserves keys; array_values() restores a plain list.
        $remaining = \array_values(\array_filter(
            $oldGossip,
            static fn (MemberInfoDto $dto): bool => ! (
                $dto->externalTcpIp() === $failedHost
                && \in_array($failedPort, [$dto->externalTcpPort(), $dto->externalSecureTcpPort()], true)
            )
        ));

        return $this->arrangeGossipCandidates($remaining);
    }
159

160
    /**
     * Converts cluster members into gossip seeds, ordered so that members in the
     * Manager state come after all other members.
     *
     * Each of the two groups is shuffled on its own, so the order inside a group
     * is random while the "others first, managers last" split is kept.
     *
     * @param list<MemberInfoDto> $members
     * @return list<GossipSeed>
     */
    private function arrangeGossipCandidates(array $members): array
    {
        $nodes = [];
        $managers = [];

        foreach ($members as $member) {
            $seed = new GossipSeed(new EndPoint($member->httpAddress(), $member->httpPort()));

            // Compare the enum case itself. Comparing `state()->value` (the backing
            // scalar) with an enum case using `===` can never be true, which put
            // every manager into the regular node group.
            if ($member->state() === VNodeState::Manager) {
                $managers[] = $seed;
            } else {
                $nodes[] = $seed;
            }
        }

        // Shuffle per group; one shuffle over the merged list would undo the ordering.
        \shuffle($nodes);
        \shuffle($managers);

        return [...$nodes, ...$managers];
    }
182

183
    /**
     * Requests `/gossip?format=json` from the given seed and maps the response
     * to a ClusterInfoDto.
     *
     * This is a best-effort call: any exception raised while sending the request,
     * reading the body or decoding the JSON is logged and reported as null, so the
     * caller can move on to the next candidate.
     *
     * @param GossipSeed $endPoint Seed to query; its `hostHeader()` is expected in
     *                             "Name: value" form when it is not empty.
     *
     * @return ClusterInfoDto|null Null on request/decoding failure, on a non-200 status,
     *                             on an empty body, or when the payload has no
     *                             `members` array.
     */
    private function tryGetGossipFrom(GossipSeed $endPoint): ?ClusterInfoDto
    {
        $schema = $endPoint->seedOverTls() ? 'https://' : 'http://';
        $uri = $schema . $endPoint->endPoint()->host() . ':' . $endPoint->endPoint()->port() . '/gossip?format=json';
        $this->log->info($uri);

        try {
            $request = new Request($uri);

            $header = $endPoint->hostHeader();

            if (! empty($header)) {
                // Split on the first colon only, so that values which contain a
                // colon themselves (e.g. "host:port") are kept intact.
                $headerData = \explode(':', $header, 2);

                if (2 === \count($headerData)) {
                    $request->setHeader(\trim($headerData[0]), \trim($headerData[1]));
                } else {
                    // NOTE(review): a value without a colon has no header name in this
                    // format; it is skipped here rather than guessed at — confirm the
                    // intended format of GossipSeed::hostHeader().
                    $this->log->error(\sprintf('Ignoring malformed gossip seed header "%s"', $header));
                }
            }

            $response = $this->httpClient->request($request);

            if ($response->getStatus() !== 200) {
                return null;
            }

            // buffer() collects the complete body; read() would only hand back the
            // next chunk (or null), which truncates larger gossip payloads.
            $json = $response->getBody()->buffer();

            if ('' === $json) {
                return null;
            }

            $data = Json::decode($json);
        } catch (Exception $e) {
            $this->log->error($e->getMessage());

            return null;
        }

        // Guard against payloads that decode fine but do not carry a member list.
        if (! \is_array($data) || ! \is_array($data['members'] ?? null)) {
            return null;
        }

        return new ClusterInfoDto(\array_map(
            static fn (array $member): MemberInfoDto => new MemberInfoDto($member),
            $data['members']
        ));
    }
223

224
    /**
     * Picks the node to connect to from a gossip member list.
     *
     * Members in the Manager, ShuttingDown or Shutdown state are never chosen.
     * Of the remaining members, either a random one (`$preferRandomNode`) or the
     * first one in the given order is selected; no further ranking is applied here.
     *
     * @param list<MemberInfoDto> $members
     * @param bool $preferRandomNode
     * @return NodeEndPoints|null Null when no member is eligible
     */
    private function tryDetermineBestNode(array $members, bool $preferRandomNode): ?NodeEndPoints
    {
        $excludedStates = [
            VNodeState::Manager,
            VNodeState::ShuttingDown,
            VNodeState::Shutdown,
        ];

        // array_values() re-indexes the survivors so they can be addressed by position.
        $eligible = \array_values(\array_filter(
            $members,
            static fn ($member): bool => ! \in_array($member->state(), $excludedStates, true)
        ));

        if ([] === $eligible) {
            return null;
        }

        $index = $preferRandomNode
            ? \rand(0, \count($eligible) - 1)
            : 0;

        $chosen = $eligible[$index];

        $tcpEndPoint = new EndPoint($chosen->externalTcpIp(), $chosen->externalTcpPort());

        // A secure end point exists only when the member reports a positive secure port.
        $secureTcpEndPoint = null;

        if ($chosen->externalSecureTcpPort() > 0) {
            $secureTcpEndPoint = new EndPoint($chosen->externalTcpIp(), $chosen->externalSecureTcpPort());
        }

        $this->log->info(\sprintf(
            'Discovering: found best choice [%s, %s] (%s)',
            $tcpEndPoint,
            $secureTcpEndPoint ?? 'n/a',
            $chosen->state()->name
        ));

        return new NodeEndPoints($tcpEndPoint, $secureTcpEndPoint);
    }
275
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc