• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

predis / predis / 23297885341

19 Mar 2026 01:44PM UTC coverage: 92.827%. First build
23297885341

Pull #1656

github

web-flow
Merge ebe61c872 into 1c4219cf8
Pull Request #1656: Handle -READONLY as a redirection signal for Redis Cluster (AWS ElastiCache support)

1 of 8 new or added lines in 1 file covered. (12.5%)

8347 of 8992 relevant lines covered (92.83%)

114.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.59
/src/Connection/Cluster/RedisCluster.php
1
<?php
2

3
/*
4
 * This file is part of the Predis package.
5
 *
6
 * (c) 2009-2020 Daniele Alessandri
7
 * (c) 2021-2026 Till Krüss
8
 *
9
 * For the full copyright and license information, please view the LICENSE
10
 * file that was distributed with this source code.
11
 */
12

13
namespace Predis\Connection\Cluster;
14

15
use ArrayIterator;
16
use Countable;
17
use IteratorAggregate;
18
use OutOfBoundsException;
19
use Predis\ClientException;
20
use Predis\Cluster\RedisStrategy as RedisClusterStrategy;
21
use Predis\Cluster\SlotMap;
22
use Predis\Cluster\StrategyInterface;
23
use Predis\Command\Command;
24
use Predis\Command\CommandInterface;
25
use Predis\Command\RawCommand;
26
use Predis\Connection\AbstractAggregateConnection;
27
use Predis\Connection\ConnectionException;
28
use Predis\Connection\FactoryInterface;
29
use Predis\Connection\NodeConnectionInterface;
30
use Predis\Connection\ParametersInterface;
31
use Predis\Connection\RelayFactory;
32
use Predis\NotSupportedException;
33
use Predis\Response\Error as ErrorResponse;
34
use Predis\Response\ErrorInterface as ErrorResponseInterface;
35
use Predis\Response\ServerException;
36
use Predis\Retry\Retry;
37
use Predis\Retry\Strategy\ExponentialBackoff;
38
use Predis\TimeoutException;
39
use ReturnTypeWillChange;
40
use Throwable;
41
use Traversable;
42

43
/**
44
 * Abstraction for a Redis-backed cluster of nodes (Redis >= 3.0.0).
45
 *
46
 * This connection backend offers smart support for redis-cluster by handling
47
 * automatic slots map (re)generation upon -MOVED or -ASK responses returned by
48
 * Redis when redirecting a client to a different node.
49
 *
50
 * The cluster can be pre-initialized using only a subset of the actual nodes in
51
 * the cluster, Predis will do the rest by adjusting the slots map and creating
52
 * the missing underlying connection instances on the fly.
53
 *
54
 * It is possible to pre-associate connections to a slots range with the "slots"
55
 * parameter in the form "$first-$last". This can greatly reduce runtime node
56
 * guessing and redirections.
57
 *
58
 * It is also possible to ask for the full and updated slots map directly to one
59
 * of the nodes and optionally enable such a behaviour upon -MOVED redirections.
60
 * Asking for the cluster configuration to Redis is actually done by issuing a
61
 * CLUSTER SLOTS command to a random node in the pool.
62
 */
63
class RedisCluster extends AbstractAggregateConnection implements ClusterInterface, IteratorAggregate, Countable
64
{
65
    public $useClusterSlots = true;
66

67
    /**
68
     * @var NodeConnectionInterface[]
69
     */
70
    private $pool = [];
71
    private $slots = [];
72
    private $slotmap;
73
    private $strategy;
74
    private $connections;
75
    private $retryLimit = 5;
76
    private $retryInterval = 10;
77

78
    /**
79
     * @var int
80
     */
81
    private $readTimeout = 1000;
82

83
    /**
84
     * @var ParametersInterface
85
     */
86
    private $connectionParameters;
87

88
    /**
     * Creates a cluster connection that builds node connections through the
     * given factory.
     *
     * @param FactoryInterface       $connections Factory used to create node connections.
     * @param ParametersInterface    $parameters  Connection parameters stored on this aggregate connection.
     * @param StrategyInterface|null $strategy    Optional cluster strategy (a RedisStrategy is created when omitted).
     * @param int|null               $readTimeout Optional value replacing the default pause used by read().
     */
    public function __construct(
        FactoryInterface $connections,
        ParametersInterface $parameters,
        ?StrategyInterface $strategy = null,
        ?int $readTimeout = null
    ) {
        if ($readTimeout !== null) {
            $this->readTimeout = $readTimeout;
        }

        $this->connections = $connections;
        $this->connectionParameters = $parameters;
        $this->strategy = $strategy === null ? new RedisClusterStrategy() : $strategy;
        $this->slotmap = new SlotMap();
    }
108

109
    /**
     * Configures how many times a command is retried upon server failure.
     *
     * -1 = retry without limit
     *  0 = never retry (fails immediately)
     *  n = give up only after n retry attempts
     *
     * @param int $retry Number of retry attempts (cast to int before being stored).
     */
    public function setRetryLimit($retry)
    {
        $limit = (int) $retry;

        $this->retryLimit = $limit;
    }
122

123
    /**
     * Configures the initial interval between retries (milliseconds).
     *
     * @param int $retryInterval Milliseconds between retries (cast to int before being stored).
     */
    public function setRetryInterval($retryInterval)
    {
        $interval = (int) $retryInterval;

        $this->retryInterval = $interval;
    }
132

133
    /**
     * Gets the currently configured retry interval (milliseconds).
     *
     * @return int Milliseconds between retries.
     */
    public function getRetryInterval()
    {
        $interval = $this->retryInterval;

        return (int) $interval;
    }
142

143
    /**
     * {@inheritdoc}
     *
     * Reports true as soon as one connection in the pool is connected.
     */
    public function isConnected()
    {
        $connected = false;

        foreach ($this->pool as $node) {
            if ($node->isConnected()) {
                $connected = true;
                break;
            }
        }

        return $connected;
    }
156

157
    /**
     * {@inheritdoc}
     *
     * Calls connect() on every connection currently in the pool.
     */
    public function connect()
    {
        foreach ($this->pool as $node) {
            $node->connect();
        }
    }
166

167
    /**
     * {@inheritdoc}
     *
     * Calls disconnect() on every connection currently in the pool.
     */
    public function disconnect()
    {
        foreach ($this->pool as $node) {
            $node->disconnect();
        }
    }
176

177
    /**
     * {@inheritdoc}
     *
     * The connection is stored in the pool under its string representation
     * and the slots map is reset.
     */
    public function add(NodeConnectionInterface $connection)
    {
        $id = (string) $connection;

        $this->pool[$id] = $connection;
        $this->slotmap->reset();
    }
185

186
    /**
     * {@inheritdoc}
     *
     * When the instance is found in the pool, the slots map is reset and the
     * slot associations matching the connection are dropped along with it.
     */
    public function remove(NodeConnectionInterface $connection)
    {
        $id = array_search($connection, $this->pool, true);

        if ($id === false) {
            return false;
        }

        $this->slotmap->reset();
        $this->slots = array_diff($this->slots, [$connection]);
        unset($this->pool[$id]);

        return true;
    }
201

202
    /**
     * Removes a connection from the pool given its identifier.
     *
     * When the identifier is known, the slots map is reset and the slot
     * associations matching the identifier are dropped along with it.
     *
     * @param string $connectionID Connection identifier.
     *
     * @return bool True if the connection was in the pool.
     */
    public function removeById($connectionID)
    {
        if (!isset($this->pool[$connectionID])) {
            return false;
        }

        $this->slotmap->reset();
        $this->slots = array_diff($this->slots, [$connectionID]);
        unset($this->pool[$connectionID]);

        return true;
    }
221

222
    /**
     * Rebuilds the slots map from the "slots" parameter of the connections
     * in the pool.
     *
     * The parameter is read as a comma-separated list of entries, each being
     * either a "$first-$last" range or a single slot. Connections that do not
     * define the parameter are skipped.
     */
    public function buildSlotMap()
    {
        $this->slotmap->reset();

        foreach ($this->pool as $id => $node) {
            $params = $node->getParameters();

            if (isset($params->slots)) {
                foreach (explode(',', $params->slots) as $range) {
                    $bounds = explode('-', $range, 2);
                    $first = $bounds[0];
                    // A lone slot index is treated as a one-slot range.
                    $last = $bounds[1] ?? $first;

                    $this->slotmap->setSlots($first, $last, $id);
                }
            }
        }
    }
253

254
    /**
     * Queries the specified node of the cluster to fetch the updated slots map.
     *
     * When the connection fails, the failed connection is disconnected and
     * removed from the pool, and the same command is executed on a different
     * connection picked at random from the pool of known nodes, up until the
     * retry limit is reached.
     *
     * The connection is received by reference: after a failover the caller's
     * variable points to the substitute node instead of the failed connection
     * that has already been evicted from the pool, so the caller can safely
     * use it to identify the node that produced the response.
     *
     * @param NodeConnectionInterface $connection Connection to a node of the cluster
     *                                            (replaced by the substitute node on failover).
     *
     * @return mixed
     * @throws ClientException When a failure leaves no connection in the pool.
     */
    private function queryClusterNodeForSlotMap(NodeConnectionInterface &$connection)
    {
        // Backward-compatible hardcoded retry
        $retry = new Retry(
            new ExponentialBackoff($this->retryInterval * 1000, -1),
            $this->retryLimit,
            [ConnectionException::class]
        );

        $command = RawCommand::create('CLUSTER', 'SLOTS');

        // Both callbacks share $connection by reference so that a substitute
        // picked by the failure callback is the one used by the next attempt.
        $doCallback = static function () use (&$connection, $command) {
            return $connection->executeCommand($command);
        };

        $failCallback = function (ConnectionException $exception) use (&$connection) {
            $connection = $exception->getConnection();
            $connection->disconnect();

            $this->remove($connection);

            if (!$connection = $this->getRandomConnection()) {
                throw new ClientException('No connections left in the pool for `CLUSTER SLOTS`');
            }
        };

        return $retry->callWithRetry($doCallback, $failCallback);
    }
293

294
    /**
     * Rebuilds the slots map from the reply to CLUSTER SLOTS, asked to the
     * specified node or, when none is given, to a random one from the pool.
     *
     * Nothing happens when no connection is given and the pool is empty.
     *
     * @param NodeConnectionInterface|null $connection Optional connection instance.
     */
    public function askSlotMap(?NodeConnectionInterface $connection = null)
    {
        if (!$connection) {
            $connection = $this->getRandomConnection();

            if (!$connection) {
                return;
            }
        }

        $this->slotmap->reset();

        foreach ($this->queryClusterNodeForSlotMap($connection) as $range) {
            // Only master servers are supported for now, so the elements
            // following the master in each entry (the slaves) are ignored.
            [$first, $last, $master] = $range;

            // An empty host is mapped to the node held in $connection.
            $nodeID = $master[0] === ''
                ? (string) $connection
                : "{$master[0]}:{$master[1]}";

            $this->slotmap->setSlots($first, $last, $nodeID);
        }
    }
323

324
    /**
     * Picks the node associated to a slot, preferring the slots map (built
     * on demand when empty) and otherwise spreading the 16384 slots evenly
     * over the connections in the pool, in pool order (best-effort).
     *
     * @param int $slot Slot index.
     *
     * @return string Connection ID.
     * @throws ClientException When the pool is empty.
     */
    protected function guessNode($slot)
    {
        if (!$this->pool) {
            throw new ClientException('No connections available in the pool');
        }

        if ($this->slotmap->isEmpty()) {
            $this->buildSlotMap();
        }

        $mapped = $this->slotmap[$slot];

        if ($mapped) {
            return $mapped;
        }

        $ids = array_keys($this->pool);
        $total = count($ids);
        $slotsPerNode = (int) (16384 / $total);
        // Slots left over by the integer division go to the last node.
        $position = min((int) ($slot / $slotsPerNode), $total - 1);

        return $ids[$position];
    }
353

354
    /**
     * Builds a connection through the factory from a "host:port" identifier.
     *
     * The identifier is split on its last colon.
     *
     * @param string $connectionID Identifier for the connection.
     *
     * @return NodeConnectionInterface
     */
    protected function createConnection($connectionID)
    {
        $colon = strrpos($connectionID, ':');
        $host = substr($connectionID, 0, $colon);
        $port = substr($connectionID, $colon + 1);

        return $this->connections->create([
            'host' => $host,
            'port' => $port,
        ]);
    }
370

371
    /**
     * {@inheritdoc}
     *
     * The slot is computed by the cluster strategy; a connection already
     * associated to that slot is reused before falling back to
     * getConnectionBySlot().
     *
     * @throws NotSupportedException When the strategy returns no slot for the command.
     */
    public function getConnectionByCommand(CommandInterface $command)
    {
        $slot = $this->strategy->getSlot($command);

        if ($slot === null) {
            throw new NotSupportedException(
                "Cannot use '{$command->getId()}' with redis-cluster."
            );
        }

        return $this->slots[$slot] ?? $this->getConnectionBySlot($slot);
    }
390

391
    /**
     * Returns the connection currently associated to a given slot.
     *
     * When no association exists yet, the node is guessed, its connection is
     * created and added to the pool if missing, and the association is
     * remembered for subsequent calls.
     *
     * @param int $slot Slot index.
     *
     * @return NodeConnectionInterface
     * @throws OutOfBoundsException When the slot is rejected by SlotMap::isValid().
     */
    public function getConnectionBySlot($slot)
    {
        if (!SlotMap::isValid($slot)) {
            throw new OutOfBoundsException("Invalid slot [$slot].");
        }

        if (!isset($this->slots[$slot])) {
            $id = $this->guessNode($slot);
            $node = $this->getConnectionById($id);

            if (!$node) {
                $node = $this->createConnection($id);
                $this->pool[$id] = $node;
            }

            $this->slots[$slot] = $node;
        }

        return $this->slots[$slot];
    }
418

419
    /**
     * {@inheritdoc}
     *
     * Returns null when the pool holds no connection for the identifier.
     */
    public function getConnectionById($connectionID)
    {
        if (isset($this->pool[$connectionID])) {
            return $this->pool[$connectionID];
        }

        return null;
    }
426

427
    /**
     * Picks one connection at random from the pool.
     *
     * @return NodeConnectionInterface|null Null when the pool is empty.
     */
    protected function getRandomConnection()
    {
        if ($this->pool) {
            $id = array_rand($this->pool);

            return $this->pool[$id];
        }

        return null;
    }
440

441
    /**
     * Associates the connection instance to a slot, both in the per-slot
     * connection cache and in the slots map.
     * The connection is also stored in the pool under its string representation.
     *
     * @param NodeConnectionInterface $connection Connection instance.
     * @param int                     $slot       Target slot index.
     */
    protected function move(NodeConnectionInterface $connection, $slot)
    {
        $index = (int) $slot;

        $this->pool[(string) $connection] = $connection;
        $this->slots[$index] = $connection;
        $this->slotmap[$index] = $connection;
    }
454

455
    /**
     * Dispatches -ERR responses returned by Redis according to the first word
     * of the error message.
     *
     * READONLY, MOVED and ASK are routed to their dedicated handlers; any
     * other error is returned untouched.
     *
     * @param CommandInterface       $command Command that generated the -ERR response.
     * @param ErrorResponseInterface $error   Redis error response object.
     *
     * @return mixed
     */
    protected function onErrorResponse(CommandInterface $command, ErrorResponseInterface $error)
    {
        $parts = explode(' ', $error->getMessage(), 2);
        $type = $parts[0];

        if ($type === 'READONLY') {
            return $this->onReadOnlyResponse($command);
        }

        if ($type === 'MOVED') {
            return $this->onMovedResponse($command, $parts[1]);
        }

        if ($type === 'ASK') {
            return $this->onAskResponse($command, $parts[1]);
        }

        return $error;
    }
481

482
    /**
     * Handles -READONLY responses by disconnecting the current node's connection
     * and refreshing the slots map (when cluster slots are enabled), then
     * re-executing the command so it is routed to the updated primary node.
     *
     * The slot associations cached for the node that replied -READONLY are
     * discarded as well: getConnectionByCommand() looks at that cache before
     * the slots map, so leaving them in place would send the retried command
     * to the very same node regardless of the refreshed map.
     *
     * This is a workaround for AWS ElastiCache Redis OSS, which may return
     * -READONLY errors during failover events. Standard Redis clusters do not
     * exhibit this behavior.
     *
     * NOTE(review): the retry goes through executeCommand(), which hands a
     * further -READONLY reply back to this handler; nothing visible here bounds
     * that recursion (notably when cluster slots are disabled and no state is
     * changed before retrying) -- confirm whether a limit is needed.
     *
     * @param CommandInterface $command Command that generated the -READONLY response.
     *
     * @return mixed
     */
    protected function onReadOnlyResponse(CommandInterface $command)
    {
        if ($this->useClusterSlots) {
            $connection = $this->getConnectionByCommand($command);
            $connection->disconnect();

            // Forget every slot still bound to the node that refused the
            // write, so routing falls back to the slots map fetched below.
            $this->slots = array_filter(
                $this->slots,
                static function ($cached) use ($connection) {
                    return $cached !== $connection;
                }
            );

            $this->askSlotMap();
        }

        return $this->executeCommand($command);
    }
505

506
    /**
     * Handles -MOVED responses: the slot is associated to the node indicated
     * by Redis (after refreshing the slots map from that node when cluster
     * slots are enabled) and the command is executed again.
     *
     * @param CommandInterface $command Command that generated the -MOVED response.
     * @param string           $details Parameters of the -MOVED response.
     *
     * @return mixed
     */
    protected function onMovedResponse(CommandInterface $command, $details)
    {
        [$slot, $target] = explode(' ', $details, 2);

        // The target may come as "IP:port (details about exception)": keep
        // only what precedes the first space.
        $target = explode(' ', $target, 2)[0];

        $connection = $this->getConnectionById($target);

        if (!$connection) {
            $connection = $this->createConnection($target);
        }

        if ($this->useClusterSlots) {
            $this->askSlotMap($connection);
        }

        $this->move($connection, $slot);

        return $this->executeCommand($command);
    }
539

540
    /**
     * Handles -ASK responses by sending ASKING to the node indicated by the
     * Redis response and then executing the command again on that same node.
     *
     * The slot association is left untouched; only this command is sent to
     * the indicated node.
     *
     * @param CommandInterface $command Command that generated the -ASK response.
     * @param string           $details Parameters of the -ASK response.
     *
     * @return mixed
     */
    protected function onAskResponse(CommandInterface $command, $details)
    {
        // The slot index carried by the response is not needed here.
        [, $connectionID] = explode(' ', $details, 2);

        // Handle connection ID in the form of "IP:port (details about exception)"
        // by trimming everything after first space (including the space), as
        // done for -MOVED responses.
        $startPositionOfExtraDetails = strpos($connectionID, ' ');

        if ($startPositionOfExtraDetails !== false) {
            $connectionID = substr($connectionID, 0, $startPositionOfExtraDetails);
        }

        if (!$connection = $this->getConnectionById($connectionID)) {
            $connection = $this->createConnection($connectionID);
        }

        $connection->executeCommand(RawCommand::create('ASKING'));

        return $connection->executeCommand($command);
    }
561

562
    /**
     * Runs the given connection method for a command under a retry policy.
     *
     * The policy is the one carried by the connection parameters, unless retry
     * is disabled there or the factory is a RelayFactory, in which case a
     * policy is built from the retry interval and limit of this instance.
     * ServerException is registered as catchable on the policy, and a reply
     * whose message contains CLUSTERDOWN is turned into that exception.
     * Failures are forwarded to onFailCallback().
     *
     * @param CommandInterface $command Command instance.
     * @param string           $method  Name of the connection method to invoke.
     *
     * @return mixed
     * @throws Throwable
     */
    private function retryCommandOnFailure(CommandInterface $command, $method)
    {
        $useBuiltInPolicy = $this->connectionParameters->isDisabledRetry()
            || $this->connections instanceof RelayFactory;

        if ($useBuiltInPolicy) {
            // Override default parameters, for backward-compatibility
            // with current behaviour
            $backoff = new ExponentialBackoff($this->retryInterval * 1000, -1);
            $retry = new Retry($backoff, $this->retryLimit);
        } else {
            $retry = $this->connectionParameters->retry;
        }

        $retry->updateCatchableExceptions([ServerException::class]);

        $attempt = function () use ($command, $method) {
            $node = $this->getConnectionByCommand($command);
            $response = $node->$method($command);

            if (!$response instanceof ErrorResponse) {
                return $response;
            }

            $message = $response->getMessage();

            if (strpos($message, 'CLUSTERDOWN') !== false) {
                throw new ServerException($message);
            }

            return $response;
        };

        $onFailure = function (Throwable $e) {
            $this->onFailCallback($e);
        };

        return $retry->callWithRetry($attempt, $onFailure);
    }
612

613
    /**
     * {@inheritdoc}
     *
     * Delegates to the writeRequest() method of the connection selected for
     * the command, under the retry policy. Nothing is returned.
     */
    public function writeRequest(CommandInterface $command)
    {
        $this->retryCommandOnFailure($command, 'writeRequest');
    }
620

621
    /**
     * {@inheritdoc}
     *
     * Delegates to the readResponse() method of the connection selected for
     * the command, under the retry policy.
     */
    public function readResponse(CommandInterface $command)
    {
        $response = $this->retryCommandOnFailure($command, 'readResponse');

        return $response;
    }
628

629
    /**
     * {@inheritdoc}
     *
     * Error responses are handed to onErrorResponse(); any other response is
     * returned as it is.
     */
    public function executeCommand(CommandInterface $command)
    {
        $response = $this->retryCommandOnFailure($command, 'executeCommand');

        return $response instanceof ErrorResponseInterface
            ? $this->onErrorResponse($command, $response)
            : $response;
    }
642

643
    /**
     * {@inheritdoc}
     *
     * The command is executed on every connection in the pool, in pool order,
     * and the responses are returned as a list.
     */
    public function executeCommandOnEachNode(CommandInterface $command): array
    {
        $responses = array_map(
            static function ($node) use ($command) {
                return $node->executeCommand($command);
            },
            $this->pool
        );

        // The pool is keyed by connection ID: reindex to a plain list.
        return array_values($responses);
    }
656

657
    /**
     * Counts the connections currently in the pool.
     *
     * @return int
     */
    #[ReturnTypeWillChange]
    public function count()
    {
        $total = count($this->pool);

        return $total;
    }
665

666
    /**
     * Returns an iterator over the connections of the nodes listed by the
     * slots map, creating and adding to the pool those that are missing.
     *
     * An empty slots map is populated first, through CLUSTER SLOTS when
     * cluster slots are enabled or from the connection parameters otherwise.
     *
     * @return Traversable<string, NodeConnectionInterface>
     */
    #[ReturnTypeWillChange]
    public function getIterator()
    {
        if ($this->slotmap->isEmpty()) {
            if ($this->useClusterSlots) {
                $this->askSlotMap();
            } else {
                $this->buildSlotMap();
            }
        }

        $nodes = [];

        foreach ($this->slotmap->getNodes() as $id) {
            $node = $this->getConnectionById($id);

            if (!$node) {
                $node = $this->createConnection($id);
                $this->add($node);
            }

            $nodes[] = $node;
        }

        return new ArrayIterator($nodes);
    }
688

689
    /**
     * Gives access to the slots map instance used by this connection.
     *
     * @return SlotMap
     */
    public function getSlotMap()
    {
        $map = $this->slotmap;

        return $map;
    }
698

699
    /**
     * {@inheritDoc}
     *
     * Returns the strategy instance stored by the constructor.
     */
    public function getClusterStrategy(): StrategyInterface
    {
        $strategy = $this->strategy;

        return $strategy;
    }
706

707
    /**
     * Gives access to the connection factory this instance uses to create
     * connections to the nodes indicated by redis-cluster.
     *
     * @return FactoryInterface
     */
    public function getConnectionFactory()
    {
        $factory = $this->connections;

        return $factory;
    }
717

718
    /**
     * Turns on or off the automatic fetching of the current slots map from
     * one of the nodes through the CLUSTER SLOTS command. The option is on by
     * default: asking Redis for the slots map upon -MOVED responses may reduce
     * overhead by removing the trial-and-error nature of node guessing, mostly
     * when targeting many keys that would cause a lot of redirections.
     *
     * Whatever the value of this option, the slots map can still be fetched
     * manually with askSlotMap().
     *
     * @param bool $value Enable or disable the use of CLUSTER SLOTS (cast to bool).
     */
    public function useClusterSlots($value)
    {
        $enabled = (bool) $value;

        $this->useClusterSlots = $enabled;
    }
735

736
    /**
     * {@inheritdoc}
     *
     * Returns the parameters instance received by the constructor.
     */
    public function getParameters(): ?ParametersInterface
    {
        $parameters = $this->connectionParameters;

        return $parameters;
    }
743

744
    /**
     * Polls the connections in the pool until one of them has data to read,
     * then returns what that connection reads.
     *
     * Between two full passes over the pool, execution is paused through
     * usleep() for the configured read timeout.
     *
     * @return mixed
     */
    public function read()
    {
        for (;;) {
            foreach ($this->pool as $node) {
                if ($node->hasDataToRead()) {
                    return $node->read();
                }
            }

            usleep($this->readTimeout);
        }
    }
761

762
    /**
     * Reacts to an exception raised while executing a command.
     *
     * On a ConnectionException the failed connection (when available) is
     * disconnected and removed from the pool, then the slots map is fetched
     * again if cluster slots are enabled. On a TimeoutException the failed
     * connection (when available) is only disconnected.
     *
     * @param  Throwable $exception
     * @return void
     */
    private function onFailCallback(Throwable $exception)
    {
        if ($exception instanceof ConnectionException) {
            $failed = $exception->getConnection();

            if ($failed) {
                $failed->disconnect();
                $this->remove($failed);
            }

            if ($this->useClusterSlots) {
                $this->askSlotMap();
            }
        }

        if ($exception instanceof TimeoutException) {
            $timedOut = $exception->getConnection();

            if ($timedOut) {
                $timedOut->disconnect();
            }
        }
    }
791
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc