• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

predis / predis / 20455852140

23 Dec 2025 08:40AM UTC coverage: 92.615% (-0.2%) from 92.788%
20455852140

push

github

web-flow
Added retry support (#1616)

* Initial work on retries

* Added retry class and test coverage

* Added support for standalone and cluster

* Make TimeoutException instance of CommunicationException

* Added pipeline, transaction, replication support

* Fixed broken test

* Marked test as relay-incompatible

* Marked test as relay-incompatible

* Fixed analysis errors, added missing tests

* Codestyle fixes

* Fixed test

* Update README.md

* Update README.md

* Update README.md

* Updated README.md

* Refactor retry on read and write

* Added check for timeout value

* Updated README.md

* Fixed README.md

* Codestyle changes

* Added missing coverage

* Added missing test coverage

* Removed comments

* Added retry support for Relay connection (#1620)

* Added integration test case with mocked retry

* Changed client initialisation in tests

* Marked test as relay-incompatible

---------

Co-authored-by: Pavlo Yatsukhnenko <yatsukhnenko@users.noreply.github.com>

301 of 358 new or added lines in 18 files covered. (84.08%)

2 existing lines in 2 files now uncovered.

8214 of 8869 relevant lines covered (92.61%)

112.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.4
/src/Connection/Replication/MasterSlaveReplication.php
1
<?php
2

3
/*
4
 * This file is part of the Predis package.
5
 *
6
 * (c) 2009-2020 Daniele Alessandri
7
 * (c) 2021-2025 Till Krüss
8
 *
9
 * For the full copyright and license information, please view the LICENSE
10
 * file that was distributed with this source code.
11
 */
12

13
namespace Predis\Connection\Replication;
14

15
use InvalidArgumentException;
16
use Predis\ClientException;
17
use Predis\Command\Command;
18
use Predis\Command\CommandInterface;
19
use Predis\Command\RawCommand;
20
use Predis\Connection\AbstractAggregateConnection;
21
use Predis\Connection\ConnectionException;
22
use Predis\Connection\FactoryInterface;
23
use Predis\Connection\NodeConnectionInterface;
24
use Predis\Connection\ParametersInterface;
25
use Predis\Connection\RelayFactory;
26
use Predis\Replication\MissingMasterException;
27
use Predis\Replication\ReplicationStrategy;
28
use Predis\Response\ErrorInterface as ResponseErrorInterface;
29
use Predis\TimeoutException;
30
use Throwable;
31

32
/**
33
 * Aggregate connection handling replication of Redis nodes configured in a
34
 * single master / multiple slaves setup.
35
 */
36
class MasterSlaveReplication extends AbstractAggregateConnection implements ReplicationInterface
{
    /**
     * Strategy used to tell read operations apart from write operations.
     *
     * @var ReplicationStrategy
     */
    protected $strategy;

    /**
     * Connection registered with the `master` role, if any.
     *
     * @var NodeConnectionInterface|null
     */
    protected $master;

    /**
     * Connections registered with any role other than `master`.
     *
     * @var NodeConnectionInterface[]
     */
    protected $slaves = [];

    /**
     * Every registered connection, indexed by its string representation.
     *
     * @var NodeConnectionInterface[]
     */
    protected $pool = [];

    /**
     * Registered connections that declare an alias, indexed by that alias.
     *
     * @var NodeConnectionInterface[]
     */
    protected $aliases = [];

    /**
     * Connection currently selected for executing commands, if any.
     *
     * @var NodeConnectionInterface|null
     */
    protected $current;

    /**
     * Whether the replication configuration is rediscovered on failure.
     *
     * @var bool
     */
    protected $autoDiscovery = false;

    /**
     * Factory used by the discovery procedure to create connections.
     *
     * @var FactoryInterface|null
     */
    protected $connectionFactory;

    /**
     * @param ReplicationStrategy|null $strategy Optional replication strategy; a default
     *                                           ReplicationStrategy is created when omitted.
     */
    public function __construct(?ReplicationStrategy $strategy = null)
    {
        $this->strategy = $strategy ?: new ReplicationStrategy();
    }

    /**
     * Configures the automatic discovery of the replication configuration on failure.
     *
     * @param bool $value Enable or disable auto discovery.
     *
     * @throws ClientException When no connection factory has been set beforehand.
     */
    public function setAutoDiscovery($value)
    {
        if (!$this->connectionFactory) {
            throw new ClientException('Automatic discovery requires a connection factory');
        }

        $this->autoDiscovery = (bool) $value;
    }

    /**
     * Sets the connection factory used to create the connections by the auto
     * discovery procedure.
     *
     * @param FactoryInterface $connectionFactory Connection factory instance.
     */
    public function setConnectionFactory(FactoryInterface $connectionFactory)
    {
        $this->connectionFactory = $connectionFactory;
    }

    /**
     * Resets the connection state by forgetting the currently selected connection.
     */
    protected function reset()
    {
        $this->current = null;
    }

    /**
     * Registers a connection as master (role `master`) or as slave (any other
     * role), indexes it by alias and string representation, and resets the
     * currently selected connection.
     *
     * @param NodeConnectionInterface $connection Connection to register.
     */
    public function add(NodeConnectionInterface $connection)
    {
        $parameters = $connection->getParameters();

        if ('master' === $parameters->role) {
            $this->master = $connection;
        } else {
            // everything else is considered a slave.
            $this->slaves[] = $connection;
        }

        if (isset($parameters->alias)) {
            $this->aliases[$parameters->alias] = $connection;
        }

        $this->pool[(string) $connection] = $connection;

        $this->reset();
    }

    /**
     * Unregisters a connection from the master slot or the slaves list, the
     * pool and the aliases map, then resets the currently selected connection.
     *
     * @param NodeConnectionInterface $connection Connection to unregister.
     *
     * @return bool False when the connection is neither the master nor a known slave.
     */
    public function remove(NodeConnectionInterface $connection)
    {
        if ($connection === $this->master) {
            $this->master = null;
        } elseif (false !== $id = array_search($connection, $this->slaves, true)) {
            unset($this->slaves[$id]);
        } else {
            return false;
        }

        unset($this->pool[(string) $connection]);

        if ($this->aliases) {
            $alias = $connection->getParameters()->alias;

            // add() lets a later connection take over an alias, so only drop
            // the alias when it still points to the connection being removed.
            if ($alias && isset($this->aliases[$alias]) && $this->aliases[$alias] === $connection) {
                unset($this->aliases[$alias]);
            }
        }

        $this->reset();

        return true;
    }

    /**
     * Selects the connection that should execute the given command.
     *
     * With no connection selected yet, read operations go to a random slave
     * (when one exists) and everything else goes to the master. Once the
     * master is selected it stays selected; a selected slave is kept only for
     * read operations while slaves are still registered.
     *
     * @param CommandInterface $command Command instance.
     *
     * @return NodeConnectionInterface
     *
     * @throws MissingMasterException When the master is needed but not registered.
     */
    public function getConnectionByCommand(CommandInterface $command)
    {
        if (!$this->current) {
            if ($this->strategy->isReadOperation($command) && $slave = $this->pickSlave()) {
                $this->current = $slave;
            } else {
                $this->current = $this->getMasterOrDie();
            }

            return $this->current;
        }

        if ($this->current === $master = $this->getMasterOrDie()) {
            return $master;
        }

        if (!$this->strategy->isReadOperation($command) || !$this->slaves) {
            $this->current = $master;
        }

        return $this->current;
    }

    /**
     * Returns a connection instance by its pool identifier.
     *
     * @param string $id String representation of the connection.
     *
     * @return NodeConnectionInterface|null
     */
    public function getConnectionById($id)
    {
        return $this->pool[$id] ?? null;
    }

    /**
     * Returns a connection instance by its alias.
     *
     * @param string $alias Connection alias.
     *
     * @return NodeConnectionInterface|null
     */
    public function getConnectionByAlias($alias)
    {
        return $this->aliases[$alias] ?? null;
    }

    /**
     * Returns a connection by its role.
     *
     * @param string $role Connection role (`master` or `slave`)
     *
     * @return NodeConnectionInterface|null Null for unknown roles or when no
     *                                      connection with that role is registered.
     */
    public function getConnectionByRole($role)
    {
        if ($role === 'master') {
            return $this->getMaster();
        }

        if ($role === 'slave') {
            return $this->pickSlave();
        }

        return null;
    }

    /**
     * Switches the internal connection in use by the backend.
     *
     * @param NodeConnectionInterface $connection Connection instance in the pool.
     *
     * @throws InvalidArgumentException When the connection is neither the master nor a known slave.
     */
    public function switchTo(NodeConnectionInterface $connection)
    {
        // Nothing to do when the requested connection is already selected.
        if ($connection === $this->current) {
            return;
        }

        if ($connection !== $this->master && !in_array($connection, $this->slaves, true)) {
            throw new InvalidArgumentException('Invalid connection or connection not found.');
        }

        $this->current = $connection;
    }

    /**
     * Switches the internal connection in use to the master.
     *
     * @throws InvalidArgumentException When no master is registered.
     */
    public function switchToMaster()
    {
        if (!$connection = $this->getConnectionByRole('master')) {
            throw new InvalidArgumentException('Invalid connection or connection not found.');
        }

        $this->switchTo($connection);
    }

    /**
     * Switches the internal connection in use to a randomly picked slave.
     *
     * @throws InvalidArgumentException When no slave is registered.
     */
    public function switchToSlave()
    {
        if (!$connection = $this->getConnectionByRole('slave')) {
            throw new InvalidArgumentException('Invalid connection or connection not found.');
        }

        $this->switchTo($connection);
    }

    /**
     * Returns the currently selected connection.
     *
     * @return NodeConnectionInterface|null
     */
    public function getCurrent()
    {
        return $this->current;
    }

    /**
     * Returns the connection registered as master.
     *
     * @return NodeConnectionInterface|null
     */
    public function getMaster()
    {
        return $this->master;
    }

    /**
     * Returns the connection associated to the master server.
     *
     * @return NodeConnectionInterface
     *
     * @throws MissingMasterException When no master is registered.
     */
    private function getMasterOrDie()
    {
        if (!$connection = $this->getMaster()) {
            throw new MissingMasterException('No master server available for replication');
        }

        return $connection;
    }

    /**
     * Returns the connections registered as slaves.
     *
     * @return NodeConnectionInterface[]
     */
    public function getSlaves()
    {
        return $this->slaves;
    }

    /**
     * Returns the underlying replication strategy.
     *
     * @return ReplicationStrategy
     */
    public function getReplicationStrategy()
    {
        return $this->strategy;
    }

    /**
     * Returns a random slave.
     *
     * @return NodeConnectionInterface|null Null when no slave is registered.
     */
    protected function pickSlave()
    {
        if (!$this->slaves) {
            return null;
        }

        return $this->slaves[array_rand($this->slaves)];
    }

    /**
     * Tells whether the currently selected connection is connected.
     *
     * @return bool False when no connection is currently selected.
     */
    public function isConnected()
    {
        return $this->current ? $this->current->isConnected() : false;
    }

    /**
     * Connects the currently selected connection, selecting a random slave
     * (or the master when there are no slaves) if none is selected yet.
     *
     * @throws ClientException When neither a slave nor a master is registered.
     */
    public function connect()
    {
        if (!$this->current) {
            if (!$this->current = $this->pickSlave()) {
                if (!$this->current = $this->getMaster()) {
                    throw new ClientException('No available connection for replication');
                }
            }
        }

        $this->current->connect();
    }

    /**
     * Disconnects every connection in the pool.
     */
    public function disconnect()
    {
        foreach ($this->pool as $connection) {
            $connection->disconnect();
        }
    }

    /**
     * Handles response from INFO.
     *
     * Splits the payload into lines and turns every `key:value` line into an
     * array entry; lines without a colon are skipped.
     *
     * @param string $response
     *
     * @return array
     */
    private function handleInfoResponse($response)
    {
        $info = [];

        foreach (preg_split('/\r?\n/', $response) as $row) {
            if (strpos($row, ':') === false) {
                continue;
            }

            [$k, $v] = explode(':', $row, 2);
            $info[$k] = $v;
        }

        return $info;
    }

    /**
     * Fetches the replication configuration from one of the servers.
     *
     * The master is asked first, otherwise a random slave. A server failing
     * with a connection error is removed before the next attempt, which is
     * what guarantees the loop eventually terminates.
     *
     * @throws ClientException When no connection factory is set or no server is left to query.
     */
    public function discover()
    {
        if (!$this->connectionFactory) {
            throw new ClientException('Discovery requires a connection factory');
        }

        while (true) {
            try {
                if ($connection = $this->getMaster()) {
                    $this->discoverFromMaster($connection, $this->connectionFactory);
                    break;
                } elseif ($connection = $this->pickSlave()) {
                    $this->discoverFromSlave($connection, $this->connectionFactory);
                    break;
                } else {
                    throw new ClientException('No connection available for discovery');
                }
            } catch (ConnectionException $exception) {
                $this->remove($connection);
            }
        }
    }

    /**
     * Discovers the replication configuration by contacting the master node.
     *
     * @param NodeConnectionInterface $connection        Connection to the master node.
     * @param FactoryInterface        $connectionFactory Connection factory instance.
     *
     * @throws ClientException When the contacted node does not report the master role.
     */
    protected function discoverFromMaster(NodeConnectionInterface $connection, FactoryInterface $connectionFactory)
    {
        $response = $connection->executeCommand(RawCommand::create('INFO', 'REPLICATION'));
        $replication = $this->handleInfoResponse($response);

        // A payload without a role line must not trigger an undefined index.
        $role = $replication['role'] ?? 'unknown';

        if ($role !== 'master') {
            throw new ClientException("Role mismatch (expected master, got $role) [$connection]");
        }

        // NOTE(review): only the slaves list is cleared here; previously known
        // slaves stay in the pool and aliases maps -- confirm this is intended.
        $this->slaves = [];

        foreach ($replication as $k => $v) {
            $parameters = null;

            if (strpos($k, 'slave') === 0 && preg_match('/ip=(?P<host>.*),port=(?P<port>\d+)/', $v, $parameters)) {
                $slaveConnection = $connectionFactory->create([
                    'host' => $parameters['host'],
                    'port' => $parameters['port'],
                    'role' => 'slave',
                ]);

                $this->add($slaveConnection);
            }
        }
    }

    /**
     * Discovers the replication configuration by contacting one of the slaves.
     *
     * The slave is asked for its master, the master is registered and then
     * queried for the complete list of slaves.
     *
     * @param NodeConnectionInterface $connection        Connection to one of the slaves.
     * @param FactoryInterface        $connectionFactory Connection factory instance.
     *
     * @throws ClientException When the contacted node does not report the slave
     *                         role or does not advertise its master address.
     */
    protected function discoverFromSlave(NodeConnectionInterface $connection, FactoryInterface $connectionFactory)
    {
        $response = $connection->executeCommand(RawCommand::create('INFO', 'REPLICATION'));
        $replication = $this->handleInfoResponse($response);

        // A payload without a role line must not trigger an undefined index.
        $role = $replication['role'] ?? 'unknown';

        if ($role !== 'slave') {
            throw new ClientException("Role mismatch (expected slave, got $role) [$connection]");
        }

        if (!isset($replication['master_host'], $replication['master_port'])) {
            throw new ClientException("Missing master address in replication info [$connection]");
        }

        $masterConnection = $connectionFactory->create([
            'host' => $replication['master_host'],
            'port' => $replication['master_port'],
            'role' => 'master',
        ]);

        $this->add($masterConnection);

        $this->discoverFromMaster($masterConnection, $connectionFactory);
    }

    /**
     * Retries the execution of a command upon failure.
     *
     * When connection parameters are available, retry is enabled and the
     * connection factory is not a RelayFactory, the configured retry object
     * drives the attempts. Otherwise the command is re-run in a loop until it
     * succeeds or one of the failure handlers rethrows.
     *
     * @param CommandInterface $command Command instance.
     * @param string           $method  Actual method.
     *
     * @return mixed
     * @throws Throwable
     */
    private function retryCommandOnFailure(CommandInterface $command, $method)
    {
        // getParameters() returns null when neither a master nor a slave is
        // registered; fall through to the loop below so that the usual
        // MissingMasterException handling applies instead of a fatal error.
        $parameters = $this->getParameters();

        if (
            null !== $parameters
            && !$parameters->isDisabledRetry()
            && !($this->connectionFactory instanceof RelayFactory)
        ) {
            $retry = $parameters->retry;
            $retry->updateCatchableExceptions([MissingMasterException::class]);

            return $retry->callWithRetry(
                function () use ($command, $method) {
                    return $this->executeCommandInternal($command, $method);
                },
                function (Throwable $exception) {
                    $this->onFailCallback($exception);
                }
            );
        }

        while (true) {
            try {
                $response = $this->executeCommandInternal($command, $method);

                break;
            } catch (ConnectionException $exception) {
                $this->onConnectionExceptionCallback($exception);
            } catch (MissingMasterException $exception) {
                $this->onMissingMasterException($exception);
            }
        }

        return $response;
    }

    /**
     * Executes command against valid connection.
     *
     * A `LOADING` error response is converted into a ConnectionException so
     * that callers handle it like any other connection failure.
     *
     * @param  CommandInterface    $command
     * @param  string              $method
     * @return mixed
     * @throws ConnectionException
     */
    protected function executeCommandInternal(CommandInterface $command, string $method)
    {
        $connection = $this->getConnectionByCommand($command);
        $response = $connection->$method($command);

        if ($response instanceof ResponseErrorInterface && $response->getErrorType() === 'LOADING') {
            throw new ConnectionException($connection, "Redis is loading the dataset in memory [$connection]");
        }

        return $response;
    }

    /**
     * Writes the command on the connection selected for it, with failure handling.
     *
     * @param CommandInterface $command Command instance.
     */
    public function writeRequest(CommandInterface $command)
    {
        $this->retryCommandOnFailure($command, __FUNCTION__);
    }

    /**
     * Reads the response to the command from the connection selected for it,
     * with failure handling.
     *
     * @param CommandInterface $command Command instance.
     *
     * @return mixed
     */
    public function readResponse(CommandInterface $command)
    {
        return $this->retryCommandOnFailure($command, __FUNCTION__);
    }

    /**
     * Executes the command on the connection selected for it, with failure handling.
     *
     * @param CommandInterface $command Command instance.
     *
     * @return mixed
     */
    public function executeCommand(CommandInterface $command)
    {
        return $this->retryCommandOnFailure($command, __FUNCTION__);
    }

    /**
     * Lists the properties kept when the instance is serialized.
     *
     * @return string[]
     */
    public function __sleep()
    {
        return ['master', 'slaves', 'pool', 'aliases', 'strategy'];
    }

    /**
     * Returns the parameters of the master or, without a master, those of a
     * randomly picked slave.
     *
     * @return ParametersInterface|null Null when no connection is registered.
     */
    public function getParameters(): ?ParametersInterface
    {
        if (isset($this->master)) {
            return $this->master->getParameters();
        }

        $slave = $this->pickSlave();

        if (null !== $slave) {
            return $slave->getParameters();
        }

        return null;
    }

    /**
     * Handle connection exception.
     *
     * @param  ConnectionException                 $exception
     * @return void
     * @throws ClientException|ConnectionException
     */
    private function onConnectionExceptionCallback(ConnectionException $exception)
    {
        $connection = $exception->getConnection();
        $connection->disconnect();

        if ($connection === $this->master && !$this->autoDiscovery) {
            // Throw immediately when master connection is failing, even
            // when the command represents a read-only operation, unless
            // automatic discovery has been enabled.
            throw $exception;
        }

        // Otherwise remove the failing connection and attempt to execute
        // the command again on one of the remaining ones...
        $this->remove($connection);

        // ... that is, unless we have no more connections to use.
        if (!$this->slaves && !$this->master) {
            throw $exception;
        }

        if ($this->autoDiscovery) {
            $this->discover();
        }
    }

    /**
     * Exception handling callback.
     *
     * Delegates connection and missing-master failures to their handlers,
     * disconnects the connection attached to a timeout, and rethrows anything
     * else.
     *
     * @param  Throwable $exception
     * @return void
     * @throws Throwable
     */
    private function onFailCallback(Throwable $exception)
    {
        if ($exception instanceof ConnectionException) {
            $this->onConnectionExceptionCallback($exception);

            return;
        }

        if ($exception instanceof MissingMasterException) {
            $this->onMissingMasterException($exception);

            return;
        }

        if ($exception instanceof TimeoutException) {
            $connection = $exception->getConnection();

            if ($connection) {
                $connection->disconnect();

                return;
            }
        }

        throw $exception;
    }

    /**
     * Handles a missing master by running discovery when automatic discovery
     * is enabled, or by rethrowing the exception otherwise.
     *
     * @param  MissingMasterException $exception
     * @return void
     * @throws ClientException
     * @throws MissingMasterException
     */
    private function onMissingMasterException(MissingMasterException $exception)
    {
        if (!$this->autoDiscovery) {
            throw $exception;
        }

        $this->discover();
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc