• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

prooph / event-store-client / 9555551525

17 Jun 2024 10:16PM UTC coverage: 70.262% (-1.1%) from 71.395%
9555551525

push

github

prolic
update coveralls repo token

3466 of 4933 relevant lines covered (70.26%)

67.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.67
/src/Internal/EventStoreStreamCatchUpSubscription.php
1
<?php
2

3
/**
4
 * This file is part of `prooph/event-store-client`.
5
 * (c) 2018-2024 Alexander Miertsch <kontakt@codeliner.ws>
6
 * (c) 2018-2024 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace Prooph\EventStoreClient\Internal;
15

16
use function Amp\async;
17
use function Amp\delay;
18

19
use Closure;
20
use Exception;
21
use Prooph\EventStore\CatchUpSubscriptionSettings;
22
use Prooph\EventStore\EventStoreConnection;
23
use Prooph\EventStore\EventStoreStreamCatchUpSubscription as EventStoreStreamCatchUpSubscriptionInterface;
24
use Prooph\EventStore\Exception\StreamDeleted;
25
use Prooph\EventStore\ResolvedEvent;
26
use Prooph\EventStore\SliceReadStatus;
27
use Prooph\EventStore\StreamEventsSlice;
28
use Prooph\EventStore\SubscriptionDropReason;
29
use Prooph\EventStore\UserCredentials;
30
use Psr\Log\LoggerInterface as Logger;
31
use Throwable;
32

33
class EventStoreStreamCatchUpSubscription extends EventStoreCatchUpSubscription implements EventStoreStreamCatchUpSubscriptionInterface
34
{
35
    private int $nextReadEventNumber;
36

37
    private int $lastProcessedEventNumber;
38

39
    /**
40
     * @param Closure(EventStoreCatchUpSubscription, ResolvedEvent): void $eventAppeared
41
     * @param null|Closure(EventStoreCatchUpSubscription): void $liveProcessingStarted
42
     * @param null|Closure(EventStoreCatchUpSubscription, SubscriptionDropReason, null|Throwable): void $subscriptionDropped
43
     *
44
     * @internal
45
     */
46
    public function __construct(
47
        EventStoreConnection $connection,
48
        Logger $logger,
49
        string $streamId,
50
        ?int $fromEventNumberExclusive, // if null from the very beginning
51
        ?UserCredentials $userCredentials,
52
        Closure $eventAppeared,
53
        ?Closure $liveProcessingStarted,
54
        ?Closure $subscriptionDropped,
55
        CatchUpSubscriptionSettings $settings
56
    ) {
57
        parent::__construct(
18✔
58
            $connection,
18✔
59
            $logger,
18✔
60
            $streamId,
18✔
61
            $userCredentials,
18✔
62
            $eventAppeared,
18✔
63
            $liveProcessingStarted,
18✔
64
            $subscriptionDropped,
18✔
65
            $settings
18✔
66
        );
18✔
67

68
        $this->lastProcessedEventNumber = $fromEventNumberExclusive ?? -1;
18✔
69
        $this->nextReadEventNumber = $fromEventNumberExclusive ?? 0;
18✔
70
    }
71

72
    public function lastProcessedEventNumber(): int
73
    {
74
        return $this->lastProcessedEventNumber;
2✔
75
    }
76

77
    protected function readEventsTill(
78
        EventStoreConnection $connection,
79
        bool $resolveLinkTos,
80
        ?UserCredentials $userCredentials,
81
        ?int $lastCommitPosition,
82
        ?int $lastEventNumber
83
    ): void {
84
        do {
85
            $slice = $connection->readStreamEventsForward(
18✔
86
                $this->streamId(),
18✔
87
                $this->nextReadEventNumber,
18✔
88
                $this->readBatchSize,
18✔
89
                $resolveLinkTos,
18✔
90
                $userCredentials
18✔
91
            );
18✔
92

93
            $shouldStopOrDone = $this->readEventsCallback($slice, $lastEventNumber);
16✔
94
        } while (! $shouldStopOrDone);
16✔
95
    }
96

97
    private function readEventsCallback(StreamEventsSlice $slice, ?int $lastEventNumber): bool
98
    {
99
        $shouldStopOrDone = $this->shouldStop || $this->processEvents($lastEventNumber, $slice);
16✔
100

101
        if ($shouldStopOrDone && $this->verbose) {
16✔
102
            $this->log->debug(\sprintf(
×
103
                'Catch-up Subscription %s to %s: finished reading events, nextReadEventNumber = %d',
×
104
                $this->subscriptionName(),
×
105
                $this->isSubscribedToAll() ? self::AllStream : $this->streamId(),
×
106
                $this->nextReadEventNumber
×
107
            ));
×
108
        }
109

110
        return $shouldStopOrDone;
16✔
111
    }
112

113
    private function processEvents(?int $lastEventNumber, StreamEventsSlice $slice): bool
114
    {
115
        return async(function () use ($lastEventNumber, $slice): bool {
16✔
116
            switch ($slice->status()) {
16✔
117
                case SliceReadStatus::Success:
16✔
118
                    foreach ($slice->events() as $e) {
12✔
119
                        $this->tryProcess($e);
12✔
120
                    }
121
                    $this->nextReadEventNumber = $slice->nextEventNumber();
12✔
122
                    $done = (null === $lastEventNumber) ? $slice->isEndOfStream() : $slice->nextEventNumber() > $lastEventNumber;
12✔
123

124
                    break;
12✔
125
                case SliceReadStatus::StreamNotFound:
4✔
126
                    if (null !== $lastEventNumber && $lastEventNumber !== -1) {
4✔
127
                        throw new \Exception(\sprintf(
×
128
                            'Impossible: stream %s disappeared in the middle of catching up subscription %s',
×
129
                            $this->streamId(),
×
130
                            $this->subscriptionName()
×
131
                        ));
×
132
                    }
133

134
                    $done = true;
4✔
135

136
                    break;
4✔
137
                case SliceReadStatus::StreamDeleted:
×
138
                    throw StreamDeleted::with($this->streamId());
×
139
            }
140

141
            if (! $done && $slice->isEndOfStream()) {
16✔
142
                delay(1);
×
143
            }
144

145
            return $done;
16✔
146
        })->await();
16✔
147
    }
148

149
    protected function tryProcess(ResolvedEvent $e): void
150
    {
151
        $processed = false;
14✔
152

153
        if ($e->originalEventNumber() > $this->lastProcessedEventNumber) {
14✔
154
            try {
155
                ($this->eventAppeared)($this, $e);
14✔
156
            } catch (Exception $ex) {
1✔
157
                $this->dropSubscription(SubscriptionDropReason::EventHandlerException, $ex);
1✔
158
            }
159

160
            $this->lastProcessedEventNumber = $e->originalEventNumber();
14✔
161
            $processed = true;
14✔
162
        }
163

164
        if ($this->verbose) {
14✔
165
            /** @psalm-suppress PossiblyNullReference */
166
            $this->log->debug(\sprintf(
×
167
                'Catch-up Subscription %s to %s: %s event (%s, %d, %s @ %d)',
×
168
                $this->subscriptionName(),
×
169
                $this->isSubscribedToAll() ? self::AllStream : $this->streamId(),
×
170
                $processed ? 'processed' : 'skipping',
×
171
                $e->originalEvent()->eventStreamId(),
×
172
                $e->originalEvent()->eventNumber(),
×
173
                $e->originalEvent()->eventType(),
×
174
                $e->originalEventNumber()
×
175
            ));
×
176
        }
177
    }
178
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc