• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

predis / predis / 13716769884

07 Mar 2025 08:31AM UTC coverage: 92.8% (+0.03%) from 92.772%
13716769884

push

github

web-flow
Added support for transactions in OSS Cluster (#1497)

* Added support for transactions in OSS Cluster

* Updated test cases

* Updated version restriction

* Updated server version restriction

* Added test skip for Relay

* Added test case for Relay

* Set transaction slot to null in case of failed transaction

* Restrict usage of transaction commands outside of transaction context

* Added handling for Relay responses

* Reverted changes

* Removed unsupported command

* Added dummy arguments

* Codestyle fixes

* Added additional test coverage

* Added CHANGELOG and README entries

* Added missing PR reference

* Missing word

* Fixed README

124 of 136 new or added lines in 11 files covered. (91.18%)

4 existing lines in 1 file now uncovered.

6870 of 7403 relevant lines covered (92.8%)

111.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.97
/src/Transaction/Strategy/ClusterConnectionStrategy.php
1
<?php
2

3
/*
4
 * This file is part of the Predis package.
5
 *
6
 * (c) 2009-2020 Daniele Alessandri
7
 * (c) 2021-2023 Till Krüss
8
 *
9
 * For the full copyright and license information, please view the LICENSE
10
 * file that was distributed with this source code.
11
 */
12

13
namespace Predis\Transaction\Strategy;
14

15
use Predis\Command\CommandInterface;
16
use Predis\Command\Redis\DISCARD;
17
use Predis\Command\Redis\EXEC;
18
use Predis\Command\Redis\MULTI;
19
use Predis\Command\Redis\UNWATCH;
20
use Predis\Command\Redis\WATCH;
21
use Predis\Connection\Cluster\ClusterInterface;
22
use Predis\Response\Error;
23
use Predis\Response\Status;
24
use Predis\Transaction\Exception\TransactionException;
25
use Predis\Transaction\MultiExecState;
26
use Relay\Relay;
27
use SplQueue;
28

29
/**
 * Transaction strategy used when the underlying connection is a cluster.
 *
 * Commands issued inside the transaction are buffered locally and only sent
 * when the transaction is executed, so that MULTI, the transaction body and
 * EXEC can all be tagged with the same hash slot before being handed to the
 * cluster connection.
 */
class ClusterConnectionStrategy implements StrategyInterface
{
    /**
     * @var ClusterInterface
     */
    private $connection;

    /**
     * Server-matching slot of the current transaction.
     *
     * Stays null until a WATCH or the first slot-bound command pins it.
     *
     * @var ?int
     */
    private $slot;

    /**
     * In cluster environment it needs to be queued to ensure
     * that all commands will point to the same node.
     *
     * @var SplQueue
     */
    private $commandsQueue;

    /**
     * Shows if transaction context was initialized.
     *
     * @var bool
     */
    private $isInitialized = false;

    /**
     * @var \Predis\Cluster\StrategyInterface
     */
    private $clusterStrategy;

    /**
     * @var MultiExecState
     */
    private $state;

    /**
     * @param ClusterInterface $connection Cluster connection that commands are sent through.
     * @param MultiExecState   $state      Transaction state, consulted by watch() for the CAS flag.
     */
    public function __construct(ClusterInterface $connection, MultiExecState $state)
    {
        $this->commandsQueue = new SplQueue();
        $this->connection = $connection;
        $this->state = $state;
        $this->clusterStrategy = $this->connection->getClusterStrategy();
    }

    /**
     * Buffers a command for the current transaction instead of sending it.
     *
     * The first slot seen pins the transaction slot. Commands for which no
     * slot can be computed inherit the pinned one; commands that resolve to
     * a different slot are rejected and not queued.
     *
     * @param  CommandInterface     $command
     * @return Status|Error         "QUEUED" status, or an error for a cross-slot command.
     * @throws TransactionException When the transaction context was not initialized.
     */
    public function executeCommand(CommandInterface $command)
    {
        if (!$this->isInitialized) {
            throw new TransactionException('Transaction context should be initialized first');
        }

        $commandSlot = $this->clusterStrategy->getSlot($command);

        if (null === $this->slot) {
            $this->slot = $commandSlot;
        }

        // Slot-less command: route it to the slot already pinned (if any).
        if (null === $commandSlot && null !== $this->slot) {
            $command->setSlot($this->slot);
        }

        if (is_int($commandSlot) && $commandSlot !== $this->slot) {
            return new Error(
                'To be able to execute a transaction against cluster, all commands should operate on the same hash slot'
            );
        }

        $this->commandsQueue->enqueue($command);

        return new Status('QUEUED');
    }

    /**
     * Opens the transaction context by queueing MULTI as the first command.
     *
     * Calling it again while the context is already open is a no-op.
     *
     * @return bool Always true.
     */
    public function initializeTransaction(): bool
    {
        if ($this->isInitialized) {
            return true;
        }

        $this->commandsQueue->enqueue(new MULTI());
        $this->isInitialized = true;

        return true;
    }

    /**
     * Sends the buffered transaction (MULTI, body, EXEC) to the pinned slot.
     *
     * Whatever the outcome (success, rejected command or an exception thrown
     * by the connection) the local transaction context is torn down, so that
     * leftover commands, a stale slot or a stale "initialized" flag cannot
     * leak into the next transaction.
     *
     * @return mixed|null           Reply of EXEC, or null when MULTI or one of the
     *                              queued commands was not acknowledged.
     * @throws TransactionException When the transaction context was not initialized.
     */
    public function executeTransaction()
    {
        if (!$this->isInitialized) {
            throw new TransactionException('Transaction context should be initialized first');
        }

        try {
            // Begin transaction
            /** @var MULTI $multi */
            $multi = $this->commandsQueue->dequeue();

            if (!$this->isExpectedReply($this->setSlotAndExecute($multi), 'OK')) {
                return null;
            }

            // Transaction body
            while (!$this->commandsQueue->isEmpty()) {
                /** @var CommandInterface $command */
                $command = $this->commandsQueue->dequeue();

                if (!$this->isExpectedReply($this->setSlotAndExecute($command), 'QUEUED')) {
                    // NOTE(review): MULTI was already accepted at this point and no
                    // DISCARD is sent here - confirm the caller cleans up server side.
                    return null;
                }
            }

            // Execute transaction
            return $this->setSlotAndExecute(new EXEC());
        } finally {
            $this->resetTransactionContext();
        }
    }

    /**
     * Sends MULTI immediately and marks the context as initialized on "OK".
     *
     * NOTE(review): unlike initializeTransaction() this does not enqueue MULTI,
     * while executeTransaction() treats the first queued entry as MULTI -
     * confirm callers never combine the two paths.
     *
     * @return mixed Reply of MULTI as returned by the connection.
     */
    public function multi()
    {
        $response = $this->setSlotAndExecute(new MULTI());

        if ('OK' == $response) {
            $this->isInitialized = true;
        }

        return $response;
    }

    /**
     * Sends WATCH for the given keys and pins the transaction to their slot.
     *
     * When the transaction state has the CAS flag set, the transaction
     * context is initialized right after the WATCH.
     *
     * @param  array                $keys
     * @return bool                 True when the reply to WATCH compares equal to "OK".
     * @throws TransactionException When the keys do not share a single hash slot.
     */
    public function watch(array $keys)
    {
        if (!$this->clusterStrategy->checkSameSlotForKeys($keys)) {
            throw new TransactionException('WATCHed keys should point to the same hash slot');
        }

        $this->slot = $this->clusterStrategy->getSlotByKey($keys[0]);

        $watch = new WATCH();
        $watch->setArguments($keys);

        $response = 'OK' == $this->setSlotAndExecute($watch);

        if ($this->state->check(MultiExecState::CAS)) {
            $this->initializeTransaction();
        }

        return $response;
    }

    /**
     * Sends DISCARD, tagged with the transaction slot when one is pinned.
     *
     * @return mixed Reply of DISCARD as returned by the connection.
     */
    public function discard()
    {
        return $this->setSlotAndExecute(new DISCARD());
    }

    /**
     * Sends UNWATCH, tagged with the transaction slot when one is pinned.
     *
     * @return mixed Reply of UNWATCH as returned by the connection.
     */
    public function unwatch()
    {
        return $this->setSlotAndExecute(new UNWATCH());
    }

    /**
     * Assigns slot to a command and executes.
     *
     * The slot is only assigned when the transaction already has one pinned.
     *
     * @param  CommandInterface $command
     * @return mixed
     */
    private function setSlotAndExecute(CommandInterface $command)
    {
        if (null !== $this->slot) {
            $command->setSlot($this->slot);
        }

        return $this->connection->executeCommand($command);
    }

    /**
     * Tells whether a reply acknowledges a step of the transaction.
     *
     * The comparison is intentionally loose: the reply is not necessarily a
     * plain string (presumably a status object that casts to its payload -
     * verify against the connection). A Relay instance is always accepted.
     *
     * @param  mixed  $response Reply returned by the connection.
     * @param  string $expected Payload expected for this step ("OK" or "QUEUED").
     * @return bool
     */
    private function isExpectedReply($response, string $expected): bool
    {
        return $expected == $response || $response instanceof Relay;
    }

    /**
     * Drops every piece of per-transaction state held by the strategy.
     *
     * @return void
     */
    private function resetTransactionContext()
    {
        $this->commandsQueue = new SplQueue();
        $this->slot = null;
        $this->isInitialized = false;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc