• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

prooph / event-store-client / 9555551525

17 Jun 2024 10:16PM UTC coverage: 70.262% (-1.1%) from 71.395%
9555551525

push

github

prolic
update coveralls repo token

3466 of 4933 relevant lines covered (70.26%)

67.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.0
/src/ClientOperations/ConnectToPersistentSubscriptionOperation.php
1
<?php
2

3
/**
4
 * This file is part of `prooph/event-store-client`.
5
 * (c) 2018-2024 Alexander Miertsch <kontakt@codeliner.ws>
6
 * (c) 2018-2024 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace Prooph\EventStoreClient\ClientOperations;
15

16
use Amp\DeferredFuture;
17
use Closure;
18
use Prooph\EventStore\EventId;
19
use Prooph\EventStore\EventStoreSubscription;
20
use Prooph\EventStore\Exception\AccessDenied;
21
use Prooph\EventStore\Exception\InvalidArgumentException;
22
use Prooph\EventStore\Exception\MaximumSubscribersReached;
23
use Prooph\EventStore\Exception\PersistentSubscriptionDeleted;
24
use Prooph\EventStore\Internal\ConnectToPersistentSubscriptions;
25
use Prooph\EventStore\Internal\PersistentEventStoreSubscription;
26
use Prooph\EventStore\PersistentSubscriptionNakEventAction;
27
use Prooph\EventStore\PersistentSubscriptionResolvedEvent;
28
use Prooph\EventStore\SubscriptionDropReason;
29
use Prooph\EventStore\UserCredentials;
30
use Prooph\EventStoreClient\Internal\EventMessageConverter;
31
use Prooph\EventStoreClient\Messages\ClientMessages\ConnectToPersistentSubscription;
32
use Prooph\EventStoreClient\Messages\ClientMessages\PersistentSubscriptionAckEvents;
33
use Prooph\EventStoreClient\Messages\ClientMessages\PersistentSubscriptionConfirmation;
34
use Prooph\EventStoreClient\Messages\ClientMessages\PersistentSubscriptionNakEvents;
35
use Prooph\EventStoreClient\Messages\ClientMessages\PersistentSubscriptionStreamEventAppeared;
36
use Prooph\EventStoreClient\Messages\ClientMessages\SubscriptionDropped;
37
use Prooph\EventStoreClient\Messages\ClientMessages\SubscriptionDropped\SubscriptionDropReason as SubscriptionDropReasonMessage;
38
use Prooph\EventStoreClient\SystemData\InspectionDecision;
39
use Prooph\EventStoreClient\SystemData\InspectionResult;
40
use Prooph\EventStoreClient\SystemData\TcpCommand;
41
use Prooph\EventStoreClient\SystemData\TcpFlags;
42
use Prooph\EventStoreClient\SystemData\TcpPackage;
43
use Psr\Log\LoggerInterface as Logger;
44

45
/** @internal */
46
class ConnectToPersistentSubscriptionOperation extends AbstractSubscriptionOperation implements ConnectToPersistentSubscriptions
47
{
48
    private string $subscriptionId = '';
49

50
    public function __construct(
51
        Logger $logger,
52
        DeferredFuture $deferred,
53
        private readonly string $groupName,
54
        private readonly int $bufferSize,
55
        string $streamId,
56
        ?UserCredentials $userCredentials,
57
        Closure $eventAppeared,
58
        ?Closure $subscriptionDropped,
59
        bool $verboseLogging,
60
        Closure $getConnection
61
    ) {
62
        parent::__construct(
34✔
63
            $logger,
34✔
64
            $deferred,
34✔
65
            $streamId,
34✔
66
            false,
34✔
67
            $userCredentials,
34✔
68
            $eventAppeared,
34✔
69
            $subscriptionDropped,
34✔
70
            $verboseLogging,
34✔
71
            $getConnection
34✔
72
        );
34✔
73
    }
74

75
    protected function createSubscriptionPackage(): TcpPackage
76
    {
77
        $message = new ConnectToPersistentSubscription();
34✔
78
        $message->setEventStreamId($this->streamId);
34✔
79
        $message->setSubscriptionId($this->groupName);
34✔
80
        $message->setAllowedInFlightMessages($this->bufferSize);
34✔
81

82
        $login = null;
34✔
83
        $pass = null;
34✔
84

85
        if ($this->userCredentials) {
34✔
86
            $login = $this->userCredentials->username();
26✔
87
            $pass = $this->userCredentials->password();
26✔
88
        }
89

90
        return new TcpPackage(
34✔
91
            TcpCommand::ConnectToPersistentSubscription,
34✔
92
            $this->userCredentials ? TcpFlags::Authenticated : TcpFlags::None,
34✔
93
            $this->correlationId,
34✔
94
            $message->serializeToString(),
34✔
95
            $login,
34✔
96
            $pass
34✔
97
        );
34✔
98
    }
99

100
    protected function preInspectPackage(TcpPackage $package): ?InspectionResult
101
    {
102
        if ($package->command() === TcpCommand::PersistentSubscriptionConfirmation) {
34✔
103
            $message = new PersistentSubscriptionConfirmation();
32✔
104
            $message->mergeFromString($package->data());
32✔
105

106
            $this->confirmSubscription(
32✔
107
                (int) $message->getLastCommitPosition(),
32✔
108
                $message->getLastEventNumber()
32✔
109
                    ? (int) $message->getLastEventNumber()
30✔
110
                    : null
32✔
111
            );
32✔
112
            $this->subscriptionId = $message->getSubscriptionId();
32✔
113

114
            return new InspectionResult(InspectionDecision::Subscribed, 'SubscriptionConfirmation');
32✔
115
        }
116

117
        if ($package->command() === TcpCommand::PersistentSubscriptionStreamEventAppeared) {
33✔
118
            $message = new PersistentSubscriptionStreamEventAppeared();
26✔
119
            $message->mergeFromString($package->data());
26✔
120

121
            $event = EventMessageConverter::convertResolvedIndexedEventMessageToResolvedEvent($message->getEvent());
26✔
122
            $this->eventAppeared(new PersistentSubscriptionResolvedEvent($event, $message->getRetryCount()));
26✔
123

124
            return new InspectionResult(InspectionDecision::DoNothing, 'StreamEventAppeared');
26✔
125
        }
126

127
        if ($package->command() === TcpCommand::SubscriptionDropped) {
8✔
128
            $message = new SubscriptionDropped();
8✔
129
            $message->mergeFromString($package->data());
8✔
130

131
            if ($message->getReason() === SubscriptionDropReasonMessage::AccessDenied) {
8✔
132
                $this->dropSubscription(SubscriptionDropReason::AccessDenied, new AccessDenied('You do not have access to the stream'));
1✔
133

134
                return new InspectionResult(InspectionDecision::EndOperation, 'SubscriptionDropped');
1✔
135
            }
136

137
            if ($message->getReason() === SubscriptionDropReasonMessage::NotFound) {
7✔
138
                $this->dropSubscription(SubscriptionDropReason::NotFound, new InvalidArgumentException('Subscription not found'));
1✔
139

140
                return new InspectionResult(InspectionDecision::EndOperation, 'SubscriptionDropped');
1✔
141
            }
142

143
            if ($message->getReason() === SubscriptionDropReasonMessage::PersistentSubscriptionDeleted) {
6✔
144
                $this->dropSubscription(SubscriptionDropReason::PersistentSubscriptionDeleted, new PersistentSubscriptionDeleted());
×
145

146
                return new InspectionResult(InspectionDecision::EndOperation, 'SubscriptionDropped');
×
147
            }
148

149
            if ($message->getReason() === SubscriptionDropReasonMessage::SubscriberMaxCountReached) {
6✔
150
                $this->dropSubscription(SubscriptionDropReason::MaxSubscribersReached, new MaximumSubscribersReached());
2✔
151

152
                return new InspectionResult(InspectionDecision::EndOperation, 'SubscriptionDropped');
2✔
153
            }
154

155
            $this->dropSubscription(SubscriptionDropReason::from($message->getReason()), null, ($this->getConnection)());
4✔
156

157
            return new InspectionResult(InspectionDecision::EndOperation, 'SubscriptionDropped');
4✔
158
        }
159

160
        return null;
×
161
    }
162

163
    protected function createSubscriptionObject(int $lastCommitPosition, ?int $lastEventNumber): EventStoreSubscription
164
    {
165
        return new PersistentEventStoreSubscription(
32✔
166
            $this,
32✔
167
            $this->streamId,
32✔
168
            $lastCommitPosition,
32✔
169
            $lastEventNumber
32✔
170
        );
32✔
171
    }
172

173
    /** @param list<EventId> $eventIds */
174
    public function notifyEventsProcessed(array $eventIds): void
175
    {
176
        if (empty($eventIds)) {
19✔
177
            throw new InvalidArgumentException('EventIds cannot be empty');
×
178
        }
179

180
        $message = new PersistentSubscriptionAckEvents();
19✔
181
        $message->setSubscriptionId($this->subscriptionId);
19✔
182
        $message->setProcessedEventIds(\array_map(
19✔
183
            fn (EventId $eventId): string => $eventId->toBinary(),
19✔
184
            $eventIds
19✔
185
        ));
19✔
186

187
        $login = null;
19✔
188
        $pass = null;
19✔
189

190
        if ($this->userCredentials) {
19✔
191
            $login = $this->userCredentials->username();
17✔
192
            $pass = $this->userCredentials->password();
17✔
193
        }
194

195
        $package = new TcpPackage(
19✔
196
            TcpCommand::PersistentSubscriptionAckEvents,
19✔
197
            $this->userCredentials ? TcpFlags::Authenticated : TcpFlags::None,
19✔
198
            $this->correlationId,
19✔
199
            $message->serializeToString(),
19✔
200
            $login,
19✔
201
            $pass
19✔
202
        );
19✔
203

204
        $this->enqueueSend($package);
19✔
205
    }
206

207
    /**
208
     * @param list<EventId> $eventIds
209
     * @param PersistentSubscriptionNakEventAction $action
210
     * @param string $reason
211
     */
212
    public function notifyEventsFailed(
213
        array $eventIds,
214
        PersistentSubscriptionNakEventAction $action,
215
        string $reason
216
    ): void {
217
        if (empty($eventIds)) {
3✔
218
            throw new InvalidArgumentException('EventIds cannot be empty');
×
219
        }
220

221
        $eventIds = \array_map(
3✔
222
            fn (EventId $eventId): string => $eventId->toBinary(),
3✔
223
            $eventIds
3✔
224
        );
3✔
225

226
        $message = new PersistentSubscriptionNakEvents();
3✔
227
        $message->setSubscriptionId($this->subscriptionId);
3✔
228
        $message->setMessage($reason);
3✔
229
        $message->setAction($action->value);
3✔
230
        $message->setProcessedEventIds($eventIds);
3✔
231

232
        $login = null;
3✔
233
        $pass = null;
3✔
234

235
        if ($this->userCredentials) {
3✔
236
            $login = $this->userCredentials->username();
3✔
237
            $pass = $this->userCredentials->password();
3✔
238
        }
239

240
        $package = new TcpPackage(
3✔
241
            TcpCommand::PersistentSubscriptionNakEvents,
3✔
242
            $this->userCredentials ? TcpFlags::Authenticated : TcpFlags::None,
3✔
243
            $this->correlationId,
3✔
244
            $message->serializeToString(),
3✔
245
            $login,
3✔
246
            $pass
3✔
247
        );
3✔
248

249
        $this->enqueueSend($package);
3✔
250
    }
251

252
    public function name(): string
253
    {
254
        return 'ConnectToPersistentSubscription';
34✔
255
    }
256

257
    public function __toString(): string
258
    {
259
        return \sprintf(
34✔
260
            'StreamId: %s, ResolveLinkTos: %s, GroupName: %s, BufferSize: %d, SubscriptionId: %s',
34✔
261
            $this->streamId,
34✔
262
            $this->resolveLinkTos ? 'yes' : 'no',
34✔
263
            $this->groupName,
34✔
264
            $this->bufferSize,
34✔
265
            $this->subscriptionId
34✔
266
        );
34✔
267
    }
268
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc