• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

prooph / event-store-client / 9555551525

17 Jun 2024 10:16PM UTC coverage: 70.262% (-1.1%) from 71.395%
9555551525

push

github

prolic
update coveralls repo token

3466 of 4933 relevant lines covered (70.26%)

67.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.64
/src/Internal/SubscriptionsManager.php
1
<?php
2

3
/**
4
 * This file is part of `prooph/event-store-client`.
5
 * (c) 2018-2024 Alexander Miertsch <kontakt@codeliner.ws>
6
 * (c) 2018-2024 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace Prooph\EventStoreClient\Internal;
15

16
use Prooph\EventStore\Exception\ConnectionClosed;
17
use Prooph\EventStore\Exception\OperationTimedOut;
18
use Prooph\EventStore\Exception\RetriesLimitReached;
19
use Prooph\EventStore\SubscriptionDropReason;
20
use Prooph\EventStore\Util\DateTime;
21
use Prooph\EventStore\Util\Guid;
22
use Prooph\EventStoreClient\ConnectionSettings;
23
use Prooph\EventStoreClient\Transport\Tcp\TcpPackageConnection;
24
use SplQueue;
25

26
/** @internal  */
27
class SubscriptionsManager
28
{
29
    /** @var array<string, SubscriptionItem> */
30
    private array $activeSubscriptions = [];
31

32
    private SplQueue $waitingSubscriptions;
33

34
    /** @var list<SubscriptionItem> */
35
    private array $retryPendingSubscriptions = [];
36

37
    public function __construct(private string $connectionName, private ConnectionSettings $settings)
38
    {
39
        $this->waitingSubscriptions = new SplQueue();
420✔
40
    }
41

42
    public function getActiveSubscription(string $correlationId): ?SubscriptionItem
43
    {
44
        return $this->activeSubscriptions[$correlationId] ?? null;
87✔
45
    }
46

47
    public function cleanUp(): void
48
    {
49
        $connectionClosedException = ConnectionClosed::withName($this->connectionName);
408✔
50

51
        foreach ($this->activeSubscriptions as $subscriptionItem) {
408✔
52
            $subscriptionItem->operation()->dropSubscription(
72✔
53
                SubscriptionDropReason::ConnectionClosed,
72✔
54
                $connectionClosedException
72✔
55
            );
72✔
56
        }
57

58
        while (! $this->waitingSubscriptions->isEmpty()) {
408✔
59
            $subscriptionItem = $this->waitingSubscriptions->dequeue();
×
60
            \assert($subscriptionItem instanceof SubscriptionItem);
61
            $subscriptionItem->operation()->dropSubscription(
×
62
                SubscriptionDropReason::ConnectionClosed,
×
63
                $connectionClosedException
×
64
            );
×
65
        }
66

67
        foreach ($this->retryPendingSubscriptions as $subscriptionItem) {
408✔
68
            $subscriptionItem->operation()->dropSubscription(
×
69
                SubscriptionDropReason::ConnectionClosed,
×
70
                $connectionClosedException
×
71
            );
×
72
        }
73

74
        $this->activeSubscriptions = [];
408✔
75
        $this->retryPendingSubscriptions = [];
408✔
76
    }
77

78
    public function purgeSubscribedAndDroppedSubscriptions(string $connectionId): void
79
    {
80
        $subscriptionsToRemove = new SplQueue();
×
81

82
        foreach ($this->activeSubscriptions as $subscriptionItem) {
×
83
            if ($subscriptionItem->connectionId() !== $connectionId) {
×
84
                continue;
×
85
            }
86

87
            $subscriptionItem->operation()->connectionClosed();
×
88
            $subscriptionsToRemove->enqueue($subscriptionItem);
×
89
        }
90

91
        while (! $subscriptionsToRemove->isEmpty()) {
×
92
            $subscriptionItem = $subscriptionsToRemove->dequeue();
×
93
            \assert($subscriptionItem instanceof SubscriptionItem);
94
            unset($this->activeSubscriptions[$subscriptionItem->correlationId()]);
×
95
        }
96
    }
97

98
    public function checkTimeoutsAndRetry(TcpPackageConnection $connection): void
99
    {
100
        $retrySubscriptions = new SplQueue();
399✔
101
        $removeSubscriptions = new SplQueue();
399✔
102

103
        foreach ($this->activeSubscriptions as $subscription) {
399✔
104
            if ($subscription->isSubscribed()) {
8✔
105
                continue;
8✔
106
            }
107

108
            if ($subscription->connectionId() !== $connection->connectionId()) {
×
109
                $this->retryPendingSubscriptions[] = $subscription;
×
110
            } elseif ($subscription->timeout() > 0
×
111
                && (float) DateTime::utcNow()->format('U.u') - (float) $subscription->lastUpdated()->format('U.u') > $this->settings->operationTimeout()
×
112
            ) {
113
                $err = \sprintf(
×
114
                    'EventStoreNodeConnection \'%s\': subscription never got confirmation from server',
×
115
                    $connection->connectionId()
×
116
                );
×
117

118
                $this->settings->log()->error($err);
×
119

120
                if ($this->settings->failOnNoServerResponse()) {
×
121
                    $subscription->operation()->dropSubscription(
×
122
                        SubscriptionDropReason::SubscribingError,
×
123
                        new OperationTimedOut($err)
×
124
                    );
×
125
                    $removeSubscriptions->enqueue($subscription);
×
126
                } else {
127
                    $retrySubscriptions->enqueue($subscription);
×
128
                }
129
            }
130
        }
131

132
        while (! $retrySubscriptions->isEmpty()) {
399✔
133
            $this->scheduleSubscriptionRetry($retrySubscriptions->dequeue());
×
134
        }
135

136
        while (! $removeSubscriptions->isEmpty()) {
399✔
137
            $this->removeSubscription($removeSubscriptions->dequeue());
×
138
        }
139

140
        if (\count($this->retryPendingSubscriptions) > 0) {
399✔
141
            foreach ($this->retryPendingSubscriptions as $subscription) {
×
142
                $subscription->incRetryCount();
×
143
                $this->startSubscription($subscription, $connection);
×
144
            }
145

146
            $this->retryPendingSubscriptions = [];
×
147
        }
148

149
        while (! $this->waitingSubscriptions->isEmpty()) {
399✔
150
            $this->startSubscription($this->waitingSubscriptions->dequeue(), $connection);
23✔
151
        }
152
    }
153

154
    public function removeSubscription(SubscriptionItem $subscription): bool
155
    {
156
        $result = isset($this->activeSubscriptions[$subscription->correlationId()]);
19✔
157
        $this->logDebug('RemoveSubscription %s, result %s', (string) $subscription, $result ? 'yes' : 'no');
19✔
158
        unset($this->activeSubscriptions[$subscription->correlationId()]);
19✔
159

160
        return $result;
19✔
161
    }
162

163
    public function scheduleSubscriptionRetry(SubscriptionItem $subscription): void
164
    {
165
        if (! $this->removeSubscription($subscription)) {
×
166
            $this->logDebug('RemoveSubscription failed when trying to retry %s', (string) $subscription);
×
167

168
            return;
×
169
        }
170

171
        if ($subscription->maxRetries() >= 0 && $subscription->retryCount() >= $subscription->maxRetries()) {
×
172
            $this->logDebug('RETRIES LIMIT REACHED when trying to retry %s', (string) $subscription);
×
173
            $subscription->operation()->dropSubscription(
×
174
                SubscriptionDropReason::SubscribingError,
×
175
                RetriesLimitReached::with($subscription->retryCount())
×
176
            );
×
177

178
            return;
×
179
        }
180

181
        $this->logDebug('retrying subscription %s', (string) $subscription);
×
182
        $this->retryPendingSubscriptions[] = $subscription;
×
183
    }
184

185
    public function enqueueSubscription(SubscriptionItem $subscriptionItem): void
186
    {
187
        $this->waitingSubscriptions->enqueue($subscriptionItem);
23✔
188
    }
189

190
    public function startSubscription(SubscriptionItem $subscription, TcpPackageConnection $connection): void
191
    {
192
        if ($subscription->isSubscribed()) {
87✔
193
            $this->logDebug('StartSubscription REMOVING due to already subscribed %s', (string) $subscription);
×
194
            $this->removeSubscription($subscription);
×
195

196
            return;
×
197
        }
198

199
        $correlationId = Guid::generateAsHex();
87✔
200
        $subscription->setCorrelationId($correlationId);
87✔
201
        $subscription->setConnectionId($connection->connectionId());
87✔
202
        $subscription->setLastUpdated(DateTime::utcNow());
87✔
203

204
        $this->activeSubscriptions[$correlationId] = $subscription;
87✔
205

206
        if (! $subscription->operation()->subscribe($correlationId, $connection)) {
87✔
207
            $this->logDebug('StartSubscription REMOVING AS COULD NOT SUBSCRIBE %s', (string) $subscription);
×
208
            $this->removeSubscription($subscription);
×
209
        }
210
        $this->logDebug('StartSubscription SUBSCRIBING %s', (string) $subscription);
87✔
211
    }
212

213
    private function logDebug(string $message, string ...$parameters): void
214
    {
215
        if ($this->settings->verboseLogging()) {
87✔
216
            $message = empty($parameters)
×
217
                ? $message
×
218
                : \sprintf($message, ...$parameters);
×
219

220
            $this->settings->log()->debug(\sprintf(
×
221
                'EventStoreNodeConnection \'%s\': %s',
×
222
                $this->connectionName,
×
223
                $message
×
224
            ));
×
225
        }
226
    }
227
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc