• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

predis / predis / 20455852140

23 Dec 2025 08:40AM UTC coverage: 92.615% (-0.2%) from 92.788%
20455852140

push

github

web-flow
Added retry support (#1616)

* Initial work on retries

* Added retry class and test coverage

* Added support for standalone and cluster

* Make TimeoutException instance of CommunicationException

* Added pipeline, trasnaction, replication support

* Fixed broken test

* Marked test as relay-incompatible

* Marked test as relay-incompatible

* Fixed analysis errors, added missing tests

* Codestyle fixes

* Fixed test

* Update README.md

* Update README.md

* Update README.md

* Updated README.md

* Refactor retry on read and write

* Added check for timeout value

* Updated README.md

* Fixed README.md

* Codestyle changes

* Added missing coverage

* Added missing test coverage

* Removed comments

* Added retry support for Relay connection (#1620)

* Added integration test case with mocked retry

* Changed client initialisation in tests

* Marked test as relay-incompatible

---------

Co-authored-by: Pavlo Yatsukhnenko <yatsukhnenko@users.noreply.github.com>

301 of 358 new or added lines in 18 files covered. (84.08%)

2 existing lines in 2 files now uncovered.

8214 of 8869 relevant lines covered (92.61%)

112.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.46
/src/Connection/Cluster/RedisCluster.php
1
<?php
2

3
/*
4
 * This file is part of the Predis package.
5
 *
6
 * (c) 2009-2020 Daniele Alessandri
7
 * (c) 2021-2025 Till Krüss
8
 *
9
 * For the full copyright and license information, please view the LICENSE
10
 * file that was distributed with this source code.
11
 */
12

13
namespace Predis\Connection\Cluster;
14

15
use ArrayIterator;
16
use Countable;
17
use IteratorAggregate;
18
use OutOfBoundsException;
19
use Predis\ClientException;
20
use Predis\Cluster\RedisStrategy as RedisClusterStrategy;
21
use Predis\Cluster\SlotMap;
22
use Predis\Cluster\StrategyInterface;
23
use Predis\Command\Command;
24
use Predis\Command\CommandInterface;
25
use Predis\Command\RawCommand;
26
use Predis\Connection\AbstractAggregateConnection;
27
use Predis\Connection\ConnectionException;
28
use Predis\Connection\FactoryInterface;
29
use Predis\Connection\NodeConnectionInterface;
30
use Predis\Connection\ParametersInterface;
31
use Predis\Connection\RelayFactory;
32
use Predis\NotSupportedException;
33
use Predis\Response\Error as ErrorResponse;
34
use Predis\Response\ErrorInterface as ErrorResponseInterface;
35
use Predis\Response\ServerException;
36
use Predis\Retry\Retry;
37
use Predis\Retry\Strategy\ExponentialBackoff;
38
use Predis\TimeoutException;
39
use ReturnTypeWillChange;
40
use Throwable;
41
use Traversable;
42

43
/**
44
 * Abstraction for a Redis-backed cluster of nodes (Redis >= 3.0.0).
45
 *
46
 * This connection backend offers smart support for redis-cluster by handling
47
 * automatic slots map (re)generation upon -MOVED or -ASK responses returned by
48
 * Redis when redirecting a client to a different node.
49
 *
50
 * The cluster can be pre-initialized using only a subset of the actual nodes in
51
 * the cluster, Predis will do the rest by adjusting the slots map and creating
52
 * the missing underlying connection instances on the fly.
53
 *
54
 * It is possible to pre-associate connections to a slots range with the "slots"
55
 * parameter in the form "$first-$last". This can greatly reduce runtime node
56
 * guessing and redirections.
57
 *
58
 * It is also possible to ask for the full and updated slots map directly to one
59
 * of the nodes and optionally enable such a behaviour upon -MOVED redirections.
60
 * Asking for the cluster configuration to Redis is actually done by issuing a
61
 * CLUSTER SLOTS command to a random node in the pool.
62
 */
63
class RedisCluster extends AbstractAggregateConnection implements ClusterInterface, IteratorAggregate, Countable
64
{
65
    public $useClusterSlots = true;
66

67
    /**
68
     * @var NodeConnectionInterface[]
69
     */
70
    private $pool = [];
71
    private $slots = [];
72
    private $slotmap;
73
    private $strategy;
74
    private $connections;
75
    private $retryLimit = 5;
76
    private $retryInterval = 10;
77

78
    /**
79
     * @var int
80
     */
81
    private $readTimeout = 1000;
82

83
    /**
84
     * @var ParametersInterface
85
     */
86
    private $connectionParameters;
87

88
    /**
89
     * @param FactoryInterface       $connections Optional connection factory.
90
     * @param StrategyInterface|null $strategy    Optional cluster strategy.
91
     * @param int|null               $readTimeout Optional read timeout
92
     */
93
    public function __construct(
81✔
94
        FactoryInterface $connections,
95
        ParametersInterface $parameters,
96
        ?StrategyInterface $strategy = null,
97
        ?int $readTimeout = null
98
    ) {
99
        $this->connections = $connections;
81✔
100
        $this->connectionParameters = $parameters;
81✔
101
        $this->strategy = $strategy ?: new RedisClusterStrategy();
81✔
102
        $this->slotmap = new SlotMap();
81✔
103

104
        if (!is_null($readTimeout)) {
81✔
105
            $this->readTimeout = $readTimeout;
2✔
106
        }
107
    }
108

109
    /**
110
     * Sets the maximum number of retries for commands upon server failure.
111
     *
112
     * -1 = unlimited retry attempts
113
     *  0 = no retry attempts (fails immediately)
114
     *  n = fail only after n retry attempts
115
     *
116
     * @param int $retry Number of retry attempts.
117
     */
118
    public function setRetryLimit($retry)
3✔
119
    {
120
        $this->retryLimit = (int) $retry;
3✔
121
    }
122

123
    /**
124
     * Sets the initial retry interval (milliseconds).
125
     *
126
     * @param int $retryInterval Milliseconds between retries.
127
     */
128
    public function setRetryInterval($retryInterval)
1✔
129
    {
130
        $this->retryInterval = (int) $retryInterval;
1✔
131
    }
132

133
    /**
134
     * Returns the retry interval (milliseconds).
135
     *
136
     * @return int Milliseconds between retries.
137
     */
138
    public function getRetryInterval()
1✔
139
    {
140
        return (int) $this->retryInterval;
1✔
141
    }
142

143
    /**
144
     * {@inheritdoc}
145
     */
146
    public function isConnected()
3✔
147
    {
148
        foreach ($this->pool as $connection) {
3✔
149
            if ($connection->isConnected()) {
3✔
150
                return true;
2✔
151
            }
152
        }
153

154
        return false;
1✔
155
    }
156

157
    /**
158
     * {@inheritdoc}
159
     */
160
    public function connect()
31✔
161
    {
162
        foreach ($this->pool as $connection) {
31✔
163
            $connection->connect();
31✔
164
        }
165
    }
166

167
    /**
168
     * {@inheritdoc}
169
     */
170
    public function disconnect()
3✔
171
    {
172
        foreach ($this->pool as $connection) {
3✔
173
            $connection->disconnect();
3✔
174
        }
175
    }
176

177
    /**
178
     * {@inheritdoc}
179
     */
180
    public function add(NodeConnectionInterface $connection)
71✔
181
    {
182
        $this->pool[(string) $connection] = $connection;
71✔
183
        $this->slotmap->reset();
71✔
184
    }
185

186
    /**
187
     * {@inheritdoc}
188
     */
189
    public function remove(NodeConnectionInterface $connection)
8✔
190
    {
191
        if (false !== $id = array_search($connection, $this->pool, true)) {
8✔
192
            $this->slotmap->reset();
8✔
193
            $this->slots = array_diff($this->slots, [$connection]);
8✔
194
            unset($this->pool[$id]);
8✔
195

196
            return true;
8✔
197
        }
198

199
        return false;
1✔
200
    }
201

202
    /**
203
     * Removes a connection instance by using its identifier.
204
     *
205
     * @param string $connectionID Connection identifier.
206
     *
207
     * @return bool True if the connection was in the pool.
208
     */
209
    public function removeById($connectionID)
1✔
210
    {
211
        if (isset($this->pool[$connectionID])) {
1✔
212
            $this->slotmap->reset();
1✔
213
            $this->slots = array_diff($this->slots, [$connectionID]);
1✔
214
            unset($this->pool[$connectionID]);
1✔
215

216
            return true;
1✔
217
        }
218

219
        return false;
1✔
220
    }
221

222
    /**
223
     * Generates the current slots map by guessing the cluster configuration out
224
     * of the connection parameters of the connections in the pool.
225
     *
226
     * Generation is based on the same algorithm used by Redis to generate the
227
     * cluster, so it is most effective when all of the connections supplied on
228
     * initialization have the "slots" parameter properly set accordingly to the
229
     * current cluster configuration.
230
     */
231
    public function buildSlotMap()
54✔
232
    {
233
        $this->slotmap->reset();
54✔
234

235
        foreach ($this->pool as $connectionID => $connection) {
54✔
236
            $parameters = $connection->getParameters();
54✔
237

238
            if (!isset($parameters->slots)) {
54✔
239
                continue;
49✔
240
            }
241

242
            foreach (explode(',', $parameters->slots) as $slotRange) {
6✔
243
                $slots = explode('-', $slotRange, 2);
6✔
244

245
                if (!isset($slots[1])) {
6✔
246
                    $slots[1] = $slots[0];
1✔
247
                }
248

249
                $this->slotmap->setSlots($slots[0], $slots[1], $connectionID);
6✔
250
            }
251
        }
252
    }
253

254
    /**
255
     * Queries the specified node of the cluster to fetch the updated slots map.
256
     *
257
     * When the connection fails, this method tries to execute the same command
258
     * on a different connection picked at random from the pool of known nodes,
259
     * up until the retry limit is reached.
260
     *
261
     * @param NodeConnectionInterface $connection Connection to a node of the cluster.
262
     *
263
     * @return mixed
264
     */
265
    private function queryClusterNodeForSlotMap(NodeConnectionInterface $connection)
15✔
266
    {
267
        // Backward-compatible hardcoded retry
268
        $retry = new Retry(
15✔
269
            new ExponentialBackoff($this->retryInterval * 1000, -1),
15✔
270
            $this->retryLimit,
15✔
271
            [ConnectionException::class]
15✔
272
        );
15✔
273

274
        $command = RawCommand::create('CLUSTER', 'SLOTS');
15✔
275

276
        $doCallback = function () use (&$connection, $command) {
15✔
277
            return $connection->executeCommand($command);
15✔
278
        };
15✔
279

280
        $failCallback = function (ConnectionException $exception) use (&$connection) {
15✔
281
            $connection = $exception->getConnection();
3✔
282
            $connection->disconnect();
3✔
283

284
            $this->remove($connection);
3✔
285

286
            if (!$connection = $this->getRandomConnection()) {
3✔
NEW
287
                throw new ClientException('No connections left in the pool for `CLUSTER SLOTS`');
×
288
            }
289
        };
15✔
290

291
        return $retry->callWithRetry($doCallback, $failCallback);
15✔
292
    }
293

294
    /**
295
     * Generates an updated slots map fetching the cluster configuration using
296
     * the CLUSTER SLOTS command against the specified node or a random one from
297
     * the pool.
298
     *
299
     * @param NodeConnectionInterface|null $connection Optional connection instance.
300
     */
301
    public function askSlotMap(?NodeConnectionInterface $connection = null)
16✔
302
    {
303
        if (!$connection && !$connection = $this->getRandomConnection()) {
16✔
304
            return;
1✔
305
        }
306

307
        $this->slotmap->reset();
15✔
308

309
        $response = $this->queryClusterNodeForSlotMap($connection);
15✔
310

311
        foreach ($response as $slots) {
14✔
312
            // We only support master servers for now, so we ignore subsequent
313
            // elements in the $slots array identifying slaves.
314
            [$start, $end, $master] = $slots;
14✔
315

316
            if ($master[0] === '') {
14✔
317
                $this->slotmap->setSlots($start, $end, (string) $connection);
1✔
318
            } else {
319
                $this->slotmap->setSlots($start, $end, "{$master[0]}:{$master[1]}");
14✔
320
            }
321
        }
322
    }
323

324
    /**
325
     * Guesses the correct node associated to a given slot using a precalculated
326
     * slots map, falling back to the same logic used by Redis to initialize a
327
     * cluster (best-effort).
328
     *
329
     * @param int $slot Slot index.
330
     *
331
     * @return string Connection ID.
332
     */
333
    protected function guessNode($slot)
52✔
334
    {
335
        if (!$this->pool) {
52✔
336
            throw new ClientException('No connections available in the pool');
1✔
337
        }
338

339
        if ($this->slotmap->isEmpty()) {
51✔
340
            $this->buildSlotMap();
50✔
341
        }
342

343
        if ($node = $this->slotmap[$slot]) {
51✔
344
            return $node;
5✔
345
        }
346

347
        $count = count($this->pool);
49✔
348
        $index = min((int) ($slot / (int) (16384 / $count)), $count - 1);
49✔
349
        $nodes = array_keys($this->pool);
49✔
350

351
        return $nodes[$index];
49✔
352
    }
353

354
    /**
355
     * Creates a new connection instance from the given connection ID.
356
     *
357
     * @param string $connectionID Identifier for the connection.
358
     *
359
     * @return NodeConnectionInterface
360
     */
361
    protected function createConnection($connectionID)
8✔
362
    {
363
        $separator = strrpos($connectionID, ':');
8✔
364

365
        return $this->connections->create([
8✔
366
            'host' => substr($connectionID, 0, $separator),
8✔
367
            'port' => substr($connectionID, $separator + 1),
8✔
368
        ]);
8✔
369
    }
370

371
    /**
372
     * {@inheritdoc}
373
     */
374
    public function getConnectionByCommand(CommandInterface $command)
50✔
375
    {
376
        $slot = $this->strategy->getSlot($command);
50✔
377

378
        if (!isset($slot)) {
50✔
379
            throw new NotSupportedException(
1✔
380
                "Cannot use '{$command->getId()}' with redis-cluster."
1✔
381
            );
1✔
382
        }
383

384
        if (isset($this->slots[$slot])) {
49✔
385
            return $this->slots[$slot];
44✔
386
        } else {
387
            return $this->getConnectionBySlot($slot);
49✔
388
        }
389
    }
390

391
    /**
392
     * Returns the connection currently associated to a given slot.
393
     *
394
     * @param int $slot Slot index.
395
     *
396
     * @return NodeConnectionInterface
397
     * @throws OutOfBoundsException
398
     */
399
    public function getConnectionBySlot($slot)
52✔
400
    {
401
        if (!SlotMap::isValid($slot)) {
52✔
402
            throw new OutOfBoundsException("Invalid slot [$slot].");
×
403
        }
404

405
        if (isset($this->slots[$slot])) {
52✔
406
            return $this->slots[$slot];
1✔
407
        }
408

409
        $connectionID = $this->guessNode($slot);
52✔
410

411
        if (!$connection = $this->getConnectionById($connectionID)) {
51✔
412
            $connection = $this->createConnection($connectionID);
1✔
413
            $this->pool[$connectionID] = $connection;
1✔
414
        }
415

416
        return $this->slots[$slot] = $connection;
51✔
417
    }
418

419
    /**
420
     * {@inheritdoc}
421
     */
422
    public function getConnectionById($connectionID)
54✔
423
    {
424
        return $this->pool[$connectionID] ?? null;
54✔
425
    }
426

427
    /**
428
     * Returns a random connection from the pool.
429
     *
430
     * @return NodeConnectionInterface|null
431
     */
432
    protected function getRandomConnection()
3✔
433
    {
434
        if (!$this->pool) {
3✔
435
            return null;
1✔
436
        }
437

438
        return $this->pool[array_rand($this->pool)];
2✔
439
    }
440

441
    /**
442
     * Permanently associates the connection instance to a new slot.
443
     * The connection is added to the connections pool if not yet included.
444
     *
445
     * @param NodeConnectionInterface $connection Connection instance.
446
     * @param int                     $slot       Target slot index.
447
     */
448
    protected function move(NodeConnectionInterface $connection, $slot)
13✔
449
    {
450
        $this->pool[(string) $connection] = $connection;
13✔
451
        $this->slots[(int) $slot] = $connection;
13✔
452
        $this->slotmap[(int) $slot] = $connection;
13✔
453
    }
454

455
    /**
456
     * Handles -ERR responses returned by Redis.
457
     *
458
     * @param CommandInterface       $command Command that generated the -ERR response.
459
     * @param ErrorResponseInterface $error   Redis error response object.
460
     *
461
     * @return mixed
462
     */
463
    protected function onErrorResponse(CommandInterface $command, ErrorResponseInterface $error)
18✔
464
    {
465
        $details = explode(' ', $error->getMessage(), 2);
18✔
466

467
        switch ($details[0]) {
18✔
468
            case 'MOVED':
18✔
469
                return $this->onMovedResponse($command, $details[1]);
13✔
470

471
            case 'ASK':
7✔
472
                return $this->onAskResponse($command, $details[1]);
2✔
473

474
            default:
475
                return $error;
5✔
476
        }
477
    }
478

479
    /**
480
     * Handles -MOVED responses by executing again the command against the node
481
     * indicated by the Redis response.
482
     *
483
     * @param CommandInterface $command Command that generated the -MOVED response.
484
     * @param string           $details Parameters of the -MOVED response.
485
     *
486
     * @return mixed
487
     */
488
    protected function onMovedResponse(CommandInterface $command, $details)
13✔
489
    {
490
        [$slot, $connectionID] = explode(' ', $details, 2);
13✔
491

492
        // Handle connection ID in the form of "IP:port (details about exception)"
493
        // by trimming everything after first space (including the space)
494
        $startPositionOfExtraDetails = strpos($connectionID, ' ');
13✔
495

496
        if ($startPositionOfExtraDetails !== false) {
13✔
497
            $connectionID = substr($connectionID, 0, $startPositionOfExtraDetails);
7✔
498
        }
499

500
        if (!$connection = $this->getConnectionById($connectionID)) {
13✔
501
            $connection = $this->createConnection($connectionID);
5✔
502
        }
503

504
        if ($this->useClusterSlots) {
13✔
505
            $this->askSlotMap($connection);
9✔
506
        }
507

508
        $this->move($connection, $slot);
13✔
509

510
        return $this->executeCommand($command);
13✔
511
    }
512

513
    /**
514
     * Handles -ASK responses by executing again the command against the node
515
     * indicated by the Redis response.
516
     *
517
     * @param CommandInterface $command Command that generated the -ASK response.
518
     * @param string           $details Parameters of the -ASK response.
519
     *
520
     * @return mixed
521
     */
522
    protected function onAskResponse(CommandInterface $command, $details)
2✔
523
    {
524
        [$slot, $connectionID] = explode(' ', $details, 2);
2✔
525

526
        if (!$connection = $this->getConnectionById($connectionID)) {
2✔
527
            $connection = $this->createConnection($connectionID);
1✔
528
        }
529

530
        $connection->executeCommand(RawCommand::create('ASKING'));
2✔
531

532
        return $connection->executeCommand($command);
2✔
533
    }
534

535
    /**
536
     * Ensures that a command is executed one more time on connection failure.
537
     *
538
     * The connection to the node that generated the error is evicted from the
539
     * pool before trying to fetch an updated slots map from another node. If
540
     * the new slots map points to an unreachable server the client gives up and
541
     * throws the exception as the nodes participating in the cluster may still
542
     * have to agree that something changed in the configuration of the cluster.
543
     *
544
     * @param CommandInterface $command Command instance.
545
     * @param string           $method  Actual method.
546
     *
547
     * @return mixed
548
     * @throws Throwable
549
     */
550
    private function retryCommandOnFailure(CommandInterface $command, $method)
45✔
551
    {
552
        if ($this->connectionParameters->isDisabledRetry() || $this->connections instanceof RelayFactory) {
45✔
553
            // Override default parameters, for backward-compatibility
554
            // with current behaviour
555
            $retry = new Retry(
44✔
556
                new ExponentialBackoff($this->retryInterval * 1000, -1),
44✔
557
                $this->retryLimit
44✔
558
            );
44✔
559
        } else {
560
            $retry = $this->connectionParameters->retry;
3✔
561
        }
562
        $retry->updateCatchableExceptions([ServerException::class]);
45✔
563

564
        $doCallback = function () use ($command, $method) {
45✔
565
            $response = $this->getConnectionByCommand($command)->$method($command);
45✔
566

567
            if ($response instanceof ErrorResponse) {
44✔
568
                $message = $response->getMessage();
16✔
569

570
                if (strpos($message, 'CLUSTERDOWN') !== false) {
16✔
571
                    throw new ServerException($message);
2✔
572
                }
573
            }
574

575
            return $response;
43✔
576
        };
45✔
577

578
        return $retry->callWithRetry(
45✔
579
            $doCallback,
45✔
580
            function (Throwable $e) {
45✔
581
                $this->onFailCallback($e);
6✔
582
            }
45✔
583
        );
45✔
584
    }
585

586
    /**
587
     * {@inheritdoc}
588
     */
589
    public function writeRequest(CommandInterface $command)
1✔
590
    {
591
        $this->retryCommandOnFailure($command, __FUNCTION__);
1✔
592
    }
593

594
    /**
595
     * {@inheritdoc}
596
     */
597
    public function readResponse(CommandInterface $command)
1✔
598
    {
599
        return $this->retryCommandOnFailure($command, __FUNCTION__);
1✔
600
    }
601

602
    /**
603
     * {@inheritdoc}
604
     */
605
    public function executeCommand(CommandInterface $command)
43✔
606
    {
607
        $response = $this->retryCommandOnFailure($command, __FUNCTION__);
43✔
608

609
        if ($response instanceof ErrorResponseInterface) {
41✔
610
            return $this->onErrorResponse($command, $response);
18✔
611
        }
612

613
        return $response;
41✔
614
    }
615

616
    /**
617
     * {@inheritdoc}
618
     */
619
    public function executeCommandOnEachNode(CommandInterface $command): array
1✔
620
    {
621
        $responses = [];
1✔
622

623
        foreach ($this->pool as $connection) {
1✔
624
            $responses[] = $connection->executeCommand($command);
1✔
625
        }
626

627
        return $responses;
1✔
628
    }
629

630
    /**
631
     * @return int
632
     */
633
    #[ReturnTypeWillChange]
10✔
634
    public function count()
635
    {
636
        return count($this->pool);
10✔
637
    }
638

639
    /**
640
     * @return Traversable<string, NodeConnectionInterface>
641
     */
642
    #[ReturnTypeWillChange]
2✔
643
    public function getIterator()
644
    {
645
        if ($this->slotmap->isEmpty()) {
2✔
646
            $this->useClusterSlots ? $this->askSlotMap() : $this->buildSlotMap();
2✔
647
        }
648

649
        $connections = [];
2✔
650

651
        foreach ($this->slotmap->getNodes() as $node) {
2✔
652
            if (!$connection = $this->getConnectionById($node)) {
2✔
653
                $this->add($connection = $this->createConnection($node));
1✔
654
            }
655

656
            $connections[] = $connection;
2✔
657
        }
658

659
        return new ArrayIterator($connections);
2✔
660
    }
661

662
    /**
663
     * Returns the underlying slot map.
664
     *
665
     * @return SlotMap
666
     */
667
    public function getSlotMap()
8✔
668
    {
669
        return $this->slotmap;
8✔
670
    }
671

672
    /**
673
     * {@inheritDoc}
674
     */
675
    public function getClusterStrategy(): StrategyInterface
8✔
676
    {
677
        return $this->strategy;
8✔
678
    }
679

680
    /**
681
     * Returns the underlying connection factory used to create new connection
682
     * instances to Redis nodes indicated by redis-cluster.
683
     *
684
     * @return FactoryInterface
685
     */
686
    public function getConnectionFactory()
1✔
687
    {
688
        return $this->connections;
1✔
689
    }
690

691
    /**
692
     * Enables automatic fetching of the current slots map from one of the nodes
693
     * using the CLUSTER SLOTS command. This option is enabled by default as
694
     * asking the current slots map to Redis upon -MOVED responses may reduce
695
     * overhead by eliminating the trial-and-error nature of the node guessing
696
     * procedure, mostly when targeting many keys that would end up in a lot of
697
     * redirections.
698
     *
699
     * The slots map can still be manually fetched using the askSlotMap()
700
     * method whether or not this option is enabled.
701
     *
702
     * @param bool $value Enable or disable the use of CLUSTER SLOTS.
703
     */
704
    public function useClusterSlots($value)
13✔
705
    {
706
        $this->useClusterSlots = (bool) $value;
13✔
707
    }
708

709
    /**
710
     * {@inheritdoc}
711
     */
712
    public function getParameters(): ?ParametersInterface
34✔
713
    {
714
        return $this->connectionParameters;
34✔
715
    }
716

717
    /**
718
     * Loop over connections until there's data to read.
719
     *
720
     * @return mixed
721
     */
722
    public function read()
×
723
    {
724
        while (true) {
×
725
            foreach ($this->pool as $connection) {
×
726
                if ($connection->hasDataToRead()) {
×
727
                    return $connection->read();
×
728
                }
729
            }
730

731
            usleep($this->readTimeout);
×
732
        }
733
    }
734

735
    /**
736
     * Handle exceptions.
737
     *
738
     * @param  Throwable $exception
739
     * @return void
740
     */
741
    private function onFailCallback(Throwable $exception)
6✔
742
    {
743
        if ($exception instanceof ConnectionException) {
6✔
744
            $connection = $exception->getConnection();
2✔
745

746
            if ($connection) {
2✔
747
                $connection->disconnect();
2✔
748
                $this->remove($connection);
2✔
749
            }
750

751
            if ($this->useClusterSlots) {
2✔
752
                $this->askSlotMap();
1✔
753
            }
754
        }
755

756
        if ($exception instanceof TimeoutException) {
6✔
757
            $connection = $exception->getConnection();
2✔
758

759
            if ($connection) {
2✔
760
                $connection->disconnect();
2✔
761
            }
762
        }
763
    }
764
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc