• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

prooph / event-store-client / 9555551525

17 Jun 2024 10:16PM UTC coverage: 70.262% (-1.1%) from 71.395%
9555551525

push

github

prolic
update coveralls repo token

3466 of 4933 relevant lines covered (70.26%)

67.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.67
/src/Internal/EventStoreAllCatchUpSubscription.php
1
<?php
2

3
/**
4
 * This file is part of `prooph/event-store-client`.
5
 * (c) 2018-2024 Alexander Miertsch <kontakt@codeliner.ws>
6
 * (c) 2018-2024 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace Prooph\EventStoreClient\Internal;
15

16
use function Amp\async;
17
use function Amp\delay;
18

19
use Closure;
20
use Exception;
21
use Prooph\EventStore\AllEventsSlice;
22
use Prooph\EventStore\CatchUpSubscriptionSettings;
23
use Prooph\EventStore\EventStoreAllCatchUpSubscription as EventStoreAllCatchUpSubscriptionInterface;
24
use Prooph\EventStore\EventStoreConnection;
25
use Prooph\EventStore\Position;
26
use Prooph\EventStore\ResolvedEvent;
27
use Prooph\EventStore\SubscriptionDropReason;
28
use Prooph\EventStore\UserCredentials;
29
use Psr\Log\LoggerInterface as Logger;
30
use Throwable;
31

32
/**
 * Catch-up subscription that reads through `readAllEventsForward()` and hands
 * every not-yet-seen event to the `eventAppeared` handler.
 *
 * Reading starts at the position given to the constructor and proceeds in
 * batches of `$this->readBatchSize`.
 */
class EventStoreAllCatchUpSubscription extends EventStoreCatchUpSubscription implements EventStoreAllCatchUpSubscriptionInterface
{
    /** Position passed to the next `readAllEventsForward()` call. */
    private Position $nextReadPosition;

    /**
     * Position of the last event whose handler completed successfully.
     *
     * tryProcess() skips every event whose original position is not greater
     * than this value.
     */
    private Position $lastProcessedPosition;

    /**
     * @internal
     *
     * When `$fromPositionExclusive` is null, reading starts at `Position::start()`
     * and the last processed position is initialised to `Position::end()`.
     *
     * NOTE(review): the empty string passed as third parent argument is presumably
     * the stream id (none applies here) — confirm against the parent constructor.
     *
     * @param Closure(EventStoreCatchUpSubscription, ResolvedEvent): void $eventAppeared
     * @param null|Closure(EventStoreCatchUpSubscription): void $liveProcessingStarted
     * @param null|Closure(EventStoreCatchUpSubscription, SubscriptionDropReason, null|Throwable): void $subscriptionDropped
     */
    public function __construct(
        EventStoreConnection $connection,
        Logger $logger,
        ?Position $fromPositionExclusive, // if null from the very beginning
        ?UserCredentials $userCredentials,
        Closure $eventAppeared,
        ?Closure $liveProcessingStarted,
        ?Closure $subscriptionDropped,
        CatchUpSubscriptionSettings $settings
    ) {
        parent::__construct(
            $connection,
            $logger,
            '',
            $userCredentials,
            $eventAppeared,
            $liveProcessingStarted,
            $subscriptionDropped,
            $settings
        );

        $this->lastProcessedPosition = $fromPositionExclusive ?? Position::end();
        $this->nextReadPosition = $fromPositionExclusive ?? Position::start();
    }

    /**
     * Returns the position of the last event whose handler completed successfully
     * (or the initial position if no event has been handled yet).
     */
    public function lastProcessedPosition(): Position
    {
        return $this->lastProcessedPosition;
    }

    /**
     * Reads batches forward from `$this->nextReadPosition` until
     * readEventsCallback() reports that reading should stop or is done.
     *
     * `$lastEventNumber` is part of the inherited signature and is not used here.
     */
    protected function readEventsTill(
        EventStoreConnection $connection,
        bool $resolveLinkTos,
        ?UserCredentials $userCredentials,
        ?int $lastCommitPosition,
        ?int $lastEventNumber
    ): void {
        async(function () use ($connection, $resolveLinkTos, $userCredentials, $lastCommitPosition): void {
            do {
                $slice = $connection->readAllEventsForward(
                    $this->nextReadPosition,
                    $this->readBatchSize,
                    $resolveLinkTos,
                    $userCredentials
                );
            } while (! $this->readEventsCallback($slice, $lastCommitPosition));
        })->await();
    }

    /**
     * Processes one slice and tells the read loop whether to terminate.
     *
     * The slice is not processed at all when `$this->shouldStop` is already set.
     *
     * @return bool true when the subscription should stop or catch-up reading is done
     */
    private function readEventsCallback(AllEventsSlice $slice, ?int $lastCommitPosition): bool
    {
        $shouldStopOrDone = $this->shouldStop || $this->processEvents($lastCommitPosition, $slice);

        if ($shouldStopOrDone && $this->verbose) {
            $this->log->debug(\sprintf(
                'Catch-up Subscription %s to %s: finished reading events, nextReadPosition = %s',
                $this->subscriptionName(),
                $this->isSubscribedToAll() ? '<all>' : $this->streamId(),
                (string) $this->nextReadPosition
            ));
        }

        return $shouldStopOrDone;
    }

    /**
     * Feeds every event of the slice to tryProcess(), advances the read position
     * and decides whether catch-up reading is complete.
     *
     * Without a `$lastCommitPosition` reading is done at end of stream; otherwise
     * it is done once the slice's next position has reached that commit position.
     *
     * @return bool true when catch-up reading is complete
     *
     * @throws Exception when an event carries no original position
     */
    private function processEvents(?int $lastCommitPosition, AllEventsSlice $slice): bool
    {
        return async(function () use ($lastCommitPosition, $slice): bool {
            foreach ($slice->events() as $resolvedEvent) {
                if (null === $resolvedEvent->originalPosition()) {
                    throw new Exception(\sprintf(
                        'Subscription %s event came up with no OriginalPosition',
                        $this->subscriptionName()
                    ));
                }

                $this->tryProcess($resolvedEvent);
            }

            $this->nextReadPosition = $slice->nextPosition();

            if (null === $lastCommitPosition) {
                $done = $slice->isEndOfStream();
            } else {
                $target = new Position($lastCommitPosition, $lastCommitPosition);
                $done = $slice->nextPosition()->greaterOrEquals($target);
            }

            if (! $done && $slice->isEndOfStream()) {
                // we are waiting for server to flush its data
                delay(0.01);
            }

            return $done;
        })->await();
    }

    /**
     * Delivers a single event to the `eventAppeared` handler unless its original
     * position is not greater than the last processed position.
     *
     * If the handler throws an Exception the subscription is dropped with
     * SubscriptionDropReason::EventHandlerException. In that case the event is
     * NOT recorded as processed: the last processed position is only advanced
     * after the handler returned normally, so it never points at an event whose
     * handler failed.
     *
     * Callers are expected to pass events with a non-null original position
     * (processEvents() checks this before calling).
     */
    protected function tryProcess(ResolvedEvent $e): void
    {
        $status = 'skipping';
        $position = $e->originalPosition();

        /** @psalm-suppress PossiblyNullReference */
        if ($position->greater($this->lastProcessedPosition)) {
            $handled = true;

            try {
                ($this->eventAppeared)($this, $e);
            } catch (Exception $ex) {
                $handled = false;
                $this->dropSubscription(SubscriptionDropReason::EventHandlerException, $ex);
            }

            if ($handled) {
                // Only a successfully handled event may move the checkpoint forward.
                $this->lastProcessedPosition = $position;
                $status = 'processed';
            } else {
                $status = 'failed';
            }
        }

        if ($this->verbose) {
            /** @psalm-suppress PossiblyNullReference */
            $this->log->debug(\sprintf(
                'Catch-up Subscription %s to %s: %s event (%s, %d, %s @ %s)',
                $this->subscriptionName(),
                $this->isSubscribedToAll() ? '<all>' : $this->streamId(),
                $status,
                $e->originalEvent()->eventStreamId(),
                $e->originalEvent()->eventNumber(),
                $e->originalEvent()->eventType(),
                null !== $position ? $position->__toString() : '<null>'
            ));
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc