• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Moln / php-mysql-replication / 3882357544

pending completion
3882357544

Pull #8

github

GitHub
Merge 51519564d into a3462d9d4
Pull Request #8: fix: Deprecation with PHP 8.1. Add git action check of PHP8.2

122 of 122 new or added lines in 10 files covered. (100.0%)

1252 of 1548 relevant lines covered (80.88%)

27.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.88
/src/MySQLReplication/BinLog/BinLogSocketConnect.php
1
<?php
2
declare(strict_types=1);
3

4
namespace MySQLReplication\BinLog;
5

6
use MySQLReplication\BinaryDataReader\BinaryDataReader;
7
use MySQLReplication\Config\Config;
8
use MySQLReplication\Gtid\GtidCollection;
9
use MySQLReplication\Gtid\GtidException;
10
use MySQLReplication\Repository\RepositoryInterface;
11
use MySQLReplication\Socket\SocketException;
12
use MySQLReplication\Socket\SocketInterface;
13

14
class BinLogSocketConnect
15
{
16
    private const COM_BINLOG_DUMP = 0x12;
17
    private const COM_REGISTER_SLAVE = 0x15;
18
    private const COM_BINLOG_DUMP_GTID = 0x1e;
19

20
    /**
21
     * http://dev.mysql.com/doc/internals/en/auth-phase-fast-path.html 00 FE
22
     */
23
    private $packageOkHeader = [0, 254];
24
    private $binaryDataMaxLength = 16777215;
25
    private $checkSum = false;
26

27
    private $repository;
28
    private $socket;
29
    private $binLogCurrent;
30

31
    /**
32
     * @var Config
33
     */
34
    private $config;
35
    /**
36
     * @var BinLogServerInfo
37
     */
38
    private $binLogServerInfo;
39

40
    /**
41
     * @throws BinLogException
42
     * @throws GtidException
43
     * @throws SocketException
44
     */
45
    public function __construct(
46
        Config $config,
47
        RepositoryInterface $repository,
48
        SocketInterface $socket
49
    ) {
50
        $this->config = $config;
62✔
51
        $this->repository = $repository;
62✔
52
        $this->socket = $socket;
62✔
53
        $this->binLogCurrent = new BinLogCurrent();
62✔
54
    }
55

56
    public function isConnected(): bool
57
    {
58
        return $this->socket->isConnected();
×
59
    }
60

61
    public function connect(): void
62
    {
63
        $this->socket->connectToStream($this->config->getHost(), $this->config->getPort());
62✔
64
        $this->binLogServerInfo = BinLogServerInfo::parsePackage(
62✔
65
            $this->getResponse(false),
62✔
66
            $this->repository->getVersion()
62✔
67
        );
62✔
68
        $this->authenticate();
62✔
69
        $this->getBinlogStream();
62✔
70
    }
71

72
    public function getBinLogServerInfo(): BinLogServerInfo
73
    {
74
        return $this->binLogServerInfo;
62✔
75
    }
76

77
    /**
78
     * @throws BinLogException
79
     * @throws SocketException
80
     */
81
    public function getResponse(bool $checkResponse = true): string
82
    {
83
        $header = $this->socket->readFromSocket(4);
62✔
84
        if ('' === $header) {
62✔
85
            return '';
×
86
        }
87
        $dataLength = unpack('L', $header[0] . $header[1] . $header[2] . chr(0))[1];
62✔
88
        $isMaxDataLength = $dataLength === $this->binaryDataMaxLength;
62✔
89

90
        $result = $this->socket->readFromSocket($dataLength);
62✔
91
        if (true === $checkResponse) {
62✔
92
            $this->isWriteSuccessful($result);
62✔
93
        }
94

95
        // https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
96
        while ($isMaxDataLength) {
62✔
97
            $header = $this->socket->readFromSocket(4);
×
98
            if ('' === $header) {
×
99
                return $result;
×
100
            }
101
            $dataLength = unpack('L', $header[0] . $header[1] . $header[2] . chr(0))[1];
×
102
            $isMaxDataLength = $dataLength === $this->binaryDataMaxLength;
×
103
            $nextResult = $this->socket->readFromSocket($dataLength);
×
104
            $result .= $nextResult;
×
105
        }
106

107
        return $result;
62✔
108
    }
109

110
    /**
111
     * @throws BinLogException
112
     */
113
    private function isWriteSuccessful(string $data): void
114
    {
115
        $head = ord($data[0]);
62✔
116
        if (! in_array($head, $this->packageOkHeader, true)) {
62✔
117
            $errorCode = unpack('v', $data[1] . $data[2])[1];
×
118
            $errorMessage = '';
×
119
            $packetLength = strlen($data);
×
120
            for ($i = 9; $i < $packetLength; ++$i) {
×
121
                $errorMessage .= $data[$i];
×
122
            }
123

124
            throw new BinLogException($errorMessage, $errorCode);
×
125
        }
126
    }
127

128
    /**
129
     * @throws BinLogException
130
     * @throws SocketException
131
     * @link http://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
132
     */
133
    private function authenticate(): void
134
    {
135
        $data = pack('L', self::getCapabilities());
62✔
136
        $data .= pack('L', $this->binaryDataMaxLength);
62✔
137
        $data .= chr(33);
62✔
138
        for ($i = 0; $i < 23; ++$i) {
62✔
139
            $data .= chr(0);
62✔
140
        }
141
        $result = sha1($this->config->getPassword(), true) ^ sha1(
62✔
142
            $this->binLogServerInfo->getSalt() . sha1(sha1($this->config->getPassword(), true), true),
62✔
143
            true
62✔
144
        );
62✔
145

146
        $data = $data . $this->config->getUser() . chr(0) . chr(strlen($result)) . $result;
62✔
147
        $str = pack('L', strlen($data));
62✔
148
        $s = $str[0] . $str[1] . $str[2];
62✔
149
        $data = $s . chr(1) . $data;
62✔
150

151
        $this->socket->writeToSocket($data);
62✔
152
        $this->getResponse();
62✔
153
    }
154

155
    /**
156
     * http://dev.mysql.com/doc/internals/en/capability-flags.html#packet-protocol::capabilityflags
157
     * https://github.com/siddontang/mixer/blob/master/doc/protocol.txt
158
     */
159
    private static function getCapabilities(): int
160
    {
161
        $noSchema = 1 << 4;
62✔
162
        $longPassword = 1;
62✔
163
        $longFlag = 1 << 2;
62✔
164
        $transactions = 1 << 13;
62✔
165
        $secureConnection = 1 << 15;
62✔
166
        $protocol41 = 1 << 9;
62✔
167

168
        return ($longPassword | $longFlag | $transactions | $protocol41 | $secureConnection | $noSchema);
62✔
169
    }
170

171
    /**
172
     * @throws BinLogException
173
     * @throws GtidException
174
     * @throws SocketException
175
     */
176
    private function getBinlogStream(): void
177
    {
178
        $this->checkSum = $this->repository->isCheckSum();
62✔
179
        if ($this->checkSum) {
62✔
180
            $this->execute('SET @master_binlog_checksum = @@global.binlog_checksum');
62✔
181
        }
182

183
        if (0 !== $this->config->getHeartbeatPeriod()) {
62✔
184
            // master_heartbeat_period is in nanoseconds
185
            $this->execute('SET @master_heartbeat_period = ' . $this->config->getHeartbeatPeriod() * 1000000000);
62✔
186
        }
187

188
        $this->registerSlave();
62✔
189

190
        if ('' !== $this->config->getMariaDbGtid()) {
62✔
191
            $this->setBinLogDumpMariaGtid();
×
192
        }
193
        if ('' !== $this->config->getGtid()) {
62✔
194
            $this->setBinLogDumpGtid();
×
195
        } else {
196
            $this->setBinLogDump();
62✔
197
        }
198
    }
199

200
    /**
201
     * @throws BinLogException
202
     * @throws SocketException
203
     */
204
    private function execute(string $sql): void
205
    {
206
        $this->socket->writeToSocket(pack('LC', strlen($sql) + 1, 0x03) . $sql);
62✔
207
        $this->getResponse();
62✔
208
    }
209

210
    /**
211
     * @see https://dev.mysql.com/doc/internals/en/com-register-slave.html
212
     * @throws BinLogException
213
     * @throws SocketException
214
     */
215
    private function registerSlave(): void
216
    {
217
        $host = gethostname();
62✔
218
        $hostLength = strlen($host);
62✔
219
        $userLength = strlen($this->config->getUser());
62✔
220
        $passLength = strlen($this->config->getPassword());
62✔
221

222
        $data = pack('l', 18 + $hostLength + $userLength + $passLength);
62✔
223
        $data .= chr(self::COM_REGISTER_SLAVE);
62✔
224
        $data .= pack('V', $this->config->getSlaveId());
62✔
225
        $data .= pack('C', $hostLength);
62✔
226
        $data .= $host;
62✔
227
        $data .= pack('C', $userLength);
62✔
228
        $data .= $this->config->getUser();
62✔
229
        $data .= pack('C', $passLength);
62✔
230
        $data .= $this->config->getPassword();
62✔
231
        $data .= pack('v', $this->config->getPort());
62✔
232
        $data .= pack('V', 0);
62✔
233
        $data .= pack('V', 0);
62✔
234

235
        $this->socket->writeToSocket($data);
62✔
236
        $this->getResponse();
62✔
237
    }
238

239
    /**
240
     * @throws SocketException
241
     * @throws BinLogException
242
     */
243
    private function setBinLogDumpMariaGtid(): void
244
    {
245
        $this->execute('SET @mariadb_slave_capability = 4');
×
246
        $this->execute('SET @slave_connect_state = \'' . $this->config->getMariaDbGtid() . '\'');
×
247
        $this->execute('SET @slave_gtid_strict_mode = 0');
×
248
        $this->execute('SET @slave_gtid_ignore_duplicates = 0');
×
249

250
        $this->binLogCurrent->setMariaDbGtid($this->config->getMariaDbGtid());
×
251
    }
252

253
    /**
254
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
255
     * @throws BinLogException
256
     * @throws GtidException
257
     * @throws SocketException
258
     */
259
    private function setBinLogDumpGtid(): void
260
    {
261
        $collection = GtidCollection::makeCollectionFromString($this->config->getGtid());
×
262

263
        $data = pack('l', 26 + $collection->getEncodedLength()) . chr(self::COM_BINLOG_DUMP_GTID);
×
264
        $data .= pack('S', 0);
×
265
        $data .= pack('I', $this->config->getSlaveId());
×
266
        $data .= pack('I', 3);
×
267
        $data .= chr(0);
×
268
        $data .= chr(0);
×
269
        $data .= chr(0);
×
270
        $data .= BinaryDataReader::pack64bit(4);
×
271
        $data .= pack('I', $collection->getEncodedLength());
×
272
        $data .= $collection->getEncoded();
×
273

274
        $this->socket->writeToSocket($data);
×
275
        $this->getResponse();
×
276

277
        $this->binLogCurrent->setGtid($this->config->getGtid());
×
278
    }
279

280
    /**
281
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
282
     * @throws BinLogException
283
     * @throws SocketException
284
     */
285
    private function setBinLogDump(): void
286
    {
287
        $binLogCurrent = $this->binLogCurrent;
62✔
288
        $binFilePos = $binLogCurrent->getBinLogPosition() ?: $this->config->getBinLogPosition();
62✔
289
        $binFileName = $binLogCurrent->getBinFileName() ?: $this->config->getBinLogFileName();
62✔
290
        if (0 === $binFilePos && '' === $binFileName) {
62✔
291
            $masterStatusDTO = $this->repository->getMasterStatus();
62✔
292
            $binFilePos = $masterStatusDTO->getPosition();
62✔
293
            $binFileName = $masterStatusDTO->getFile();
62✔
294
        }
295

296
        $data = pack('i', strlen($binFileName) + 11) . chr(self::COM_BINLOG_DUMP);
62✔
297
        $data .= pack('I', $binFilePos);
62✔
298
        $data .= pack('v', 0);
62✔
299
        $data .= pack('I', $this->config->getSlaveId());
62✔
300
        $data .= $binFileName;
62✔
301

302
        $this->socket->writeToSocket($data);
62✔
303
        $this->getResponse();
62✔
304

305
        $binLogCurrent->setBinLogPosition($binFilePos);
62✔
306
        $binLogCurrent->setBinFileName($binFileName);
62✔
307
    }
308

309
    public function getBinLogCurrent(): BinLogCurrent
310
    {
311
        return $this->binLogCurrent;
62✔
312
    }
313

314
    public function getCheckSum(): bool
315
    {
316
        return $this->checkSum;
62✔
317
    }
318
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc