• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

prooph / event-store-client / 26311784535

22 May 2026 09:01PM UTC coverage: 70.061% (-0.05%) from 70.107%
26311784535

push

github

prolic
Fix Event loop terminated without resuming the current suspension

No need to unreference timerTick, it should be executed as long as EventLoop is running

3454 of 4930 relevant lines covered (70.06%)

67.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.66
/src/Internal/SubscriptionsManager.php
1
<?php
2

3
/**
4
 * This file is part of `prooph/event-store-client`.
5
 * (c) 2018-2024 Alexander Miertsch <kontakt@codeliner.ws>
6
 * (c) 2018-2024 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace Prooph\EventStoreClient\Internal;
15

16
use Prooph\EventStore\Exception\ConnectionClosed;
17
use Prooph\EventStore\Exception\OperationTimedOut;
18
use Prooph\EventStore\Exception\RetriesLimitReached;
19
use Prooph\EventStore\SubscriptionDropReason;
20
use Prooph\EventStore\Util\DateTime;
21
use Prooph\EventStore\Util\Guid;
22
use Prooph\EventStoreClient\ConnectionSettings;
23
use Prooph\EventStoreClient\Transport\Tcp\TcpPackageConnection;
24
use SplQueue;
25

26
/** @internal  */
27
class SubscriptionsManager
28
{
29
    /** @var array<string, SubscriptionItem> */
30
    private array $activeSubscriptions = [];
31

32
    private SplQueue $waitingSubscriptions;
33

34
    /** @var list<SubscriptionItem> */
35
    private array $retryPendingSubscriptions = [];
36

37
    /**
     * Creates a manager with no active, waiting or retry-pending subscriptions.
     *
     * @param string             $connectionName name used in log lines and in the ConnectionClosed exception
     * @param ConnectionSettings $settings       provides the logger, verbosity flag and timeout options read by this class
     */
    public function __construct(
        private string $connectionName,
        private ConnectionSettings $settings,
    ) {
        // The queue cannot be initialised in the property declaration, so it is created here.
        $this->waitingSubscriptions = new SplQueue();
    }
41

42
    /**
     * Looks up an active subscription by its correlation id.
     *
     * @param string $correlationId key under which the subscription was registered
     *
     * @return SubscriptionItem|null the registered item, or null when nothing is stored under that id
     */
    public function getActiveSubscription(string $correlationId): ?SubscriptionItem
    {
        if (! isset($this->activeSubscriptions[$correlationId])) {
            return null;
        }

        return $this->activeSubscriptions[$correlationId];
    }
46

47
    /**
     * Drops every subscription this manager tracks with reason ConnectionClosed.
     *
     * Active, waiting and retry-pending subscriptions are notified in that order,
     * all with the same ConnectionClosed exception built from the connection name.
     * When the method returns, the active map and the retry-pending list are reset
     * and the waiting queue has been drained.
     */
    public function cleanUp(): void
    {
        $closed = ConnectionClosed::withName($this->connectionName);

        $drop = static function (SubscriptionItem $item) use ($closed): void {
            $item->operation()->dropSubscription(
                SubscriptionDropReason::ConnectionClosed,
                $closed
            );
        };

        foreach ($this->activeSubscriptions as $active) {
            $drop($active);
        }

        // The waiting queue empties itself as it is processed, so it needs no explicit reset below.
        while ($this->waitingSubscriptions->count() > 0) {
            $waiting = $this->waitingSubscriptions->dequeue();
            \assert($waiting instanceof SubscriptionItem);
            $drop($waiting);
        }

        foreach ($this->retryPendingSubscriptions as $pending) {
            $drop($pending);
        }

        $this->retryPendingSubscriptions = [];
        $this->activeSubscriptions = [];
    }
77

78
    /**
     * Removes every active subscription that is bound to the given connection.
     *
     * Each matching subscription's operation is first told that the connection
     * closed; only after all of them have been notified are the entries removed
     * from the active map. Only the connection id is compared here - whether a
     * subscription has already been confirmed is not checked.
     *
     * @param string $connectionId connection id matched against each subscription's connectionId()
     */
    public function purgeSubscribedAndDroppedSubscriptions(string $connectionId): void
    {
        /** @var list<SubscriptionItem> $affected */
        $affected = [];

        foreach ($this->activeSubscriptions as $candidate) {
            if ($candidate->connectionId() === $connectionId) {
                $candidate->operation()->connectionClosed();
                $affected[] = $candidate;
            }
        }

        // Second pass: remove by the correlation id each item reports after all notifications ran.
        foreach ($affected as $item) {
            unset($this->activeSubscriptions[$item->correlationId()]);
        }
    }
97

98
    /**
     * Housekeeping pass over all subscriptions, run against the given connection.
     *
     * Steps, in order:
     *  1. Inspect every active subscription that is not yet subscribed:
     *     - bound to a different connection id: scheduled for retry;
     *     - has a positive timeout and was last updated longer ago than the
     *       configured operation timeout: an error is logged, then the
     *       subscription is either dropped with OperationTimedOut (when
     *       failOnNoServerResponse is enabled) or scheduled for retry.
     *  2. Run the scheduled retries, then the scheduled removals.
     *  3. Restart everything in the retry-pending list, incrementing each retry count.
     *  4. Start every subscription still sitting in the waiting queue.
     *
     * @param TcpPackageConnection $connection connection on which subscriptions are (re)started
     */
    public function checkTimeoutsAndRetry(TcpPackageConnection $connection): void
    {
        $retrySubscriptions = new SplQueue();
        $removeSubscriptions = new SplQueue();

        foreach ($this->activeSubscriptions as $subscription) {
            if ($subscription->isSubscribed()) {
                continue;
            }

            if ($subscription->connectionId() !== $connection->connectionId()) {
                // Route through scheduleSubscriptionRetry() like any other retry. Appending
                // straight to the retry-pending list would leave the entry under its old
                // correlation id in the active map (startSubscription() assigns a new id and
                // adds a second key for the same item) and would skip the retry-limit check.
                $retrySubscriptions->enqueue($subscription);

                continue;
            }

            $timedOut = $subscription->timeout() > 0
                && (float) DateTime::utcNow()->format('U.u') - (float) $subscription->lastUpdated()->format('U.u') > $this->settings->operationTimeout();

            if (! $timedOut) {
                continue;
            }

            $err = \sprintf(
                'EventStoreNodeConnection \'%s\': subscription never got confirmation from server',
                $connection->connectionId()
            );

            $this->settings->log()->error($err);

            if ($this->settings->failOnNoServerResponse()) {
                $subscription->operation()->dropSubscription(
                    SubscriptionDropReason::SubscribingError,
                    new OperationTimedOut($err)
                );
                $removeSubscriptions->enqueue($subscription);
            } else {
                $retrySubscriptions->enqueue($subscription);
            }
        }

        // Mutations of the active map are deferred until the scan above has finished.
        while (! $retrySubscriptions->isEmpty()) {
            $this->scheduleSubscriptionRetry($retrySubscriptions->dequeue());
        }

        while (! $removeSubscriptions->isEmpty()) {
            $this->removeSubscription($removeSubscriptions->dequeue());
        }

        if (\count($this->retryPendingSubscriptions) > 0) {
            foreach ($this->retryPendingSubscriptions as $subscription) {
                $subscription->incRetryCount();
                $this->startSubscription($subscription, $connection);
            }

            $this->retryPendingSubscriptions = [];
        }

        while (! $this->waitingSubscriptions->isEmpty()) {
            $this->startSubscription($this->waitingSubscriptions->dequeue(), $connection);
        }
    }
153

154
    /**
     * Removes a subscription from the active map, keyed by its current correlation id.
     *
     * @param SubscriptionItem $subscription item to remove
     *
     * @return bool true when an entry existed under the item's correlation id, false otherwise
     */
    public function removeSubscription(SubscriptionItem $subscription): bool
    {
        $correlationId = $subscription->correlationId();
        $wasActive = isset($this->activeSubscriptions[$correlationId]);

        $this->logDebug(
            'RemoveSubscription %s, result %s',
            (string) $subscription,
            $wasActive ? 'yes' : 'no'
        );

        // unset() on a missing key is a no-op, so no guard is required.
        unset($this->activeSubscriptions[$correlationId]);

        return $wasActive;
    }
162

163
    /**
     * Moves an active subscription to the retry-pending list, honouring its retry limit.
     *
     * The item is first removed from the active map; if it was not there, nothing
     * else happens. A non-negative maxRetries() acts as a limit: once retryCount()
     * has reached it, the subscription is dropped with RetriesLimitReached instead
     * of being queued again.
     *
     * @param SubscriptionItem $subscription item to retry
     */
    public function scheduleSubscriptionRetry(SubscriptionItem $subscription): void
    {
        $wasActive = $this->removeSubscription($subscription);

        if (! $wasActive) {
            $this->logDebug('RemoveSubscription failed when trying to retry %s', (string) $subscription);

            return;
        }

        $limit = $subscription->maxRetries();
        $limitReached = $limit >= 0 && $subscription->retryCount() >= $limit;

        if (! $limitReached) {
            $this->logDebug('retrying subscription %s', (string) $subscription);
            $this->retryPendingSubscriptions[] = $subscription;

            return;
        }

        $this->logDebug('RETRIES LIMIT REACHED when trying to retry %s', (string) $subscription);
        $subscription->operation()->dropSubscription(
            SubscriptionDropReason::SubscribingError,
            RetriesLimitReached::with($subscription->retryCount())
        );
    }
184

185
    /**
     * Appends a subscription to the waiting queue.
     *
     * Queued items are started, in insertion order, by checkTimeoutsAndRetry().
     *
     * @param SubscriptionItem $subscriptionItem item to queue
     */
    public function enqueueSubscription(SubscriptionItem $subscriptionItem): void
    {
        // SplQueue::enqueue() is an alias of push(); both append at the tail.
        $this->waitingSubscriptions->push($subscriptionItem);
    }
189

190
    /**
     * Starts (or restarts) a subscription on the given connection.
     *
     * An item that is already subscribed is removed from the active map and
     * nothing else happens. Otherwise the item receives a fresh correlation id,
     * the connection's id and a new last-updated timestamp, is registered as
     * active, and its operation is asked to subscribe. If the operation reports
     * failure, the item is removed from the active map again.
     *
     * @param SubscriptionItem     $subscription item to start
     * @param TcpPackageConnection $connection   connection the subscribe request is sent on
     */
    public function startSubscription(SubscriptionItem $subscription, TcpPackageConnection $connection): void
    {
        if ($subscription->isSubscribed()) {
            $this->logDebug('StartSubscription REMOVING due to already subscribed %s', (string) $subscription);
            $this->removeSubscription($subscription);

            return;
        }

        $correlationId = Guid::generateAsHex();
        $subscription->setCorrelationId($correlationId);
        $subscription->setConnectionId($connection->connectionId());
        $subscription->setLastUpdated(DateTime::utcNow());

        // Register before subscribing so the item is already in the active map when subscribe() runs.
        $this->activeSubscriptions[$correlationId] = $subscription;

        if (! $subscription->operation()->subscribe($correlationId, $connection)) {
            $this->logDebug('StartSubscription REMOVING AS COULD NOT SUBSCRIBE %s', (string) $subscription);
            $this->removeSubscription($subscription);

            // Do not fall through: logging "SUBSCRIBING" for a failed attempt is misleading.
            return;
        }

        $this->logDebug('StartSubscription SUBSCRIBING %s', (string) $subscription);
    }
212

213
    /**
     * Writes a debug line prefixed with the connection name, but only when verbose logging is enabled.
     *
     * @param string $message       plain text, or a sprintf() format string when parameters are given
     * @param string ...$parameters values substituted into $message via sprintf()
     */
    private function logDebug(string $message, string ...$parameters): void
    {
        if (! $this->settings->verboseLogging()) {
            return;
        }

        // Without parameters the message is used verbatim, so a literal '%' in it is not interpreted.
        if ([] !== $parameters) {
            $message = \sprintf($message, ...$parameters);
        }

        $line = \sprintf(
            'EventStoreNodeConnection \'%s\': %s',
            $this->connectionName,
            $message
        );

        $this->settings->log()->debug($line);
    }
227
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc