• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

prooph / event-store-client / 9555551525

17 Jun 2024 10:16PM UTC coverage: 70.262% (-1.1%) from 71.395%
9555551525

push

github

prolic
update coveralls repo token

3466 of 4933 relevant lines covered (70.26%)

67.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.0
/src/Internal/OperationsManager.php
1
<?php
2

3
/**
4
 * This file is part of `prooph/event-store-client`.
5
 * (c) 2018-2024 Alexander Miertsch <kontakt@codeliner.ws>
6
 * (c) 2018-2024 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace Prooph\EventStoreClient\Internal;
15

16
use Closure;
17
use Prooph\EventStore\Exception\ConnectionClosed;
18
use Prooph\EventStore\Exception\OperationTimedOut;
19
use Prooph\EventStore\Exception\RetriesLimitReached;
20
use Prooph\EventStore\Util\DateTime;
21
use Prooph\EventStore\Util\Guid;
22
use Prooph\EventStoreClient\ConnectionSettings;
23
use Prooph\EventStoreClient\Transport\Tcp\TcpPackageConnection;
24
use SplQueue;
25

26
/**
 * Keeps track of the operations issued over a connection.
 *
 * Operations live in one of three collections: a queue of operations waiting
 * to be sent, a map of active operations keyed by correlation id, and a list
 * of operations that are pending a retry.
 *
 * @internal
 */
class OperationsManager
{
    /** Comparer handed to usort() to order pending retries by sequence number. */
    private Closure $operationItemSeqNoComparer;

    /** Connection name used as prefix in log and error messages. */
    private string $connectionName;

    private ConnectionSettings $settings;

    /** @var array<string, OperationItem> active operations keyed by correlation id */
    private array $activeOperations = [];

    /** @var SplQueue<OperationItem> operations that have not been sent yet */
    private SplQueue $waitingOperations;

    /** @var list<OperationItem> operations collected for the next retry round */
    private array $retryPendingOperations = [];

    /** Number of active plus waiting operations as of the last scheduling run. */
    private int $totalOperationCount = 0;

    public function __construct(string $connectionName, ConnectionSettings $settings)
    {
        $this->connectionName = $connectionName;
        $this->settings = $settings;

        // Ascending order by sequence number; `segNo` is the accessor's spelling on OperationItem.
        $this->operationItemSeqNoComparer = static fn (OperationItem $a, OperationItem $b): int => $a->segNo() <=> $b->segNo();

        $this->waitingOperations = new SplQueue();
    }

    /**
     * Returns the operation count computed by the last call to
     * tryScheduleWaitingOperations() (or 0 after cleanUp()).
     */
    public function totalOperationCount(): int
    {
        return $this->totalOperationCount;
    }

    /**
     * Looks up an active operation by its correlation id.
     *
     * @return OperationItem|null null when no active operation has that id
     */
    public function getActiveOperation(string $correlationId): ?OperationItem
    {
        return $this->activeOperations[$correlationId] ?? null;
    }

    /**
     * Fails every active, waiting and retry-pending operation with a
     * ConnectionClosed exception and resets the manager to its empty state.
     */
    public function cleanUp(): void
    {
        $closedConnectionException = ConnectionClosed::withName($this->connectionName);

        foreach ($this->activeOperations as $operationItem) {
            $this->failIgnoringCompleted($operationItem, $closedConnectionException);
        }

        // Dequeuing while failing also empties the waiting queue.
        while (! $this->waitingOperations->isEmpty()) {
            $this->failIgnoringCompleted($this->waitingOperations->dequeue(), $closedConnectionException);
        }

        foreach ($this->retryPendingOperations as $operationItem) {
            $this->failIgnoringCompleted($operationItem, $closedConnectionException);
        }

        $this->activeOperations = [];
        $this->retryPendingOperations = [];
        $this->totalOperationCount = 0;
    }

    /**
     * Checks all active operations against the given connection and the
     * configured operation timeout, then re-schedules whatever has to be retried.
     *
     * - An operation bound to a different connection id is scheduled for retry.
     * - An operation with a positive timeout whose last update is older than
     *   the configured operation timeout is logged as an error and then either
     *   failed with OperationTimedOut and removed (when failOnNoServerResponse
     *   is set) or scheduled for retry.
     *
     * Pending retries are sorted by sequence number, given a fresh correlation
     * id, have their retry count incremented and are scheduled again. Finally
     * waiting operations are scheduled.
     */
    public function checkTimeoutsAndRetry(TcpPackageConnection $connection): void
    {
        $retryOperations = [];
        $removeOperations = [];

        foreach ($this->activeOperations as $operation) {
            if ($operation->connectionId() !== $connection->connectionId()) {
                $retryOperations[] = $operation;
            } elseif ($operation->timeout() > 0
                // Elapsed time is computed from 'U.u' (seconds with microseconds), so the
                // configured operation timeout is presumably in seconds — TODO confirm.
                && (float) DateTime::utcNow()->format('U.u') - (float) $operation->lastUpdated()->format('U.u') > $this->settings->operationTimeout()
            ) {
                $err = \sprintf(
                    'EventStoreNodeConnection \'%s\': operation never got response from server. Operation: %s',
                    $this->connectionName,
                    (string) $operation
                );

                $this->settings->log()->error($err);

                if ($this->settings->failOnNoServerResponse()) {
                    $operation->operation()->fail(new OperationTimedOut($err));
                    $removeOperations[] = $operation;
                } else {
                    $retryOperations[] = $operation;
                }
            }
        }

        // Mutations happen after the loop so the active map is not modified while iterated.
        foreach ($retryOperations as $operation) {
            $this->scheduleOperationRetry($operation);
        }

        foreach ($removeOperations as $operation) {
            $this->removeOperation($operation);
        }

        if (\count($this->retryPendingOperations) > 0) {
            \usort($this->retryPendingOperations, $this->operationItemSeqNoComparer);

            foreach ($this->retryPendingOperations as $operation) {
                $oldCorrId = $operation->correlationId();
                $operation->setCorrelationId(Guid::generateAsHex());
                $operation->incRetryCount();
                $this->logDebug('retrying, old corrId %s, operation %s', $oldCorrId, (string) $operation);
                $this->scheduleOperation($operation, $connection);
            }

            $this->retryPendingOperations = [];
        }

        $this->tryScheduleWaitingOperations($connection);
    }

    /**
     * Removes the operation from the active set and queues it for retry.
     *
     * Does nothing when the operation is not active. When the operation has a
     * non-negative retry limit that is already reached, it is failed with
     * RetriesLimitReached instead of being queued.
     */
    public function scheduleOperationRetry(OperationItem $operation): void
    {
        if (! $this->removeOperation($operation)) {
            return;
        }

        $this->logDebug('ScheduleOperationRetry for %s', (string) $operation);

        // A negative maxRetries skips the limit check entirely.
        if ($operation->maxRetries() >= 0 && $operation->retryCount() >= $operation->maxRetries()) {
            $operation->operation()->fail(
                RetriesLimitReached::with($operation->retryCount())
            );

            return;
        }

        $this->retryPendingOperations[] = $operation;
    }

    /**
     * Removes the operation from the active set, looked up by its current correlation id.
     *
     * @return bool true when the operation was active and has been removed, false otherwise
     */
    public function removeOperation(OperationItem $operation): bool
    {
        $correlationId = $operation->correlationId();

        if (! isset($this->activeOperations[$correlationId])) {
            $this->logDebug('RemoveOperation FAILED for %s', (string) $operation);

            return false;
        }

        unset($this->activeOperations[$correlationId]);
        $this->logDebug('RemoveOperation SUCCEEDED for %s', (string) $operation);

        return true;
    }

    /**
     * Executes waiting operations until the queue is empty or the number of
     * active operations reaches the configured maxConcurrentItems, then
     * refreshes the total operation count (active + waiting).
     */
    public function tryScheduleWaitingOperations(TcpPackageConnection $connection): void
    {
        while (! $this->waitingOperations->isEmpty()
            && \count($this->activeOperations) < $this->settings->maxConcurrentItems()
        ) {
            $this->executeOperation($this->waitingOperations->dequeue(), $connection);
        }

        $this->totalOperationCount = \count($this->activeOperations) + \count($this->waitingOperations);
    }

    /**
     * Binds the operation to the connection, registers it as active under its
     * correlation id and enqueues its network package for sending.
     */
    public function executeOperation(OperationItem $operation, TcpPackageConnection $connection): void
    {
        $operation->setConnectionId($connection->connectionId());
        $operation->setLastUpdated(DateTime::utcNow());

        $correlationId = $operation->correlationId();
        $this->activeOperations[$correlationId] = $operation;

        $package = $operation->operation()->createNetworkPackage($correlationId);

        $this->logDebug(
            'ExecuteOperation package %s, %s, %s',
            $package->command()->name,
            $package->correlationId(),
            (string) $operation
        );
        $connection->enqueueSend($package);
    }

    /**
     * Appends the operation to the waiting queue without trying to send it.
     */
    public function enqueueOperation(OperationItem $operation): void
    {
        $this->logDebug('EnqueueOperation WAITING for %s', (string) $operation);
        $this->waitingOperations->enqueue($operation);
    }

    /**
     * Appends the operation to the waiting queue and immediately tries to
     * schedule waiting operations on the given connection.
     */
    public function scheduleOperation(OperationItem $operation, TcpPackageConnection $connection): void
    {
        $this->waitingOperations->enqueue($operation);
        $this->tryScheduleWaitingOperations($connection);
    }

    /**
     * Fails a single operation with the given reason, ignoring the \Error
     * raised when the operation can no longer be failed.
     */
    private function failIgnoringCompleted(OperationItem $operationItem, \Throwable $reason): void
    {
        try {
            $operationItem->operation()->fail($reason);
        } catch (\Error $e) {
            // ignore, future was already completed
        }
    }

    /**
     * Writes a debug log entry prefixed with the connection name, but only
     * when verbose logging is enabled in the settings.
     *
     * @param string $message    sprintf() format when parameters are given, literal text otherwise
     * @param string ...$parameters values substituted into $message
     */
    private function logDebug(string $message, string ...$parameters): void
    {
        if (! $this->settings->verboseLogging()) {
            return;
        }

        if ([] !== $parameters) {
            $message = \sprintf($message, ...$parameters);
        }

        $this->settings->log()->debug(\sprintf(
            'EventStoreNodeConnection \'%s\': %s',
            $this->connectionName,
            $message
        ));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc