• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

prooph / event-store-client / 9555551525

17 Jun 2024 10:16PM UTC coverage: 70.262% (-1.1%) from 71.395%
9555551525

push

github

prolic
update coveralls repo token

3466 of 4933 relevant lines covered (70.26%)

67.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.4
/src/Internal/EventStoreCatchUpSubscription.php
1
<?php
2

3
/**
4
 * This file is part of `prooph/event-store-client`.
5
 * (c) 2018-2024 Alexander Miertsch <kontakt@codeliner.ws>
6
 * (c) 2018-2024 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace Prooph\EventStoreClient\Internal;
15

16
use function Amp\async;
17

18
use Amp\DeferredFuture;
19
use Amp\TimeoutCancellation;
20
use Closure;
21
use Exception;
22
use Prooph\EventStore\CatchUpSubscriptionSettings;
23
use Prooph\EventStore\ClientConnectionEventArgs;
24
use Prooph\EventStore\EventStoreCatchUpSubscription as EventStoreCatchUpSubscriptionInterface;
25
use Prooph\EventStore\EventStoreConnection;
26
use Prooph\EventStore\EventStoreSubscription;
27
use Prooph\EventStore\Internal\DropData;
28
use Prooph\EventStore\ListenerHandler;
29
use Prooph\EventStore\ResolvedEvent;
30
use Prooph\EventStore\SubscriptionDropReason;
31
use Prooph\EventStore\UserCredentials;
32
use Psr\Log\LoggerInterface as Logger;
33
use Revolt\EventLoop;
34
use SplQueue;
35
use Throwable;
36

37
abstract class EventStoreCatchUpSubscription implements EventStoreCatchUpSubscriptionInterface
38
{
39
    protected const AllStream = '<all>';
40

41
    private ResolvedEvent $dropSubscriptionEvent;
42

43
    private bool $isSubscribedToAll;
44

45
    private string $streamId;
46

47
    private string $subscriptionName;
48

49
    protected Logger $log;
50

51
    private EventStoreConnection $connection;
52

53
    private bool $resolveLinkTos;
54

55
    private ?UserCredentials $userCredentials;
56

57
    protected int $readBatchSize;
58

59
    protected int $maxPushQueueSize;
60

61
    /** @var Closure(EventStoreCatchUpSubscription, ResolvedEvent): void */
62
    protected Closure $eventAppeared;
63

64
    /** @var null|Closure(EventStoreCatchUpSubscription): void  */
65
    private ?Closure $liveProcessingStarted;
66

67
    /** @var null|Closure(EventStoreCatchUpSubscription, SubscriptionDropReason, null|Throwable): void */
68
    private ?Closure $subscriptionDropped;
69

70
    protected bool $verbose;
71

72
    /** @var SplQueue<ResolvedEvent> */
73
    private SplQueue $liveQueue;
74

75
    private ?EventStoreSubscription $subscription = null;
76

77
    private ?DropData $dropData = null;
78

79
    private bool $allowProcessing = false;
80

81
    private bool $isProcessing = false;
82

83
    protected bool $shouldStop = false;
84

85
    private bool $isDropped = false;
86

87
    private DeferredFuture $stopped;
88

89
    private ListenerHandler $connectListener;
90

91
    /**
     * Stores the collaborators, callbacks and settings of the subscription.
     *
     * The subscription targets all streams when `$streamId` is the empty
     * string. The check is a strict string comparison on purpose: `empty()`
     * also reports the string "0" as empty, which would misclassify a stream
     * with that id as a subscription to all streams.
     *
     * @param Closure(EventStoreCatchUpSubscription, ResolvedEvent): void $eventAppeared
     * @param null|Closure(EventStoreCatchUpSubscription): void $liveProcessingStarted
     * @param null|Closure(EventStoreCatchUpSubscription, SubscriptionDropReason, null|Throwable): void $subscriptionDropped
     *
     * @internal
     */
    public function __construct(
        EventStoreConnection $connection,
        Logger $logger,
        string $streamId,
        ?UserCredentials $userCredentials,
        Closure $eventAppeared,
        ?Closure $liveProcessingStarted,
        ?Closure $subscriptionDropped,
        CatchUpSubscriptionSettings $settings
    ) {
        // Sentinel placed on the live queue to signal a pending drop; it is
        // recognised by identity in processLiveQueue().
        $this->dropSubscriptionEvent = new ResolvedEvent(null, null, null);
        $this->log = $logger;
        $this->connection = $connection;
        $this->isSubscribedToAll = '' === $streamId;
        $this->streamId = $streamId;
        $this->userCredentials = $userCredentials;
        $this->eventAppeared = $eventAppeared;
        $this->liveProcessingStarted = $liveProcessingStarted;
        $this->subscriptionDropped = $subscriptionDropped;
        $this->resolveLinkTos = $settings->resolveLinkTos();
        $this->readBatchSize = $settings->readBatchSize();
        $this->maxPushQueueSize = $settings->maxLiveQueueSize();
        $this->verbose = $settings->verboseLogging();
        $this->liveQueue = new SplQueue();
        $this->subscriptionName = $settings->subscriptionName();
        // No-op placeholder so stop() always has a handler to detach, even
        // before startLiveProcessing() registers the real one.
        $this->connectListener = new ListenerHandler(function (): void {
        });
        // Nothing is running yet, so the "stopped" future starts out completed;
        // runSubscription() replaces it with a pending one.
        $this->stopped = new DeferredFuture();
        $this->stopped->complete(true);
    }
128

129
    /**
     * Tells whether this subscription targets all streams rather than a
     * single one (flag computed once in the constructor).
     */
    public function isSubscribedToAll(): bool
    {
        return $this->isSubscribedToAll;
    }
133

134
    /**
     * Returns the stream id exactly as it was passed to the constructor.
     */
    public function streamId(): string
    {
        return $this->streamId;
    }
138

139
    /**
     * Returns the subscription name taken from the settings; it is used to
     * label this subscription in log messages.
     */
    public function subscriptionName(): string
    {
        return $this->subscriptionName;
    }
143

144
    /**
     * Reads historic events for the concrete subscription type.
     *
     * runSubscription() calls this with both limits set to null for the
     * initial catch-up. readMissedHistoricEvents() calls it again with the
     * live subscription's last commit position and last event number
     * (presumably the point up to which events must be read; the exact
     * semantics are defined by the implementing class).
     */
    abstract protected function readEventsTill(
        EventStoreConnection $connection,
        bool $resolveLinkTos,
        ?UserCredentials $userCredentials,
        ?int $lastCommitPosition,
        ?int $lastEventNumber
    ): void;
151

152
    /**
     * Handles one event taken from the live queue.
     *
     * Invoked by processLiveQueue() for every dequeued event that is not the
     * drop sentinel. Exceptions thrown from here make processLiveQueue() drop
     * the subscription with SubscriptionDropReason::EventHandlerException.
     */
    abstract protected function tryProcess(ResolvedEvent $e): void;
153

154
    /**
     * Starts the subscription by kicking off the catch-up run.
     *
     * @internal
     */
    public function start(): void
    {
        if ($this->verbose) {
            $target = $this->isSubscribedToAll ? self::AllStream : $this->streamId;

            $this->log->debug(\sprintf(
                'Catch-up Subscription %s to %s: starting...',
                $this->subscriptionName,
                $target
            ));
        }

        $this->runSubscription();
    }
167

168
    /**
     * Requests the subscription to stop.
     *
     * Detaches the reconnect listener, raises the stop flag and enqueues a
     * user-initiated drop notification. Without a timeout the method returns
     * immediately; with a timeout it waits on the "stopped" future, bounded
     * by a TimeoutCancellation built from that value.
     *
     * @param null|float $timeout upper bound for the wait, passed unchanged to
     *                            TimeoutCancellation; null means "do not wait"
     */
    public function stop(?float $timeout = null): void
    {
        if ($this->verbose) {
            $target = $this->isSubscribedToAll ? self::AllStream : $this->streamId;

            $this->log->debug(\sprintf(
                'Catch-up Subscription %s to %s: requesting stop...',
                $this->subscriptionName,
                $target
            ));
            $this->log->debug(\sprintf(
                'Catch-up Subscription %s to %s: unhooking from connection.Connected',
                $this->subscriptionName,
                $target
            ));
        }

        $this->connection->detach($this->connectListener);
        $this->shouldStop = true;
        $this->enqueueSubscriptionDropNotification(SubscriptionDropReason::UserInitiated, null);

        // Fire-and-forget mode: the caller does not want to block.
        if (null === $timeout) {
            return;
        }

        if ($this->verbose) {
            $this->log->debug(\sprintf(
                'Waiting on subscription %s to stop',
                $this->subscriptionName
            ));
        }

        $stoppedFuture = $this->stopped->getFuture();
        $stoppedFuture->await(new TimeoutCancellation($timeout));
    }
200

201
    /**
     * Connection "connected" callback: detaches itself and schedules a fresh
     * subscription run on the event loop.
     *
     * The event arguments are accepted to match the listener signature but
     * are not used.
     */
    private function onReconnect(ClientConnectionEventArgs $clientConnectionEventArgs): void
    {
        if ($this->verbose) {
            $target = $this->isSubscribedToAll ? self::AllStream : $this->streamId;

            $this->log->debug(\sprintf(
                'Catch-up Subscription %s to %s: recovering after reconnection',
                $this->subscriptionName,
                $target
            ));
            $this->log->debug(\sprintf(
                'Catch-up Subscription %s to %s: unhooking from connection.Connected',
                $this->subscriptionName,
                $target
            ));
        }

        $this->connection->detach($this->connectListener);

        // Deferred so the re-run happens on a later loop tick instead of
        // inside the connection's event dispatch.
        $rerun = function (): void {
            $this->runSubscription();
        };

        EventLoop::defer($rerun);
    }
222

223
    /**
     * Runs one full subscription cycle: read the history, then subscribe.
     *
     * Resets the "stopped" future and disables live-queue processing before
     * starting. If a stop was requested the subscription is dropped as
     * user-initiated instead. An Exception raised while catching up drops the
     * subscription with CatchUpError and is rethrown to the caller.
     */
    private function runSubscription(): void
    {
        if ($this->verbose) {
            $target = $this->isSubscribedToAll ? self::AllStream : $this->streamId;

            $this->log->debug(\sprintf(
                'Catch-up Subscription %s to %s: running...',
                $this->subscriptionName,
                $target
            ));
        }

        $this->stopped = new DeferredFuture();
        $this->allowProcessing = false;

        $catchUp = function (): void {
            if ($this->shouldStop) {
                $this->dropSubscription(SubscriptionDropReason::UserInitiated, null);

                return;
            }

            if ($this->verbose) {
                $target = $this->isSubscribedToAll ? self::AllStream : $this->streamId;

                $this->log->debug(\sprintf(
                    'Catch-up Subscription %s to %s: pulling events...',
                    $this->subscriptionName,
                    $target
                ));
            }

            try {
                $this->readEventsTill(
                    $this->connection,
                    $this->resolveLinkTos,
                    $this->userCredentials,
                    null,
                    null
                );
                $this->subscribeToStream();
            } catch (Exception $ex) {
                $this->dropSubscription(SubscriptionDropReason::CatchUpError, $ex);

                throw $ex;
            }
        };

        async($catchUp)->await();
    }
259

260
    /**
     * Subscribes to pushed events on the connection and then reads whatever
     * history was missed in the meantime.
     *
     * An empty stream id selects subscribeToAll(); any other id, including
     * the string "0" which `empty()` would wrongly treat as empty, selects
     * subscribeToStream() for that stream. If a stop was requested the
     * subscription is dropped as user-initiated instead.
     */
    private function subscribeToStream(): void
    {
        async(function (): void {
            if ($this->shouldStop) {
                $this->dropSubscription(SubscriptionDropReason::UserInitiated, null);

                return;
            }

            if ($this->verbose) {
                $this->log->debug(\sprintf(
                    'Catch-up Subscription %s to %s: subscribing...',
                    $this->subscriptionName,
                    $this->isSubscribedToAll ? self::AllStream : $this->streamId
                ));
            }

            // Pushed events are buffered on the live queue.
            $eventAppeared = function (
                EventStoreSubscription $subscription,
                ResolvedEvent $resolvedEvent
            ): void {
                $this->enqueuePushedEvent($subscription, $resolvedEvent);
            };

            // A drop reported by the server is turned into a queued drop notification.
            $subscriptionDropped = function (
                EventStoreSubscription $subscription,
                SubscriptionDropReason $reason,
                ?Throwable $exception = null
            ): void {
                $this->serverSubscriptionDropped($reason, $exception);
            };

            // Strict comparison: only the empty string means "all streams".
            $this->subscription = '' === $this->streamId
                ? $this->connection->subscribeToAll(
                    $this->resolveLinkTos,
                    $eventAppeared,
                    $subscriptionDropped,
                    $this->userCredentials
                )
                : $this->connection->subscribeToStream(
                    $this->streamId,
                    $this->resolveLinkTos,
                    $eventAppeared,
                    $subscriptionDropped,
                    $this->userCredentials
                );

            $this->readMissedHistoricEvents();
        })->await();
    }
310

311
    /**
     * Reads events up to the live subscription's last commit position and
     * last event number, then switches to live processing.
     *
     * If a stop was requested the subscription is dropped as user-initiated
     * instead.
     */
    private function readMissedHistoricEvents(): void
    {
        if ($this->shouldStop) {
            $this->dropSubscription(SubscriptionDropReason::UserInitiated, null);

            return;
        }

        if ($this->verbose) {
            $target = $this->isSubscribedToAll ? self::AllStream : $this->streamId;

            $this->log->debug(\sprintf(
                'Catch-up Subscription %s to %s: pulling events (if left)...',
                $this->subscriptionName,
                $target
            ));
        }

        /** @psalm-suppress PossiblyNullReference */
        $this->readEventsTill(
            $this->connection,
            $this->resolveLinkTos,
            $this->userCredentials,
            $this->subscription->lastCommitPosition(),
            $this->subscription->lastEventNumber()
        );

        $this->startLiveProcessing();
    }
335

336
    /**
     * Switches the subscription into live mode.
     *
     * Invokes the optional liveProcessingStarted callback, registers the
     * reconnect listener on the connection, enables queue processing and
     * schedules a drain of events buffered so far. If a stop was requested
     * the subscription is dropped as user-initiated instead.
     */
    private function startLiveProcessing(): void
    {
        if ($this->shouldStop) {
            $this->dropSubscription(SubscriptionDropReason::UserInitiated, null);

            return;
        }

        if ($this->verbose) {
            $target = $this->isSubscribedToAll ? self::AllStream : $this->streamId;

            $this->log->debug(\sprintf(
                'Catch-up Subscription %s to %s: processing live events...',
                $this->subscriptionName,
                $target
            ));
        }

        if (null !== $this->liveProcessingStarted) {
            ($this->liveProcessingStarted)($this);
        }

        if ($this->verbose) {
            $target = $this->isSubscribedToAll ? self::AllStream : $this->streamId;

            $this->log->debug(\sprintf(
                'Catch-up Subscription %s to %s: hooking to connection.Connected',
                $this->subscriptionName,
                $target
            ));
        }

        $reconnectHandler = function (ClientConnectionEventArgs $args): void {
            $this->onReconnect($args);
        };
        $this->connectListener = $this->connection->onConnected($reconnectHandler);

        $this->allowProcessing = true;

        $this->ensureProcessingPushQueue();
    }
372

373
    /**
     * Buffers an event pushed by the live subscription.
     *
     * When the live queue has reached the configured maximum size, a
     * ProcessingQueueOverflow drop is enqueued and the given subscription is
     * unsubscribed; the event itself is not buffered. Otherwise the event is
     * appended and, if live processing is allowed, a drain is scheduled.
     */
    private function enqueuePushedEvent(EventStoreSubscription $subscription, ResolvedEvent $e): void
    {
        if ($this->verbose) {
            $target = $this->isSubscribedToAll ? self::AllStream : $this->streamId;

            /** @psalm-suppress PossiblyNullReference */
            $this->log->debug(\sprintf(
                'Catch-up Subscription %s to %s: event appeared (%s, %s, %s, @ %s)',
                $this->subscriptionName,
                $target,
                $e->originalStreamName(),
                $e->originalEventNumber(),
                $e->originalEvent()->eventType(),
                (string) $e->originalPosition()
            ));
        }

        $queueIsFull = $this->liveQueue->count() >= $this->maxPushQueueSize;

        if ($queueIsFull) {
            $this->enqueueSubscriptionDropNotification(SubscriptionDropReason::ProcessingQueueOverflow, null);
            $subscription->unsubscribe();

            return;
        }

        $this->liveQueue->enqueue($e);

        if (! $this->allowProcessing) {
            return;
        }

        $this->ensureProcessingPushQueue();
    }
401

402
    /**
     * Callback target for a drop reported by the live subscription; forwards
     * the reason and the optional error as a queued drop notification.
     */
    private function serverSubscriptionDropped(
        SubscriptionDropReason $reason,
        ?Throwable $exception
    ): void {
        $this->enqueueSubscriptionDropNotification($reason, $exception);
    }
408

409
    /**
     * Records why the subscription is being dropped and queues the drop
     * sentinel so the drop is executed in order with the buffered events.
     *
     * Only the first notification counts: later calls leave the recorded
     * reason and error untouched and enqueue nothing.
     */
    private function enqueueSubscriptionDropNotification(SubscriptionDropReason $reason, ?Throwable $error): void
    {
        // if drop data was already set -- no need to enqueue drop again, somebody did that already
        if (null !== $this->dropData) {
            return;
        }

        $this->dropData = new DropData($reason, $error);

        $this->liveQueue->enqueue($this->dropSubscriptionEvent);

        if ($this->allowProcessing) {
            $this->ensureProcessingPushQueue();
        }
    }
424

425
    /**
     * Schedules a drain of the live queue on the event loop unless one is
     * already marked as in progress.
     */
    private function ensureProcessingPushQueue(): void
    {
        if ($this->isProcessing) {
            return;
        }

        $this->isProcessing = true;

        $drain = function (): void {
            $this->processLiveQueue();
        };

        EventLoop::defer($drain);
    }
435

436
    /**
     * Drains the live queue, handing every event to tryProcess().
     *
     * Encountering the drop sentinel executes the recorded drop (or an
     * "Unknown" drop if none was recorded), clears the processing flag and
     * stops. Any Throwable raised by the handler, including \Error subclasses
     * such as TypeError, drops the subscription with EventHandlerException
     * instead of escaping the deferred callback.
     */
    private function processLiveQueue(): void
    {
        async(function (): void {
            do {
                while (! $this->liveQueue->isEmpty()) {
                    $e = $this->liveQueue->dequeue();

                    // The sentinel is compared by identity, not by value.
                    if ($e === $this->dropSubscriptionEvent) {
                        $this->dropData ??= new DropData(
                            SubscriptionDropReason::Unknown,
                            new \Exception('Drop reason not specified')
                        );
                        $this->dropSubscription($this->dropData->reason(), $this->dropData->error());

                        $this->isProcessing = false;

                        return;
                    }

                    try {
                        $this->tryProcess($e);
                    } catch (Throwable $ex) {
                        $this->log->debug(\sprintf(
                            'Catch-up Subscription %s to %s: Exception occurred in subscription %s',
                            $this->subscriptionName,
                            $this->isSubscribedToAll ? self::AllStream : $this->streamId,
                            $ex->getMessage()
                        ));

                        $this->dropSubscription(SubscriptionDropReason::EventHandlerException, $ex);

                        // NOTE(review): isProcessing stays set on this path, as in the
                        // original code, so no further drain is scheduled after a
                        // handler failure -- confirm this is intended.
                        return;
                    }
                }
            } while ($this->liveQueue->count() > 0);

            $this->isProcessing = false;
        })->await();
    }
475

476
    /**
     * Drops the subscription exactly once.
     *
     * Repeated calls are ignored. The first call unsubscribes the live
     * subscription (if one exists), notifies the optional subscriptionDropped
     * callback and completes the "stopped" future that stop() waits on.
     */
    protected function dropSubscription(SubscriptionDropReason $reason, ?Throwable $error): void
    {
        if ($this->isDropped) {
            return;
        }

        $this->isDropped = true;

        if ($this->verbose) {
            $target = $this->isSubscribedToAll ? self::AllStream : $this->streamId;
            $errorMessage = $error?->getMessage() ?? '';

            $this->log->debug(\sprintf(
                'Catch-up Subscription %s to %s: dropped subscription, reason: %s %s',
                $this->subscriptionName,
                $target,
                $reason->name,
                $errorMessage
            ));
        }

        $this->subscription?->unsubscribe();

        if (null !== $this->subscriptionDropped) {
            ($this->subscriptionDropped)($this, $reason, $error);
        }

        $this->stopped->complete(true);
    }
502
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc