• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

predis / predis / 16929978125

13 Aug 2025 06:59AM UTC coverage: 93.114% (+0.004%) from 93.11%
16929978125

push

github

web-flow
Refactor pipeline data writing depends on connection type (#1586)

* Refactor pipeline data writing depends on connection type

* Updated CHANGELOG.md

17 of 20 new or added lines in 4 files covered. (85.0%)

3 existing lines in 2 files now uncovered.

7681 of 8249 relevant lines covered (93.11%)

112.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.0
/src/Pipeline/Atomic.php
1
<?php
2

3
/*
4
 * This file is part of the Predis package.
5
 *
6
 * (c) 2009-2020 Daniele Alessandri
7
 * (c) 2021-2025 Till Krüss
8
 *
9
 * For the full copyright and license information, please view the LICENSE
10
 * file that was distributed with this source code.
11
 */
12

13
namespace Predis\Pipeline;
14

15
use Predis\ClientException;
16
use Predis\ClientInterface;
17
use Predis\Connection\AggregateConnectionInterface;
18
use Predis\Connection\ConnectionInterface;
19
use Predis\Connection\NodeConnectionInterface;
20
use Predis\Response\ErrorInterface as ErrorResponseInterface;
21
use Predis\Response\ResponseInterface;
22
use Predis\Response\ServerException;
23
use SplQueue;
24

25
/**
26
 * Command pipeline wrapped into a MULTI / EXEC transaction.
27
 */
28
class Atomic extends Pipeline
29
{
30
    /**
31
     * {@inheritdoc}
     *
     * Builds the atomic pipeline on top of the given client, after checking
     * that the command factory of the client supports the commands needed to
     * wrap the pipeline in a transaction (MULTI, EXEC and DISCARD).
     *
     * @param ClientInterface $client Client instance the pipeline is bound to.
     *
     * @throws ClientException When the command factory does not support
     *                         MULTI, EXEC and DISCARD.
32
     */
33
    public function __construct(ClientInterface $client)
7✔
34
    {
35
        // Fail before calling the parent constructor: executePipeline() creates
        // these three commands through the same command factory.
        if (!$client->getCommandFactory()->supports('multi', 'exec', 'discard')) {
7✔
36
            throw new ClientException(
×
37
                "'MULTI', 'EXEC' and 'DISCARD' are not supported by the current command factory."
×
38
            );
×
39
        }
40

41
        parent::__construct($client);
7✔
42
    }
43

44
    /**
45
     * {@inheritdoc}
     *
     * Returns the connection of the underlying client, accepting it only when
     * it is a single-node connection.
     *
     * @return NodeConnectionInterface Connection obtained from the client.
     *
     * @throws ClientException When the client connection does not implement
     *                         NodeConnectionInterface.
46
     */
47
    protected function getConnection()
6✔
48
    {
49
        $connection = $this->getClient()->getConnection();
6✔
50

51
        // Anything that is not a node connection is rejected; the error message
        // below describes this case as an aggregate connection.
        if (!$connection instanceof NodeConnectionInterface) {
6✔
52
            // __CLASS__ resolves to the class declaring this method, so the
            // message names this class even when invoked on a subclass.
            $class = __CLASS__;
1✔
53

54
            throw new ClientException("The class '$class' does not support aggregate connections.");
1✔
55
        }
56

57
        return $connection;
5✔
58
    }
59

60
    /**
61
     * {@inheritdoc}
     *
     * Runs the queued commands inside a MULTI / EXEC transaction:
     *
     *  1. sends MULTI;
     *  2. writes every queued command to the connection;
     *  3. reads one reply per queued command and, on the first error reply,
     *     sends DISCARD and throws;
     *  4. sends EXEC and maps each element of its reply to the command at the
     *     same position in the queue.
     *
     * The queue is emptied (dequeued) while the EXEC reply is being mapped.
     *
     * @param ConnectionInterface $connection Connection used to run the transaction.
     * @param SplQueue            $commands   Queued commands, in execution order.
     *
     * @return array Responses collected in the order the commands were queued.
     *
     * @throws ServerException When the server replies with an error while the
     *                         commands are being queued.
     * @throws ClientException When EXEC yields no result (transaction aborted)
     *                         or a result whose size differs from the number
     *                         of queued commands.
62
     */
63
    protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
5✔
64
    {
65
        $commandFactory = $this->getClient()->getCommandFactory();
5✔
66
        // Open the transaction before any queued command is written.
        $connection->executeCommand($commandFactory->create('multi'));
5✔
67

68
        // writeToMultiNode() / writeToSingleNode() are not defined in this class
        // (presumably inherited from Pipeline); they receive the whole queue.
        // NOTE(review): getConnection() of this class only returns
        // NodeConnectionInterface instances; confirm whether the aggregate
        // branch is reachable from here.
        if ($connection instanceof AggregateConnectionInterface) {
5✔
NEW
69
            $this->writeToMultiNode($connection, $commands);
×
70
        } else {
71
            $this->writeToSingleNode($connection, $commands);
5✔
72
        }
73

74
        // Read one reply per queued command. With the default SplQueue iterator
        // mode this loop does not remove elements; the queue is consumed later
        // by dequeue().
        foreach ($commands as $command) {
5✔
75
            $response = $connection->readResponse($command);
5✔
76

77
            // An error reply at this stage cancels the whole transaction.
            // NOTE(review): DISCARD is sent before the replies of the remaining
            // queued commands are read; confirm this cannot leave unread
            // replies on the connection.
            if ($response instanceof ErrorResponseInterface) {
5✔
78
                $connection->executeCommand($commandFactory->create('discard'));
2✔
79
                throw new ServerException($response->getMessage());
2✔
80
            }
81
        }
82

83
        $executed = $connection->executeCommand($commandFactory->create('exec'));
3✔
84

85
        // isset() is false only when the EXEC result is null, which is reported
        // as a transaction aborted by the server.
        if (!isset($executed)) {
3✔
86
            throw new ClientException(
1✔
87
                'The underlying transaction has been aborted by the server.'
1✔
88
            );
1✔
89
        }
90

91
        // Each queued command must have exactly one entry in the EXEC result,
        // because entries are matched to commands by position below.
        if (count($executed) !== count($commands)) {
2✔
92
            $expected = count($commands);
×
93
            $received = count($executed);
×
94

95
            throw new ClientException(
×
96
                "Invalid number of responses [expected $expected, received $received]."
×
97
            );
×
98
        }
99

100
        $responses = [];
2✔
101
        // Captured up front: the loop below shrinks the queue via dequeue().
        $sizeOfPipe = count($commands);
2✔
102
        $exceptions = $this->throwServerExceptions();
2✔
103
        $protocolVersion = (int) $connection->getParameters()->protocol;
2✔
104

105
        for ($i = 0; $i < $sizeOfPipe; ++$i) {
2✔
106
            $command = $commands->dequeue();
2✔
107
            $response = $executed[$i];
2✔
108

109
            // Values that are not response objects are parsed by the command
            // itself: parseResponse() for protocol 2, parseResp3Response() for
            // any other protocol value.
            if (!$response instanceof ResponseInterface) {
2✔
110
                if ($protocolVersion === 2) {
×
111
                    $responses[] = $command->parseResponse($response);
×
112
                } else {
113
                    $responses[] = $command->parseResp3Response($response);
×
114
                }
115
            // Error responses are handed to exception() only when
            // throwServerExceptions() is truthy; nothing is appended to
            // $responses in this branch (exception() presumably throws).
            } elseif ($response instanceof ErrorResponseInterface && $exceptions) {
2✔
116
                $this->exception($connection, $response);
×
117
            } else {
118
                // Any other response object is returned to the caller untouched.
                $responses[] = $response;
2✔
119
            }
120

121
            // Drop the raw entry once it has been handled.
            unset($executed[$i]);
2✔
122
        }
123

124
        return $responses;
2✔
125
    }
126
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc