• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

prooph / event-store-client / 9555551525

17 Jun 2024 10:16PM UTC coverage: 70.262% (-1.1%) from 71.395%
9555551525

push

github

prolic
update coveralls repo token

3466 of 4933 relevant lines covered (70.26%)

67.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.09
/src/PersistentSubscriptions/PersistentSubscriptionsClient.php
1
<?php
2

3
/**
4
 * This file is part of `prooph/event-store-client`.
5
 * (c) 2018-2024 Alexander Miertsch <kontakt@codeliner.ws>
6
 * (c) 2018-2024 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace Prooph\EventStoreClient\PersistentSubscriptions;
15

16
use Amp\DeferredFuture;
17
use Amp\Http\Client\Response;
18
use Prooph\EventStore\EndPoint;
19
use Prooph\EventStore\PersistentSubscriptions\PersistentSubscriptionDetails;
20
use Prooph\EventStore\Transport\Http\EndpointExtensions;
21
use Prooph\EventStore\Transport\Http\HttpStatusCode;
22
use Prooph\EventStore\UserCredentials;
23
use Prooph\EventStore\Util\Json;
24
use Prooph\EventStoreClient\Exception\PersistentSubscriptionCommandFailed;
25
use Prooph\EventStoreClient\Transport\Http\HttpClient;
26
use Throwable;
27
use UnexpectedValueException;
28

29
/**
 * HTTP client for the persistent subscription endpoints located under
 * `/subscriptions`.
 *
 * Every public method builds the target URL for the given end point, sends
 * the request through the wrapped HttpClient and awaits the outcome before
 * returning, so callers get a plain return value or an exception.
 *
 * @internal
 */
class PersistentSubscriptionsClient
{
    /** Transport used for all GET / POST requests issued by this class. */
    private HttpClient $client;

    /** URL schema handed to EndpointExtensions::formatStringToHttpUrl(). */
    private EndpointExtensions $httpSchema;

    /**
     * @param int  $operationTimeout      forwarded unchanged to the HttpClient
     *                                    (unit is defined there — not visible here)
     * @param bool $tlsTerminatedEndpoint passed to EndpointExtensions::useHttps()
     *                                    to select the URL schema
     * @param bool $verifyPeer            forwarded unchanged to the HttpClient
     */
    public function __construct(int $operationTimeout, bool $tlsTerminatedEndpoint, bool $verifyPeer)
    {
        $this->client = new HttpClient($operationTimeout, $verifyPeer);
        $this->httpSchema = EndpointExtensions::useHttps($tlsTerminatedEndpoint);
    }

    /**
     * Fetches the details of a single persistent subscription via
     * `GET /subscriptions/{stream}/{subscriptionName}/info`.
     *
     * @throws PersistentSubscriptionCommandFailed when the response status is not HttpStatusCode::Ok
     * @throws UnexpectedValueException            when the response body is empty
     * @throws Throwable                           any error reported by the HTTP client
     */
    public function describe(
        EndPoint $endPoint,
        string $stream,
        string $subscriptionName,
        ?UserCredentials $userCredentials = null
    ): PersistentSubscriptionDetails {
        $url = EndpointExtensions::formatStringToHttpUrl(
            $endPoint,
            $this->httpSchema,
            '/subscriptions/%s/%s/info',
            $stream,
            $subscriptionName
        );

        $body = $this->sendGet($url, $userCredentials, HttpStatusCode::Ok);

        if ('' === $body) {
            throw new UnexpectedValueException('No content received');
        }

        return PersistentSubscriptionDetails::fromArray(Json::decode($body));
    }

    /**
     * Lists persistent subscriptions via `GET /subscriptions`, or
     * `GET /subscriptions/{stream}` when a stream is given.
     *
     * @return PersistentSubscriptionDetails[]
     *
     * @throws PersistentSubscriptionCommandFailed when the response status is not HttpStatusCode::Ok
     * @throws UnexpectedValueException            when the response body is empty
     * @throws Throwable                           any error reported by the HTTP client
     */
    public function list(
        EndPoint $endPoint,
        ?string $stream = null,
        ?UserCredentials $userCredentials = null
    ): array {
        $formatString = '/subscriptions';
        $arguments = [];

        if (null !== $stream) {
            // Hand the stream over as a format argument — exactly like
            // describe() and replayParkedMessages() do — instead of splicing
            // it into the format string itself. Spliced in, a stream name
            // containing '%' would be read as a conversion specification
            // (formatStringToHttpUrl() is presumably sprintf-based, given the
            // '%s' placeholders used by the sibling methods — confirm).
            $formatString .= '/%s';
            $arguments[] = $stream;
        }

        $url = EndpointExtensions::formatStringToHttpUrl(
            $endPoint,
            $this->httpSchema,
            $formatString,
            ...$arguments
        );

        $body = $this->sendGet($url, $userCredentials, HttpStatusCode::Ok);

        if ('' === $body) {
            throw new UnexpectedValueException('No content received');
        }

        return \array_map(
            static fn (array $entry): PersistentSubscriptionDetails => PersistentSubscriptionDetails::fromArray($entry),
            Json::decode($body)
        );
    }

    /**
     * Triggers a replay of the parked messages of a subscription via
     * `POST /subscriptions/{stream}/{subscriptionName}/replayParked`
     * with an empty body.
     *
     * @throws PersistentSubscriptionCommandFailed when the response status is not HttpStatusCode::Ok
     * @throws Throwable                           any error reported by the HTTP client
     */
    public function replayParkedMessages(
        EndPoint $endPoint,
        string $stream,
        string $subscriptionName,
        ?UserCredentials $userCredentials = null
    ): void {
        $url = EndpointExtensions::formatStringToHttpUrl(
            $endPoint,
            $this->httpSchema,
            '/subscriptions/%s/%s/replayParked',
            $stream,
            $subscriptionName
        );

        $this->sendPost($url, '', $userCredentials, HttpStatusCode::Ok);
    }

    /**
     * Sends a GET request and returns the buffered response body.
     *
     * The HttpClient reports its outcome through two callbacks; a
     * DeferredFuture bridges them back into a return value / exception.
     *
     * @throws PersistentSubscriptionCommandFailed when the status differs from $expectedCode
     * @throws Throwable                           any error reported by the HTTP client
     */
    private function sendGet(string $url, ?UserCredentials $userCredentials, int $expectedCode): string
    {
        $deferred = new DeferredFuture();

        $this->client->get(
            $url,
            $userCredentials,
            function (Response $response) use ($deferred, $expectedCode, $url): void {
                if ($response->getStatus() !== $expectedCode) {
                    $deferred->error($this->commandFailed($response, 'GET', $url));

                    return;
                }

                $deferred->complete($response->getBody()->buffer());
            },
            function (Throwable $exception) use ($deferred): void {
                $deferred->error($exception);
            }
        );

        return $deferred->getFuture()->await();
    }

    /**
     * Sends a POST request with an `application/json` content type and waits
     * for it to finish; the response body is ignored.
     *
     * @throws PersistentSubscriptionCommandFailed when the status differs from $expectedCode
     * @throws Throwable                           any error reported by the HTTP client
     */
    private function sendPost(
        string $url,
        string $content,
        ?UserCredentials $userCredentials,
        int $expectedCode
    ): void {
        $deferred = new DeferredFuture();

        $this->client->post(
            $url,
            $content,
            'application/json',
            $userCredentials,
            function (Response $response) use ($deferred, $expectedCode, $url): void {
                if ($response->getStatus() !== $expectedCode) {
                    $deferred->error($this->commandFailed($response, 'POST', $url));

                    return;
                }

                $deferred->complete(null);
            },
            function (Throwable $exception) use ($deferred): void {
                $deferred->error($exception);
            }
        );

        $deferred->getFuture()->await();
    }

    /**
     * Builds the exception raised when the server answers with an unexpected
     * status code; shared by sendGet() and sendPost() so both report failures
     * in the same format.
     *
     * @param string $method HTTP verb used in the message ('GET' or 'POST')
     */
    private function commandFailed(
        Response $response,
        string $method,
        string $url
    ): PersistentSubscriptionCommandFailed {
        return new PersistentSubscriptionCommandFailed(
            $response->getStatus(),
            \sprintf(
                'Server returned %d (%s) for %s on %s',
                $response->getStatus(),
                $response->getReason(),
                $method,
                $url
            )
        );
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc