• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

predis / predis / 20455852140

23 Dec 2025 08:40AM UTC coverage: 92.615% (-0.2%) from 92.788%
20455852140

push

github

web-flow
Added retry support (#1616)

* Initial work on retries

* Added retry class and test coverage

* Added support for standalone and cluster

* Make TimeoutException instance of CommunicationException

* Added pipeline, transaction, replication support

* Fixed broken test

* Marked test as relay-incompatible

* Marked test as relay-incompatible

* Fixed analysis errors, added missing tests

* Codestyle fixes

* Fixed test

* Update README.md

* Update README.md

* Update README.md

* Updated README.md

* Refactor retry on read and write

* Added check for timeout value

* Updated README.md

* Fixed README.md

* Codestyle changes

* Added missing coverage

* Added missing test coverage

* Removed comments

* Added retry support for Relay connection (#1620)

* Added integration test case with mocked retry

* Changed client initialisation in tests

* Marked test as relay-incompatible

---------

Co-authored-by: Pavlo Yatsukhnenko <yatsukhnenko@users.noreply.github.com>

301 of 358 new or added lines in 18 files covered. (84.08%)

2 existing lines in 2 files now uncovered.

8214 of 8869 relevant lines covered (92.61%)

112.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.54
/src/Pipeline/Atomic.php
1
<?php
2

3
/*
4
 * This file is part of the Predis package.
5
 *
6
 * (c) 2009-2020 Daniele Alessandri
7
 * (c) 2021-2025 Till Krüss
8
 *
9
 * For the full copyright and license information, please view the LICENSE
10
 * file that was distributed with this source code.
11
 */
12

13
namespace Predis\Pipeline;
14

15
use Predis\ClientException;
16
use Predis\ClientInterface;
17
use Predis\Command\Command;
18
use Predis\Command\CommandInterface;
19
use Predis\CommunicationException;
20
use Predis\Connection\ConnectionInterface;
21
use Predis\Connection\NodeConnectionInterface;
22
use Predis\Response\ErrorInterface as ErrorResponseInterface;
23
use Predis\Response\ResponseInterface;
24
use Predis\Response\ServerException;
25
use SplQueue;
26
use Throwable;
27

28
/**
29
 * Command pipeline wrapped into a MULTI / EXEC transaction.
30
 */
31
class Atomic extends Pipeline
{
    /**
     * Creates the atomic pipeline after verifying that the client's command
     * factory can build the commands needed to drive a transaction.
     *
     * @param ClientInterface $client Client instance used by the pipeline.
     *
     * @throws ClientException When 'multi', 'exec' or 'discard' are not supported
     *                         by the current command factory.
     */
    public function __construct(ClientInterface $client)
    {
        if (!$client->getCommandFactory()->supports('multi', 'exec', 'discard')) {
            throw new ClientException(
                "'MULTI', 'EXEC' and 'DISCARD' are not supported by the current command factory."
            );
        }

        parent::__construct($client);
    }

    /**
     * Returns the connection of the underlying client, rejecting anything that
     * is not a single-node connection.
     *
     * @return NodeConnectionInterface
     *
     * @throws ClientException When the client connection is not an instance of
     *                         NodeConnectionInterface.
     */
    protected function getConnection()
    {
        $connection = $this->getClient()->getConnection();

        if (!$connection instanceof NodeConnectionInterface) {
            $class = __CLASS__;

            throw new ClientException("The class '$class' does not support aggregate connections.");
        }

        return $connection;
    }

    /**
     * Executes the queued commands between MULTI and EXEC and returns their
     * responses in the order in which the commands were queued.
     *
     * @param ConnectionInterface $connection Connection used to run the transaction.
     * @param SplQueue            $commands   Queue of commands; it is drained while
     *                                        the responses are processed.
     *
     * @return array
     *
     * @throws ClientException When EXEC returns NULL (transaction aborted) or the
     *                         number of responses differs from the number of commands.
     * @throws ServerException When the server replies to EXEC with an error.
     * @throws Throwable       Whatever the retry strategy or the connection rethrows.
     */
    protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
    {
        $commandFactory = $this->getClient()->getCommandFactory();
        $retry = $connection->getParameters()->retry;
        $this->executeCommandWithRetry($connection, $commandFactory->create('multi'));

        // NOTE(review): on a communication failure the connection is dropped before
        // the next attempt. Presumably the retried attempt then runs on a fresh
        // connection that no longer carries the MULTI state sent above -- confirm
        // against the retry / reconnect semantics.
        $retry->callWithRetry(function () use ($connection, $commands) {
            $this->queuePipeline($connection, $commands);
        }, function (Throwable $exception) {
            if ($exception instanceof CommunicationException) {
                $exception->getConnection()->disconnect();
            }
        });

        $executed = $this->executeCommandWithRetry($connection, $commandFactory->create('exec'));

        if (!isset($executed)) {
            throw new ClientException(
                'The underlying transaction has been aborted by the server.'
            );
        }

        // An error reply to EXEC is an object, not a list of responses: report it
        // with the server message instead of letting count() choke on it below.
        if ($executed instanceof ErrorResponseInterface) {
            throw new ServerException($executed->getMessage());
        }

        $sizeOfPipe = count($commands);
        $received = count($executed);

        if ($received !== $sizeOfPipe) {
            $expected = $sizeOfPipe;

            throw new ClientException(
                "Invalid number of responses [expected $expected, received $received]."
            );
        }

        $responses = [];
        $exceptions = $this->throwServerExceptions();
        $protocolVersion = (int) $connection->getParameters()->protocol;

        for ($i = 0; $i < $sizeOfPipe; ++$i) {
            $command = $commands->dequeue();
            $response = $executed[$i];

            if (!$response instanceof ResponseInterface) {
                // Raw payloads are parsed by the command that produced them, using
                // the parser that matches the protocol version of the connection.
                if ($protocolVersion === 2) {
                    $responses[] = $command->parseResponse($response);
                } else {
                    $responses[] = $command->parseResp3Response($response);
                }
            } elseif ($response instanceof ErrorResponseInterface && $exceptions) {
                $this->exception($connection, $response);
            } else {
                $responses[] = $response;
            }

            // Release each raw response as soon as it has been processed.
            unset($executed[$i]);
        }

        return $responses;
    }

    /**
     * Writes all the commands to the connection and reads one reply per command,
     * discarding the transaction as soon as the server rejects one of them.
     *
     * @param  ConnectionInterface $connection
     * @param  SplQueue            $commands
     * @return void
     * @throws ServerException When the server replies with an error to a queued command.
     * @throws Throwable
     */
    protected function queuePipeline(ConnectionInterface $connection, SplQueue $commands)
    {
        $commandFactory = $this->getClient()->getCommandFactory();
        $this->writeToSingleNode($connection, $commands);

        foreach ($commands as $command) {
            $response = $connection->readResponse($command);

            if ($response instanceof ErrorResponseInterface) {
                // Send DISCARD before surfacing the error to the caller.
                $this->executeCommandWithRetry($connection, $commandFactory->create('discard'));
                throw new ServerException($response->getMessage());
            }
        }
    }

    /**
     * Executes a single command through the retry strategy configured in the
     * connection parameters, disconnecting on communication failures.
     *
     * @param  ConnectionInterface $connection
     * @param  CommandInterface    $command
     * @return mixed
     * @throws Throwable
     */
    protected function executeCommandWithRetry(ConnectionInterface $connection, CommandInterface $command)
    {
        $retry = $connection->getParameters()->retry;

        return $retry->callWithRetry(function () use ($connection, $command) {
            return $connection->executeCommand($command);
        }, function (Throwable $e) {
            if ($e instanceof CommunicationException) {
                $e->getConnection()->disconnect();
            }
        });
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc