• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

predis / predis / 14450042814

14 Apr 2025 03:51PM UTC coverage: 92.662% (-0.09%) from 92.752%
14450042814

push

github

web-flow
Merge pull request #1527 from predis/vv-readme-8.0-support

7147 of 7713 relevant lines covered (92.66%)

111.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.99
/src/Connection/RelayConnection.php
1
<?php
2

3
/*
4
 * This file is part of the Predis package.
5
 *
6
 * (c) 2009-2020 Daniele Alessandri
7
 * (c) 2021-2025 Till Krüss
8
 *
9
 * For the full copyright and license information, please view the LICENSE
10
 * file that was distributed with this source code.
11
 */
12

13
namespace Predis\Connection;
14

15
use InvalidArgumentException;
16
use Predis\ClientException;
17
use Predis\Command\CommandInterface;
18
use Predis\NotSupportedException;
19
use Predis\Response\ErrorInterface as ErrorResponseInterface;
20
use Predis\Response\ServerException;
21
use Relay\Exception as RelayException;
22
use Relay\Relay;
23

24
/**
25
 * This class provides the implementation of a Predis connection that
26
 * uses Relay for network communication and in-memory caching.
27
 *
28
 * Using Relay allows for:
29
 * 1) significantly faster reads thanks to in-memory caching
30
 * 2) fast data serialization using igbinary
31
 * 3) fast data compression using lzf, lz4 or zstd
32
 *
33
 * Usage of igbinary serialization and zstd compression reduces
34
 * network traffic and Redis memory usage by ~75%.
35
 *
36
 * For instructions on how to install the Relay extension, please consult
37
 * the repository of the project: https://relay.so/docs/installation
38
 *
39
 * The connection parameters supported by this class are:
40
 *
41
 *  - scheme: it can be either 'tcp', 'tls' or 'unix'.
42
 *  - host: hostname or IP address of the server.
43
 *  - port: TCP port of the server.
44
 *  - path: path of a UNIX domain socket when scheme is 'unix'.
45
 *  - timeout: timeout to perform the connection.
46
 *  - read_write_timeout: timeout of read / write operations.
47
 *  - cache: whether to use in-memory caching
48
 *  - serializer: data serializer
49
 *  - compression: data compression algorithm
50
 *
51
 * @see https://github.com/cachewerk/relay
52
 */
53
class RelayConnection extends AbstractConnection
54
{
55
    use RelayMethods;
56

57
    /**
58
     * The Relay instance.
59
     *
60
     * @var Relay
61
     */
62
    protected $client;
63

64
    /**
65
     * These commands must be called on the client, not using `Relay::rawCommand()`.
66
     *
67
     * @var string[]
68
     */
69
    public $atypicalCommands = [
70
        'AUTH',
71
        'SELECT',
72

73
        'TYPE',
74

75
        'MULTI',
76
        'EXEC',
77
        'DISCARD',
78

79
        'WATCH',
80
        'UNWATCH',
81

82
        'SUBSCRIBE',
83
        'UNSUBSCRIBE',
84
        'PSUBSCRIBE',
85
        'PUNSUBSCRIBE',
86
        'SSUBSCRIBE',
87
        'SUNSUBSCRIBE',
88
    ];
89

90
    /**
     * Wraps an already instantiated Relay client in a Predis connection.
     *
     * The presence of the "relay" extension is verified first, then the
     * parameters are validated through assertParameters() before both the
     * parameters and the client are stored on the instance.
     *
     * @param ParametersInterface $parameters Initialization parameters for the connection.
     * @param Relay               $client     Relay instance used for network communication.
     */
    public function __construct(ParametersInterface $parameters, Relay $client)
    {
        $this->assertExtensions();

        $validated = $this->assertParameters($parameters);

        $this->client = $client;
        $this->parameters = $validated;
    }
100

101
    /**
     * Reports the connection state as seen by the Relay client.
     *
     * {@inheritdoc}
     */
    public function isConnected()
    {
        $connected = $this->client->isConnected();

        return $connected;
    }
108

109
    /**
     * Closes the Relay client, but only when it reports an open connection.
     *
     * {@inheritdoc}
     */
    public function disconnect()
    {
        // Nothing to close when Relay is not connected.
        if (!$this->client->isConnected()) {
            return;
        }

        $this->client->close();
    }
118

119
    /**
     * Ensures that the "relay" PHP extension is loaded.
     *
     * @throws NotSupportedException when the extension is not available.
     */
    private function assertExtensions()
    {
        if (extension_loaded('relay')) {
            return;
        }

        $reason = 'The "relay" extension is required by this connection backend.';

        throw new NotSupportedException($reason);
    }
130

131
    /**
     * Builds a new Relay instance configured from the connection parameters.
     *
     * The `cache`, `serializer` and `compression` parameters are read from
     * $this->parameters; the latter two are mapped onto the matching
     * Relay::SERIALIZER_* and Relay::COMPRESSION_* constants, falling back
     * to the "NONE" variant when the parameter is not set.
     *
     * @return Relay
     */
    private function createClient()
    {
        $relay = new Relay();

        // Throw when errors occur and return `null` for non-existent keys.
        $relay->setOption(Relay::OPT_PHPREDIS_COMPATIBILITY, false);

        // Use reply literals.
        $relay->setOption(Relay::OPT_REPLY_LITERAL, true);

        // Disable Relay's own command/connection retry.
        $relay->setOption(Relay::OPT_MAX_RETRIES, 0);

        // In-memory caching stays enabled unless the `cache` parameter is set.
        $useCache = $this->parameters->cache ?? true;
        $relay->setOption(Relay::OPT_USE_CACHE, $useCache);

        // Data serializer, resolved by constant name.
        $serializerName = strtoupper($this->parameters->serializer ?? 'none');
        $relay->setOption(
            Relay::OPT_SERIALIZER,
            constant(Relay::class . '::SERIALIZER_' . $serializerName)
        );

        // Data compression algorithm, resolved by constant name.
        $compressionName = strtoupper($this->parameters->compression ?? 'none');
        $relay->setOption(
            Relay::OPT_COMPRESSION,
            constant(Relay::class . '::COMPRESSION_' . $compressionName)
        );

        return $relay;
    }
168

169
    /**
     * Gives access to the Relay instance wrapped by this connection.
     *
     * @return Relay
     */
    public function getClient()
    {
        $relay = $this->client;

        return $relay;
    }
178

179
    /**
     * Opens the connection on the Relay client using the given parameters.
     *
     * The connect timeout comes from `timeout` and the read timeout from
     * `read_write_timeout`; both default to 5 seconds. A `read_write_timeout`
     * that is not greater than zero is passed to Relay as 0. When `path` is
     * set it is used as the host with port 0, otherwise `host` and `port`
     * are used.
     *
     * Note: $address and $flags are accepted but not read by this method.
     *
     * @param  ParametersInterface $parameters Initialization parameters for the connection.
     * @param  string              $address    Address built by the caller (unused here).
     * @param  int                 $flags      STREAM_CLIENT_* flags built by the caller (unused here).
     * @return Relay
     */
    protected function connectWithConfiguration(ParametersInterface $parameters, $address, $flags)
    {
        $connectTimeout = 5.0;

        if (isset($parameters->timeout)) {
            $connectTimeout = (float) $parameters->timeout;
        }

        $retryInterval = 0;
        $readTimeout = 5.0;

        if (isset($parameters->read_write_timeout)) {
            $readTimeout = (float) $parameters->read_write_timeout;

            // Anything that is not strictly positive collapses to 0.
            if (!($readTimeout > 0)) {
                $readTimeout = 0;
            }
        }

        // A UNIX socket path takes precedence over host/port.
        $host = $parameters->path ?? $parameters->host;
        $port = isset($parameters->path) ? 0 : $parameters->port;

        try {
            $this->client->connect($host, $port, $connectTimeout, null, $retryInterval, $readTimeout);
        } catch (RelayException $failure) {
            $this->onConnectionError($failure->getMessage(), $failure->getCode());
        }

        return $this->client;
    }
212

213
    /**
     * Returns the endpoint identifier reported by Relay, or the identifier
     * computed by the parent class when Relay throws while providing it.
     *
     * {@inheritdoc}
     */
    public function getIdentifier()
    {
        try {
            $identifier = $this->client->endpointId();
        } catch (RelayException $failure) {
            // Relay could not provide an endpoint id; use the parent's one.
            $identifier = parent::getIdentifier();
        }

        return $identifier;
    }
224

225
    /**
     * Runs a command through the Relay client and returns its reply.
     *
     * The connection is established on demand. Commands whose ID is listed
     * in $atypicalCommands are invoked as methods on the client, every other
     * command goes through `Relay::rawCommand()`. A RelayException is mapped
     * by onCommandError(): the result is returned when it is an error
     * response, otherwise it is thrown.
     *
     * {@inheritdoc}
     */
    public function executeCommand(CommandInterface $command)
    {
        if (!$this->client->isConnected()) {
            $this->getResource();
        }

        try {
            $id = $command->getId();
            $arguments = $command->getArguments();

            // When using compression or a serializer, we'll need a dedicated
            // handler for `Predis\Command\RawCommand` calls, currently both
            // parameters are unsupported until a future Relay release
            if (in_array($id, $this->atypicalCommands)) {
                return $this->client->{$id}(...$arguments);
            }

            return $this->client->rawCommand($id, ...$arguments);
        } catch (RelayException $failure) {
            $error = $this->onCommandError($failure, $command);

            if (!$error instanceof ErrorResponseInterface) {
                throw $error;
            }

            return $error;
        }
    }
253

254
    /**
     * Translates an exception thrown by Relay into its Predis counterpart.
     *
     * The mapping is driven by markers found in the exception message:
     *  - "RELAY_ERR_IO" yields a ConnectionException bound to this connection;
     *  - "RELAY_ERR_REDIS" yields a ServerException;
     *  - anything else yields a ClientException. When the message contains
     *    both "RELAY_ERR_WRONGTYPE" and "Got reply-type 'status'", it is
     *    replaced by a generic wrong-type message.
     *
     * The original exception is always attached as the previous exception.
     *
     * @param  RelayException   $exception Exception thrown by the Relay client.
     * @param  CommandInterface $command   Command that was being executed.
     * @return ConnectionException|ServerException|ClientException
     */
    public function onCommandError(RelayException $exception, CommandInterface $command)
    {
        $text = $exception->getMessage();
        $errorCode = $exception->getCode();

        $isIoError = strpos($text, 'RELAY_ERR_IO') !== false;

        if ($isIoError) {
            return new ConnectionException($this, $text, $errorCode, $exception);
        }

        $isServerError = strpos($text, 'RELAY_ERR_REDIS') !== false;

        if ($isServerError) {
            return new ServerException($text, $errorCode, $exception);
        }

        if (strpos($text, 'RELAY_ERR_WRONGTYPE') !== false) {
            if (strpos($text, "Got reply-type 'status'") !== false) {
                $text = 'Operation against a key holding the wrong kind of value';
            }
        }

        return new ClientException($text, $errorCode, $exception);
    }
276

277
    /**
     * Runs the given value through the client's `_pack()` method, which
     * applies the configured serializer and compression.
     *
     * @param  mixed  $value Value to pack.
     * @return string
     */
    public function pack($value)
    {
        $packed = $this->client->_pack($value);

        return $packed;
    }
287

288
    /**
     * Runs the given value through the client's `_unpack()` method, which
     * decompresses and deserializes it.
     *
     * @param  mixed $value Packed value, presumably as produced by pack().
     * @return mixed Whatever `_unpack()` returns; presumably the original value.
     */
    public function unpack($value)
    {
        $unpacked = $this->client->_unpack($value);

        return $unpacked;
    }
298

299
    /**
     * Not available on this backend: always throws.
     *
     * {@inheritdoc}
     *
     * @throws NotSupportedException on every call.
     */
    public function writeRequest(CommandInterface $command)
    {
        $reason = 'The "relay" extension does not support writing requests.';

        throw new NotSupportedException($reason);
    }
306

307
    /**
     * Not available on this backend: always throws.
     *
     * {@inheritdoc}
     *
     * @throws NotSupportedException on every call.
     */
    public function readResponse(CommandInterface $command)
    {
        $reason = 'The "relay" extension does not support reading responses.';

        throw new NotSupportedException($reason);
    }
314

315
    /**
     * Disconnects from the server when the connection object is destroyed.
     */
    public function __destruct()
    {
        // disconnect() is a no-op when the client is not connected.
        $this->disconnect();
    }
322

323
    /**
     * Connects the Relay client according to the configured scheme.
     *
     * "tcp" and "redis" are handled as TCP connections, "unix" as a UNIX
     * domain socket connection. Any other scheme is rejected.
     *
     * NOTE(review): assertParameters() also accepts "tls" and "rediss", but
     * neither is handled here, so both end up in the "Invalid scheme"
     * branch — confirm whether TLS is meant to be supported.
     *
     * {@inheritdoc}
     *
     * @throws InvalidArgumentException when the scheme is not handled.
     */
    protected function createResource()
    {
        $scheme = $this->parameters->scheme;

        // Loose comparison on purpose: mirrors `switch` semantics.
        if (in_array($scheme, ['tcp', 'redis'])) {
            return $this->initializeTcpConnection($this->parameters);
        }

        if ($scheme == 'unix') {
            return $this->initializeUnixConnection($this->parameters);
        }

        throw new InvalidArgumentException("Invalid scheme: '{$scheme}'.");
    }
340

341
    /**
     * Prepares address and flags for a TCP connection, then connects.
     *
     * IPv6 hosts are wrapped in square brackets inside the address. The
     * `async_connect` and `persistent` parameters add the matching
     * STREAM_CLIENT_* flags; a `persistent` value that is not a recognised
     * boolean is appended to the address as a persistent connection ID.
     *
     * @param ParametersInterface $parameters Initialization parameters for the connection.
     *
     * @return Relay
     */
    protected function initializeTcpConnection(ParametersInterface $parameters)
    {
        $isIpv6 = filter_var($parameters->host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6);

        if ($isIpv6) {
            $address = "tcp://[{$parameters->host}]:{$parameters->port}";
        } else {
            $address = "tcp://{$parameters->host}:{$parameters->port}";
        }

        $flags = STREAM_CLIENT_CONNECT;

        if (isset($parameters->async_connect) && $parameters->async_connect) {
            $flags = $flags | STREAM_CLIENT_ASYNC_CONNECT;
        }

        if (isset($parameters->persistent)) {
            // true / false for boolean-like values, null for anything else.
            $persistent = filter_var($parameters->persistent, FILTER_VALIDATE_BOOLEAN, FILTER_NULL_ON_FAILURE);

            if ($persistent !== false) {
                $flags = $flags | STREAM_CLIENT_PERSISTENT;

                if (null === $persistent) {
                    // Non-boolean value: treat it as a persistent connection ID.
                    $address = "{$address}/{$parameters->persistent}";
                }
            }
        }

        return $this->connectWithConfiguration($parameters, $address, $flags);
    }
374

375
    /**
     * Prepares address and flags for a UNIX domain socket connection, then connects.
     *
     * The `path` parameter is mandatory. The `persistent` parameter only
     * accepts boolean-like values here: persistent connection IDs are rejected.
     *
     * @param ParametersInterface $parameters Initialization parameters for the connection.
     *
     * @return Relay
     *
     * @throws InvalidArgumentException when `path` is missing or `persistent` is not boolean-like.
     */
    protected function initializeUnixConnection(ParametersInterface $parameters)
    {
        if (!isset($parameters->path)) {
            throw new InvalidArgumentException('Missing UNIX domain socket path.');
        }

        $flags = STREAM_CLIENT_CONNECT;

        if (isset($parameters->persistent)) {
            // true / false for boolean-like values, null for anything else.
            $persistent = filter_var($parameters->persistent, FILTER_VALIDATE_BOOLEAN, FILTER_NULL_ON_FAILURE);

            if ($persistent !== false) {
                $flags = $flags | STREAM_CLIENT_PERSISTENT;

                if (null === $persistent) {
                    throw new InvalidArgumentException(
                        'Persistent connection IDs are not supported when using UNIX domain sockets.'
                    );
                }
            }
        }

        $address = "unix://{$parameters->path}";

        return $this->connectWithConfiguration($parameters, $address, $flags);
    }
404

405
    /**
     * Connects and then replays the initialization commands, if any.
     *
     * The commands are executed only when the parent reports that a new
     * connection was made. An error response to a `CLIENT` command is
     * ignored; an error response to any other command is reported through
     * onConnectionError().
     *
     * {@inheritdoc}
     */
    public function connect()
    {
        if (!parent::connect() || !$this->initCommands) {
            return;
        }

        foreach ($this->initCommands as $command) {
            $response = $this->executeCommand($command);

            if (!$response instanceof ErrorResponseInterface) {
                continue;
            }

            // Do nothing on CLIENT SETINFO command failure
            if ($command->getId() === 'CLIENT') {
                continue;
            }

            $this->onConnectionError("`{$command->getId()}` failed: {$response->getMessage()}", 0);
        }
    }
422

423
    /**
     * Not available on this backend: always throws.
     *
     * {@inheritdoc}
     *
     * @throws NotSupportedException on every call.
     */
    public function read()
    {
        $reason = 'The "relay" extension does not support reading responses.';

        throw new NotSupportedException($reason);
    }
430

431
    /**
     * Validates the `scheme`, `serializer` and `compression` parameters.
     *
     * Membership is checked with strict comparison. With loose comparison
     * values such as `true` (PHP 8) or `0` (PHP 7) compare equal to any of
     * the allowed strings, and `''` / `false` compare equal to `null`, so
     * invalid values would slip through validation.
     *
     * {@inheritdoc}
     *
     * @param  ParametersInterface $parameters Initialization parameters for the connection.
     * @return ParametersInterface The same instance, once validated.
     *
     * @throws InvalidArgumentException when one of the three parameters holds an unsupported value.
     */
    protected function assertParameters(ParametersInterface $parameters)
    {
        if (!in_array($parameters->scheme, ['tcp', 'tls', 'unix', 'redis', 'rediss'], true)) {
            throw new InvalidArgumentException("Invalid scheme: '{$parameters->scheme}'.");
        }

        // `null` means "not configured" and is always accepted.
        if (!in_array($parameters->serializer, [null, 'php', 'igbinary', 'msgpack', 'json'], true)) {
            throw new InvalidArgumentException("Invalid serializer: '{$parameters->serializer}'.");
        }

        if (!in_array($parameters->compression, [null, 'lzf', 'lz4', 'zstd'], true)) {
            throw new InvalidArgumentException("Invalid compression algorithm: '{$parameters->compression}'.");
        }

        return $parameters;
    }
450

451
    /**
     * Not available on this backend: always throws.
     *
     * {@inheritDoc}
     *
     * @throws NotSupportedException on every call.
     */
    public function write(string $buffer): void
    {
        $reason = 'The "relay" extension does not support writing operations.';

        throw new NotSupportedException($reason);
    }
458
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc