• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

prooph / event-store-client / 9555551525

17 Jun 2024 10:16PM UTC coverage: 70.262% (-1.1%) from 71.395%
9555551525

push

github

prolic
update coveralls repo token

3466 of 4933 relevant lines covered (70.26%)

67.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.33
/src/Internal/EventStorePersistentSubscription.php
1
<?php
2

3
/**
4
 * This file is part of `prooph/event-store-client`.
5
 * (c) 2018-2024 Alexander Miertsch <kontakt@codeliner.ws>
6
 * (c) 2018-2024 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace Prooph\EventStoreClient\Internal;
15

16
use function Amp\async;
17

18
use Amp\DeferredFuture;
19

20
use function Amp\delay;
21

22
use Amp\TimeoutCancellation;
23
use Closure;
24
use Exception;
25
use Prooph\EventStore\EventId;
26
use Prooph\EventStore\EventStorePersistentSubscription as EventStorePersistentSubscriptionInterface;
27
use Prooph\EventStore\Exception\RuntimeException;
28
use Prooph\EventStore\Internal\DropData;
29
use Prooph\EventStore\Internal\PersistentEventStoreSubscription;
30
use Prooph\EventStore\Internal\ResolvedEvent as InternalResolvedEvent;
31
use Prooph\EventStore\PersistentSubscriptionNakEventAction;
32
use Prooph\EventStore\PersistentSubscriptionResolvedEvent;
33
use Prooph\EventStore\ResolvedEvent;
34
use Prooph\EventStore\SubscriptionDropReason;
35
use Prooph\EventStore\UserCredentials;
36
use Prooph\EventStoreClient\ConnectionSettings;
37
use Prooph\EventStoreClient\Internal\Message\StartPersistentSubscriptionMessage;
38
use Psr\Log\LoggerInterface as Logger;
39
use Revolt\EventLoop;
40
use SplQueue;
41
use Throwable;
42

43
/** @internal */
44
class EventStorePersistentSubscription implements EventStorePersistentSubscriptionInterface
45
{
46
    /** Placeholder event; wrapped and enqueued to mark the point at which the subscription must be dropped. */
    private ResolvedEvent $dropSubscriptionEvent;

    /** Underlying subscription; stays null until start() has assigned it. */
    private ?PersistentEventStoreSubscription $subscription = null;

    /** Buffer of PersistentSubscriptionResolvedEvent items waiting to be handled by processQueue(). */
    private SplQueue $queue;

    /** Set when a processQueue() run gets scheduled; reset once that run has drained the queue. */
    private bool $isProcessing = false;

    /** Reason and error of a requested drop; null as long as no drop has been requested. */
    private ?DropData $dropData = null;

    /** Set by dropSubscription() so that the drop logic is executed only once. */
    private bool $isDropped = false;

    /** Completed when the subscription has been dropped; stop() awaits it when given a timeout. */
    private DeferredFuture $stopped;
60

61
    /**
     * Stores the subscription configuration and prepares the internal state.
     *
     * @internal
     *
     * @param Closure(EventStorePersistentSubscription, ResolvedEvent, null|int): void $eventAppeared
     * @param null|Closure(EventStorePersistentSubscription, SubscriptionDropReason, null|Throwable): void $subscriptionDropped
     */
    public function __construct(
        private readonly string $subscriptionId,
        private readonly string $streamId,
        private readonly Closure $eventAppeared,
        private readonly ?Closure $subscriptionDropped,
        private readonly ?UserCredentials $userCredentials,
        private readonly Logger $log,
        private readonly bool $verbose,
        private readonly ConnectionSettings $settings,
        private readonly EventStoreConnectionLogicHandler $handler,
        private readonly int $bufferSize = 10,
        private readonly bool $autoAck = true
    ) {
        // Nothing has been started yet, so the "stopped" future begins life already
        // completed; start() swaps in a pending one.
        $alreadyStopped = new DeferredFuture();
        $alreadyStopped->complete(true);
        $this->stopped = $alreadyStopped;

        $this->queue = new SplQueue();

        // Compared by identity in processQueue() to recognise the drop marker.
        $this->dropSubscriptionEvent = new ResolvedEvent(null, null, null);
    }
85

86
    /**
     * Enqueues a StartPersistentSubscriptionMessage on the connection handler and
     * waits until the deferred carried by that message has been resolved.
     *
     * The closure signatures below mirror the callbacks that start() passes in.
     *
     * @internal
     *
     * @param Closure(PersistentEventStoreSubscription, PersistentSubscriptionResolvedEvent): void $onEventAppeared
     * @param null|Closure(PersistentEventStoreSubscription, SubscriptionDropReason, null|Throwable): void $onSubscriptionDropped
     *
     * @return PersistentEventStoreSubscription the value the deferred was resolved with
     */
    public function startSubscription(
        string $subscriptionId,
        string $streamId,
        int $bufferSize,
        ?UserCredentials $userCredentials,
        Closure $onEventAppeared,
        ?Closure $onSubscriptionDropped,
        ConnectionSettings $settings
    ): PersistentEventStoreSubscription {
        $deferred = new DeferredFuture();

        // The message carries the deferred; presumably the handler resolves it once
        // the subscription is confirmed (not visible from this class).
        $this->handler->enqueueMessage(new StartPersistentSubscriptionMessage(
            $deferred,
            $subscriptionId,
            $streamId,
            $bufferSize,
            $userCredentials,
            $onEventAppeared,
            $onSubscriptionDropped,
            $settings->maxRetries(),
            $settings->operationTimeout()
        ));

        return $deferred->getFuture()->await();
    }
117

118
    /**
     * Starts the subscription with the values given to the constructor and wires
     * the incoming events and drop notifications into the internal queue.
     *
     * @internal
     */
    public function start(): void
    {
        // Fresh pending future; dropSubscription() completes it again.
        $this->stopped = new DeferredFuture();

        // Incoming events are only buffered here; processQueue() handles them later.
        $forwardEvent = function (
            PersistentEventStoreSubscription $subscription,
            PersistentSubscriptionResolvedEvent $resolvedEvent
        ): void {
            $this->onEventAppeared($resolvedEvent);
        };

        // A drop is turned into a marker on the same queue.
        $forwardDrop = function (
            PersistentEventStoreSubscription $subscription,
            SubscriptionDropReason $reason,
            ?Throwable $exception
        ): void {
            $this->onSubscriptionDropped($reason, $exception);
        };

        $this->subscription = $this->startSubscription(
            $this->subscriptionId,
            $this->streamId,
            $this->bufferSize,
            $this->userCredentials,
            $forwardEvent,
            $forwardDrop,
            $this->settings
        );
    }
146

147
    /**
     * Acknowledge that a message has completed processing (this tells the server it has been processed).
     * Note: there is no need to ack a message if Auto Ack is enabled.
     */
    public function acknowledge(InternalResolvedEvent $event): void
    {
        /** @psalm-suppress PossiblyNullReference */
        $processedIds = [$event->originalEvent()->eventId()];

        /** @psalm-suppress PossiblyNullReference */
        $this->subscription->notifyEventsProcessed($processedIds);
    }
156

157
    /**
     * Acknowledge that several messages have completed processing (this tells the server they have been processed).
     * Note: there is no need to ack messages if Auto Ack is enabled.
     *
     * @param list<InternalResolvedEvent> $events
     */
    public function acknowledgeMultiple(array $events): void
    {
        /** @psalm-suppress PossiblyNullReference */
        $toEventId = static fn (InternalResolvedEvent $event): EventId => $event->originalEvent()->eventId();

        $processedIds = \array_map($toEventId, $events);

        /** @psalm-suppress PossiblyNullReference */
        $this->subscription->notifyEventsProcessed($processedIds);
    }
174

175
    /**
     * Acknowledge, by event id, that a message has completed processing (this tells the server it has been processed).
     * Note: there is no need to ack a message if Auto Ack is enabled.
     */
    public function acknowledgeEventId(EventId $eventId): void
    {
        $processedIds = [$eventId];

        /** @psalm-suppress PossiblyNullReference */
        $this->subscription->notifyEventsProcessed($processedIds);
    }
184

185
    /**
     * Acknowledge, by event id, that several messages have completed processing
     * (this tells the server they have been processed).
     * Note: there is no need to ack messages if Auto Ack is enabled.
     *
     * @param list<EventId> $eventIds passed to the subscription unchanged
     */
    public function acknowledgeMultipleEventIds(array $eventIds): void
    {
        $subscription = $this->subscription;

        /** @psalm-suppress PossiblyNullReference */
        $subscription->notifyEventsProcessed($eventIds);
    }
196

197
    /**
     * Mark a message as having failed processing. The server will take action based upon the action parameter.
     */
    public function fail(
        InternalResolvedEvent $event,
        PersistentSubscriptionNakEventAction $action,
        string $reason
    ): void {
        /** @psalm-suppress PossiblyNullReference */
        $failedIds = [$event->originalEvent()->eventId()];

        /** @psalm-suppress PossiblyNullReference */
        $this->subscription->notifyEventsFailed($failedIds, $action, $reason);
    }
208

209
    /**
     * Mark several messages as having failed processing. The server will take action based upon the action parameter.
     *
     * @param list<InternalResolvedEvent> $events
     */
    public function failMultiple(
        array $events,
        PersistentSubscriptionNakEventAction $action,
        string $reason
    ): void {
        /** @psalm-suppress PossiblyNullReference */
        $toEventId = static fn (InternalResolvedEvent $event): EventId => $event->originalEvent()->eventId();

        $failedIds = \array_map($toEventId, $events);

        /** @psalm-suppress PossiblyNullReference */
        $this->subscription->notifyEventsFailed($failedIds, $action, $reason);
    }
228

229
    /**
     * Mark a message, identified by its event id, as having failed processing.
     * The action and reason are handed to the subscription together with the id.
     */
    public function failEventId(EventId $eventId, PersistentSubscriptionNakEventAction $action, string $reason): void
    {
        $failedIds = [$eventId];

        /** @psalm-suppress PossiblyNullReference */
        $this->subscription->notifyEventsFailed($failedIds, $action, $reason);
    }
234

235
    /**
     * Mark several messages, identified by their event ids, as having failed processing.
     *
     * @param list<EventId> $eventIds passed to the subscription unchanged
     */
    public function failMultipleEventIds(array $eventIds, PersistentSubscriptionNakEventAction $action, string $reason): void
    {
        $subscription = $this->subscription;

        /** @psalm-suppress PossiblyNullReference */
        $subscription->notifyEventsFailed($eventIds, $action, $reason);
    }
243

244
    /**
     * Requests that the subscription be dropped with reason UserInitiated.
     *
     * @param null|float $timeout when null the method returns as soon as the drop
     *                            has been requested; otherwise it waits for the drop
     *                            to finish, with the wait bounded by a
     *                            TimeoutCancellation built from this value
     */
    public function stop(?float $timeout = null): void
    {
        if ($this->verbose) {
            $message = \sprintf(
                'Persistent Subscription to %s: requesting stop...',
                $this->streamId
            );
            $this->log->debug($message);
        }

        $this->enqueueSubscriptionDropNotification(SubscriptionDropReason::UserInitiated, null);

        // Only block when the caller asked for it by passing a timeout.
        if (null !== $timeout) {
            $this->stopped->getFuture()->await(new TimeoutCancellation($timeout));
        }
    }
261

262
    /**
     * Records why the subscription is to be dropped and puts the drop marker on the
     * queue. Only the first request has an effect.
     */
    private function enqueueSubscriptionDropNotification(
        SubscriptionDropReason $reason,
        ?Throwable $error
    ): void {
        // Drop data already present: somebody enqueued the drop before us.
        if (null !== $this->dropData) {
            return;
        }

        $this->dropData = new DropData($reason, $error);

        $dropMarker = new PersistentSubscriptionResolvedEvent($this->dropSubscriptionEvent, null);
        $this->enqueue($dropMarker);
    }
275

276
    /**
     * Drop callback used by start(): turns the drop into a queued notification.
     */
    private function onSubscriptionDropped(SubscriptionDropReason $reason, ?Throwable $exception): void
    {
        $this->enqueueSubscriptionDropNotification($reason, $exception);
    }
282

283
    /**
     * Event callback used by start(): buffers the event for processQueue().
     */
    private function onEventAppeared(PersistentSubscriptionResolvedEvent $resolvedEvent): void
    {
        $this->enqueue($resolvedEvent);
    }
288

289
    /**
     * Appends an event to the queue and, unless a processing run is already
     * scheduled or active, defers a processQueue() run on the event loop.
     */
    private function enqueue(PersistentSubscriptionResolvedEvent $resolvedEvent): void
    {
        $this->queue->enqueue($resolvedEvent);

        if ($this->isProcessing) {
            // The scheduled/active run will pick this item up.
            return;
        }

        $this->isProcessing = true;

        EventLoop::defer(function (): void {
            $this->processQueue();
        });
    }
301

302
    /**
     * Drains the queue: hands each buffered event to the user callback, optionally
     * acknowledges it, and drops the subscription when the drop marker is reached,
     * when a drop has been requested in the meantime, or when handling an event fails.
     *
     * After a drop the method returns without resetting isProcessing, so enqueue()
     * schedules no further runs.
     *
     * @throws RuntimeException when the drop marker is dequeued but no drop data was recorded
     */
    private function processQueue(): void
    {
        async(function (): void {
            do {
                if (null === $this->subscription) {
                    // Events can arrive before start() has stored the subscription; wait and retry.
                    // NOTE(review): Amp v3 delay() takes seconds, so this polls once per
                    // second - confirm that a shorter interval was not intended.
                    delay(1);
                } else {
                    while (! $this->queue->isEmpty()) {
                        $e = $this->queue->dequeue();
                        \assert($e instanceof PersistentSubscriptionResolvedEvent);

                        if ($e->event() === $this->dropSubscriptionEvent) {
                            // drop subscription artificial ResolvedEvent

                            if (null === $this->dropData) {
                                throw new RuntimeException('Drop reason not specified');
                            }

                            $this->dropSubscription($this->dropData->reason(), $this->dropData->error());

                            return;
                        }

                        // A drop was requested while real events were still queued:
                        // stop handling events and drop right away.
                        if (null !== $this->dropData) {
                            $this->dropSubscription($this->dropData->reason(), $this->dropData->error());

                            return;
                        }

                        try {
                            ($this->eventAppeared)($this, $e->event(), $e->retryCount());

                            if ($this->autoAck) {
                                /** @psalm-suppress PossiblyNullReference */
                                $this->subscription->notifyEventsProcessed([$e->originalEvent()->eventId()]);
                            }

                            if ($this->verbose) {
                                /** @psalm-suppress PossiblyNullReference */
                                $this->log->debug(\sprintf(
                                    'Persistent Subscription to %s: processed event (%s, %d, %s @ %d)',
                                    $this->streamId,
                                    $e->originalEvent()->eventStreamId(),
                                    $e->originalEvent()->eventNumber(),
                                    $e->originalEvent()->eventType(),
                                    $e->event()->originalEventNumber()
                                ));
                            }
                        } catch (Throwable $ex) {
                            // Throwable rather than Exception: an \Error raised by the handler
                            // (e.g. a TypeError) must also drop the subscription instead of
                            // escaping with isProcessing stuck at true and "stopped" never completed.

                            //TODO GFY should we autonak here?

                            $this->dropSubscription(SubscriptionDropReason::EventHandlerException, $ex);

                            return;
                        }
                    }
                }
            } while (! $this->queue->isEmpty() && $this->isProcessing);

            $this->isProcessing = false;
        })->await();
    }
364

365
    /**
     * Performs the actual drop exactly once: unsubscribes, notifies the user
     * callback (if one was given) and completes the "stopped" future.
     */
    private function dropSubscription(SubscriptionDropReason $reason, ?Throwable $error): void
    {
        if ($this->isDropped) {
            return;
        }

        $this->isDropped = true;

        if ($this->verbose) {
            $errorMessage = $error?->getMessage() ?? '';

            $this->log->debug(\sprintf(
                'Persistent Subscription to %s: dropping subscription, reason: %s %s',
                $this->streamId,
                $reason->name,
                $errorMessage
            ));
        }

        $this->subscription?->unsubscribe();

        if (null !== $this->subscriptionDropped) {
            ($this->subscriptionDropped)($this, $reason, $error);
        }

        // Releases anyone waiting in stop().
        $this->stopped->complete(true);
    }
388
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc