• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

leonchen83 / redis-replicator / #2076

09 Dec 2023 02:37AM UTC coverage: 71.561%. Remained the same
#2076

push

Baoyi Chen
test

3 of 3 new or added lines in 2 files covered. (100.0%)

140 existing lines in 4 files now uncovered.

6628 of 9262 relevant lines covered (71.56%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.56
/src/main/java/com/moilioncircle/redis/replicator/RedisSocketReplicator.java
1
/*
2
 * Copyright 2016-2018 Leon Chen
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package com.moilioncircle.redis.replicator;
18

19
import static com.moilioncircle.redis.replicator.Constants.DOLLAR;
20
import static com.moilioncircle.redis.replicator.Constants.STAR;
21
import static com.moilioncircle.redis.replicator.RedisSocketReplicator.SyncMode.PSYNC;
22
import static com.moilioncircle.redis.replicator.RedisSocketReplicator.SyncMode.SYNC;
23
import static com.moilioncircle.redis.replicator.RedisSocketReplicator.SyncMode.SYNC_LATER;
24
import static com.moilioncircle.redis.replicator.Status.CONNECTED;
25
import static com.moilioncircle.redis.replicator.Status.CONNECTING;
26
import static com.moilioncircle.redis.replicator.Status.DISCONNECTED;
27
import static com.moilioncircle.redis.replicator.Status.DISCONNECTING;
28
import static com.moilioncircle.redis.replicator.cmd.CommandParsers.toInt;
29
import static com.moilioncircle.redis.replicator.util.Strings.format;
30
import static com.moilioncircle.redis.replicator.util.Strings.isEquals;
31
import static com.moilioncircle.redis.replicator.util.Tuples.of;
32
import static java.util.concurrent.TimeUnit.MILLISECONDS;
33

34
import java.io.IOException;
35
import java.io.InputStream;
36
import java.net.Socket;
37
import java.util.Objects;
38
import java.util.concurrent.ScheduledFuture;
39
import java.util.concurrent.atomic.AtomicBoolean;
40

41
import org.slf4j.Logger;
42
import org.slf4j.LoggerFactory;
43

44
import com.moilioncircle.redis.replicator.cmd.BulkReplyHandler;
45
import com.moilioncircle.redis.replicator.cmd.Command;
46
import com.moilioncircle.redis.replicator.cmd.CommandName;
47
import com.moilioncircle.redis.replicator.cmd.CommandParser;
48
import com.moilioncircle.redis.replicator.cmd.RedisCodec;
49
import com.moilioncircle.redis.replicator.cmd.ReplyParser;
50
import com.moilioncircle.redis.replicator.cmd.impl.SelectCommand;
51
import com.moilioncircle.redis.replicator.event.EventListener;
52
import com.moilioncircle.redis.replicator.event.PostCommandSyncEvent;
53
import com.moilioncircle.redis.replicator.event.PreCommandSyncEvent;
54
import com.moilioncircle.redis.replicator.io.AsyncBufferedInputStream;
55
import com.moilioncircle.redis.replicator.io.RateLimitInputStream;
56
import com.moilioncircle.redis.replicator.io.RedisInputStream;
57
import com.moilioncircle.redis.replicator.io.RedisOutputStream;
58
import com.moilioncircle.redis.replicator.net.RedisSocketFactory;
59
import com.moilioncircle.redis.replicator.rdb.RdbParser;
60
import com.moilioncircle.redis.replicator.util.Strings;
61
import com.moilioncircle.redis.replicator.util.XScheduledExecutorService;
62

63
/**
64
 * @author Leon Chen
65
 * @since 2.1.0
66
 */
67
public class RedisSocketReplicator extends AbstractReplicator {
68
    
69
    protected static final Logger logger = LoggerFactory.getLogger(RedisSocketReplicator.class);
1✔
70
    
71
    protected int db = -1;
1✔
72
    protected Socket socket;
73
    protected ReplyParser replyParser;
74
    protected ScheduledFuture<?> heartbeat;
75
    protected RedisOutputStream outputStream;
76
    protected XScheduledExecutorService executor;
77
    
78
    //
79
    protected final int port;
80
    protected final String host;
81
    protected final ReplFilter[] replFilters;
82
    protected final RedisSocketFactory socketFactory;
83
    protected final AtomicBoolean manual = new AtomicBoolean(false);
1✔
84
    
85
    public RedisSocketReplicator(String host, int port, Configuration configuration) {
1✔
86
        Objects.requireNonNull(host);
1✔
87
        if (port <= 0 || port > 65535) throw new IllegalArgumentException("illegal argument port: " + port);
1✔
88
        Objects.requireNonNull(configuration);
1✔
89
        this.host = host;
1✔
90
        this.port = port;
1✔
91
        this.configuration = configuration;
1✔
92
        this.socketFactory = new RedisSocketFactory(configuration);
1✔
93
        builtInCommandParserRegister();
1✔
94
        if (configuration.isUseDefaultExceptionListener())
1✔
95
            addExceptionListener(new DefaultExceptionListener());
1✔
96
        this.replFilters = configuration.getReplFilters();
1✔
97
        if (this.replFilters != null) {
1✔
98
            for (int i = 0; i < replFilters.length; i++) {
1✔
UNCOV
99
                replFilters[i] = new InitializedReplFilter(replFilters[i], this);
×
100
            }
101
        }
102
    }
1✔
103

104
    public String getHost() {
105
        return this.host;
×
106
    }
107

108
    public int getPort() {
UNCOV
109
        return this.port;
×
110
    }
111
    
112
    /**
113
     * PSYNC
114
     * <p>
115
     *
116
     * @throws IOException when read timeout or connect timeout
117
     */
118
    @Override
119
    public void open() throws IOException {
120
        manual.compareAndSet(true, false);
1✔
121
        this.executor = new XScheduledExecutorService(configuration);
1✔
122
        try {
123
            new RedisSocketReplicatorRetrier().retry(this);
1✔
124
        } finally {
125
            doClose();
1✔
126
            doCloseListener(this);
1✔
127
            this.executor.terminateQuietly(configuration.getConnectionTimeout(), MILLISECONDS);
1✔
128
        }
129
    }
1✔
130
    
131
    @Override
132
    public void close() throws IOException {
133
        super.close();
1✔
134
        manual.compareAndSet(false, true);
1✔
135
    }
1✔
136
    
137
    protected SyncMode trySync(final String reply) throws IOException {
138
        logger.info(reply);
1✔
139
        if (reply.startsWith("FULLRESYNC")) {
1✔
140
            // reset db
141
            this.db = -1;
1✔
142
            parseDump(this);
1✔
143
            String[] ary = reply.split(" ");
1✔
144
            configuration.setReplId(ary[1]);
1✔
145
            configuration.setReplOffset(Long.parseLong(ary[2]));
1✔
146
            return PSYNC;
1✔
147
        } else if (reply.startsWith("CONTINUE")) {
1✔
148
            String[] ary = reply.split(" ");
1✔
149
            // redis-4.0 compatible
150
            String replId = configuration.getReplId();
1✔
151
            if (ary.length > 1 && replId != null && !replId.equals(ary[1])) configuration.setReplId(ary[1]);
1✔
152
            return PSYNC;
1✔
153
        } else if (reply.startsWith("NOMASTERLINK") || reply.startsWith("LOADING")) {
×
154
            return SYNC_LATER;
×
155
        } else {
UNCOV
156
            logger.info("SYNC");
×
UNCOV
157
            send("SYNC".getBytes());
×
158
            // reset db
UNCOV
159
            this.db = -1;
×
UNCOV
160
            parseDump(this);
×
UNCOV
161
            return SYNC;
×
162
        }
163
    }
164
    
165
    protected void parseDump(final AbstractReplicator replicator) throws IOException {
166
        byte[] rawReply = reply(new BulkReplyHandler() {
1✔
167
            @Override
168
            public byte[] handle(long len, RedisInputStream in) throws IOException {
169
                if (len != -1) {
1✔
170
                    logger.info("RDB dump file size:{}", len);
1✔
171
                } else {
UNCOV
172
                    logger.info("Disk-less replication.");
×
173
                }
174
                if (len != -1 && configuration.isDiscardRdbEvent()) {
1✔
UNCOV
175
                    logger.info("discard {} bytes", len);
×
UNCOV
176
                    in.skip(len);
×
177
                } else {
178
                    new RdbParser(in, replicator).parse();
1✔
179
                    // skip 40 bytes delimiter when disk-less replication
180
                    if (len == -1) in.skip(40, false);
1✔
181
                }
182
                return "OK".getBytes();
1✔
183
            }
184
        });
185
        String reply = Strings.toString(rawReply);
1✔
186
        if (Objects.equals(reply, "OK")) return;
1✔
UNCOV
187
        throw new IOException("SYNC failed. reason : [" + reply + "]");
×
188
    }
189
    
190
    protected void establishConnection() throws IOException {
191
        connect();
1✔
192
        if (configuration.getAuthPassword() != null) auth(configuration.getAuthUser(), configuration.getAuthPassword());
1✔
193
        sendPing();
1✔
194
        sendSlavePort();
1✔
195
        sendSlaveIp();
1✔
196
        sendSlaveCapa("eof");
1✔
197
        sendSlaveCapa("psync2");
1✔
198
        if (this.replFilters != null) {
1✔
199
            for (ReplFilter filter : this.replFilters) {
1✔
UNCOV
200
                sendSlaveFilter(filter);
×
201
            }
202
        }
203
    }
1✔
204
    
205
    protected void auth(String user, String password) throws IOException {
206
        if (password != null) {
1✔
207
            // sha256 mask password
208
            String mask = "#" + Strings.mask(password);
1✔
209
            if (user == null) {
1✔
210
                logger.info("AUTH {}", mask);
1✔
211
                send("AUTH".getBytes(), password.getBytes());
1✔
212
            } else {
213
                logger.info("AUTH {} {}", user, mask);
×
214
                send("AUTH".getBytes(), user.getBytes(), password.getBytes());
×
215
            }
216
            final String reply = Strings.toString(reply());
1✔
217
            logger.info(reply);
1✔
218
            if (Objects.equals(reply, "OK")) return;
1✔
219
            if (reply.contains("no password") || reply.contains("without any password")) {
×
UNCOV
220
                if (user == null) {
×
221
                    logger.warn("[AUTH {}] failed. {}", mask, reply);
×
222
                } else {
UNCOV
223
                    logger.warn("[AUTH {} {}] failed. {}", user, mask, reply);
×
224
                }
UNCOV
225
                return;
×
226
            }
227
            if (user == null) {
×
UNCOV
228
                throw new AssertionError("[AUTH " + mask + "] failed. " + reply);
×
229
            } else {
UNCOV
230
                throw new AssertionError("[AUTH " + user + " " + mask + "] failed. " + reply);
×
231
            }
232
        }
UNCOV
233
    }
×
234
    
235
    protected void sendPing() throws IOException {
236
        logger.info("PING");
1✔
237
        send("PING".getBytes());
1✔
238
        final String reply = Strings.toString(reply());
1✔
239
        logger.info(reply);
1✔
240
        if ("PONG".equalsIgnoreCase(reply)) return;
1✔
UNCOV
241
        if (reply.contains("NOAUTH")) throw new AssertionError(reply);
×
UNCOV
242
        if (reply.contains("NOPERM")) throw new AssertionError(reply);
×
UNCOV
243
        if (reply.contains("operation not permitted")) throw new AssertionError("-NOAUTH Authentication required.");
×
UNCOV
244
        logger.warn("[PING] failed. {}", reply);
×
UNCOV
245
    }
×
246
    
247
    protected void sendSlavePort() throws IOException {
248
        // REPLCONF listening-prot ${port}
249
        logger.info("REPLCONF listening-port {}", socket.getLocalPort());
1✔
250
        send("REPLCONF".getBytes(), "listening-port".getBytes(), String.valueOf(socket.getLocalPort()).getBytes());
1✔
251
        final String reply = Strings.toString(reply());
1✔
252
        logger.info(reply);
1✔
253
        if (Objects.equals(reply, "OK")) return;
1✔
UNCOV
254
        logger.warn("[REPLCONF listening-port {}] failed. {}", socket.getLocalPort(), reply);
×
UNCOV
255
    }
×
256
    
257
    protected void sendSlaveIp() throws IOException {
258
        // REPLCONF ip-address ${address}
259
        logger.info("REPLCONF ip-address {}", socket.getLocalAddress().getHostAddress());
1✔
260
        send("REPLCONF".getBytes(), "ip-address".getBytes(), socket.getLocalAddress().getHostAddress().getBytes());
1✔
261
        final String reply = Strings.toString(reply());
1✔
262
        logger.info(reply);
1✔
263
        if (Objects.equals(reply, "OK")) return;
1✔
264
        //redis 3.2+
UNCOV
265
        logger.warn("[REPLCONF ip-address {}] failed. {}", socket.getLocalAddress().getHostAddress(), reply);
×
UNCOV
266
    }
×
267
    
268
    protected void sendSlaveCapa(String cmd) throws IOException {
269
        // REPLCONF capa eof
270
        logger.info("REPLCONF capa {}", cmd);
1✔
271
        send("REPLCONF".getBytes(), "capa".getBytes(), cmd.getBytes());
1✔
272
        final String reply = Strings.toString(reply());
1✔
273
        logger.info(reply);
1✔
274
        if (Objects.equals(reply, "OK")) return;
1✔
275
        logger.warn("[REPLCONF capa {}] failed. {}", cmd, reply);
×
276
    }
×
277
    
278
    protected void sendSlaveFilter(ReplFilter filter) throws IOException {
UNCOV
279
        String[] command = filter.command();
×
280
        String info = String.join(" ", command);
×
281
        logger.info(info);
×
282
        byte[][] args = new byte[command.length - 1][];
×
283
        for (int i = 1, j = 0; i < command.length; i++) {
×
284
            args[j++] = command[i].getBytes();
×
285
        }
286
        send(command[0].getBytes(), args);
×
287
        final String reply = Strings.toString(reply());
×
UNCOV
288
        logger.info(reply);
×
289
        if (Objects.equals(reply, "OK")) {
×
UNCOV
290
            EventListener listener = filter.listener(this);
×
291
            if (listener != null) {
×
292
                this.removeEventListener(listener);
×
UNCOV
293
                this.addEventListener(listener);
×
294
            }
UNCOV
295
            return;
×
296
        }
UNCOV
297
        logger.warn("[{}] failed. {}", info, reply);
×
UNCOV
298
    }
×
299
    
300
    protected void heartbeat() {
301
        assert heartbeat == null || heartbeat.isCancelled();
1✔
302
        heartbeat = executor.scheduleWithFixedDelay(new Runnable() {
1✔
303
            @Override
304
            public void run() {
305
                sendQuietly("REPLCONF".getBytes(), "ACK".getBytes(), String.valueOf(configuration.getReplOffset()).getBytes());
1✔
306
            }
1✔
307
        }, configuration.getHeartbeatPeriod(), configuration.getHeartbeatPeriod(), MILLISECONDS);
1✔
308
        logger.info("heartbeat started.");
1✔
309
    }
1✔
310
    
311
    protected void send(byte[] command) throws IOException {
312
        send(command, new byte[0][]);
1✔
313
    }
1✔
314
    
315
    protected void send(byte[] command, final byte[]... args) throws IOException {
316
        outputStream.write(STAR);
1✔
317
        outputStream.write(String.valueOf(args.length + 1).getBytes());
1✔
318
        outputStream.writeCrLf();
1✔
319
        outputStream.write(DOLLAR);
1✔
320
        outputStream.write(String.valueOf(command.length).getBytes());
1✔
321
        outputStream.writeCrLf();
1✔
322
        outputStream.write(command);
1✔
323
        outputStream.writeCrLf();
1✔
324
        for (final byte[] arg : args) {
1✔
325
            outputStream.write(DOLLAR);
1✔
326
            outputStream.write(String.valueOf(arg.length).getBytes());
1✔
327
            outputStream.writeCrLf();
1✔
328
            outputStream.write(arg);
1✔
329
            outputStream.writeCrLf();
1✔
330
        }
331
        outputStream.flush();
1✔
332
    }
1✔
333
    
334
    protected void sendQuietly(byte[] command, final byte[]... args) {
335
        try {
336
            send(command, args);
1✔
UNCOV
337
        } catch (IOException e) {
×
338
            // NOP
339
        }
1✔
340
    }
1✔
341
    
342
    @SuppressWarnings("unchecked")
343
    protected <T> T reply() throws IOException {
344
        return (T) replyParser.parse();
1✔
345
    }
346
    
347
    @SuppressWarnings("unchecked")
348
    protected <T> T reply(BulkReplyHandler handler) throws IOException {
349
        return (T) replyParser.parse(handler);
1✔
350
    }
351
    
352
    protected void connect() throws IOException {
353
        if (!compareAndSet(DISCONNECTED, CONNECTING)) return;
1✔
354
        try {
355
            socket = socketFactory.createSocket(host, port, configuration.getConnectionTimeout());
1✔
356
            outputStream = new RedisOutputStream(socket.getOutputStream());
1✔
357
            InputStream inputStream = socket.getInputStream();
1✔
358
            if (configuration.getAsyncCachedBytes() > 0) {
1✔
359
                inputStream = new AsyncBufferedInputStream(inputStream, configuration.getAsyncCachedBytes());
1✔
360
            }
361
            if (configuration.getRateLimit() > 0) {
1✔
UNCOV
362
                inputStream = new RateLimitInputStream(inputStream, configuration.getRateLimit());
×
363
            }
364
            this.inputStream = new RedisInputStream(inputStream, configuration.getBufferSize());
1✔
365
            this.inputStream.setRawByteListeners(this.rawByteListeners);
1✔
366
            replyParser = new ReplyParser(this.inputStream, new RedisCodec());
1✔
367
            logger.info("connected to redis-server[{}:{}]", host, port);
1✔
368
        } finally {
369
            setStatus(CONNECTED);
1✔
370
        }
371
    }
1✔
372
    
373
    @Override
374
    protected void doClose() throws IOException {
375
        compareAndSet(CONNECTED, DISCONNECTING);
1✔
376
        
377
        try {
378
            if (heartbeat != null) {
1✔
379
                if (!heartbeat.isCancelled()) heartbeat.cancel(true);
1✔
380
                logger.info("heartbeat canceled.");
1✔
381
            }
382
            
383
            try {
384
                if (inputStream != null) {
1✔
385
                    inputStream.setRawByteListeners(null);
1✔
386
                    inputStream.close();
1✔
387
                }
UNCOV
388
            } catch (IOException e) {
×
389
                // NOP
390
            }
1✔
391
            try {
392
                if (outputStream != null) outputStream.close();
1✔
UNCOV
393
            } catch (IOException e) {
×
394
                // NOP
395
            }
1✔
396
            try {
397
                if (socket != null && !socket.isClosed()) socket.close();
1✔
UNCOV
398
            } catch (IOException e) {
×
399
                // NOP
400
            }
1✔
401
            logger.info("socket closed. redis-server[{}:{}]", host, port);
1✔
402
        } finally {
403
            setStatus(DISCONNECTED);
1✔
404
        }
405
    }
1✔
406
    
407
    /**
     * Outcome of interpreting the PSYNC reply in {@code trySync}.
     */
    protected enum SyncMode {
        /** The reply was not recognised, so the SYNC command was sent and the dump parsed. */
        SYNC,
        /** The server answered FULLRESYNC or CONTINUE. */
        PSYNC,
        /** The server answered NOMASTERLINK or LOADING; nothing was synced. */
        SYNC_LATER
    }
1✔
408
    
409
    private class RedisSocketReplicatorRetrier extends AbstractReplicatorRetrier {
1✔
410
        
411
        @Override
412
        protected boolean connect() throws IOException {
413
            establishConnection();
1✔
414
            return true;
1✔
415
        }
416
        
417
        @Override
418
        protected boolean close(IOException reason) throws IOException {
419
            if (reason != null)
1✔
420
                logger.error("[redis-replicator] socket error. redis-server[{}:{}]", host, port, reason);
1✔
421
            doClose();
1✔
422
            if (reason != null)
1✔
423
                logger.info("reconnecting to redis-server[{}:{}]. retry times:{}", host, port, (retries + 1));
1✔
424
            return true;
1✔
425
        }
426

427
        @Override
428
        protected boolean isManualClosed() {
429
            return manual.get();
1✔
430
        }
431

432
        @Override
433
        protected boolean open() throws IOException {
434
            String replId = configuration.getReplId();
1✔
435
            long replOffset = configuration.getReplOffset();
1✔
436
            logger.info("PSYNC {} {}", replId, String.valueOf(replOffset >= 0 ? replOffset + 1 : replOffset));
1✔
437
            send("PSYNC".getBytes(), replId.getBytes(), String.valueOf(replOffset >= 0 ? replOffset + 1 : replOffset).getBytes());
1✔
438
            final String reply = Strings.toString(reply());
1✔
439
            
440
            SyncMode mode = trySync(reply);
1✔
441
            if (mode == PSYNC && getStatus() == CONNECTED) {
1✔
442
                heartbeat();
1✔
443
            } else if (mode == SYNC_LATER && getStatus() == CONNECTED) {
1✔
UNCOV
444
                return false;
×
445
            }
446
            if (getStatus() != CONNECTED) return true;
1✔
447
            submitEvent(new PreCommandSyncEvent());
1✔
448
            if (db != -1) {
1✔
449
                submitEvent(new SelectCommand(db));
1✔
450
            }
451
            final long[] offset = new long[1];
1✔
452
            while (getStatus() == CONNECTED) {
1✔
453
                Object obj = replyParser.parse(len -> offset[0] = len);
1✔
454
                if (obj instanceof Object[]) {
1✔
455
                    if (verbose() && logger.isDebugEnabled())
1✔
456
                        logger.debug(format((Object[]) obj));
1✔
457
                    Object[] raw = (Object[]) obj;
1✔
458
                    CommandName name = CommandName.name(Strings.toString(raw[0]));
1✔
459
                    final CommandParser<? extends Command> parser;
460
                    if ((parser = commands.get(name)) == null) {
1✔
UNCOV
461
                        logger.warn("command [{}] not register. raw command:{}", name, format(raw));
×
UNCOV
462
                        configuration.addOffset(offset[0]);
×
UNCOV
463
                        offset[0] = 0L;
×
UNCOV
464
                        continue;
×
465
                    }
466
                    final long st = configuration.getReplOffset();
1✔
467
                    final long ed = st + offset[0];
1✔
468
                    if (isEquals(Strings.toString(raw[0]), "SELECT")) {
1✔
469
                        db = toInt(raw[1]);
1✔
470
                        submitEvent(parser.parse(raw), of(st, ed));
1✔
471
                    } else if (isEquals(Strings.toString(raw[0]), "REPLCONF") && isEquals(Strings.toString(raw[1]), "GETACK")) {
1✔
UNCOV
472
                        if (mode == PSYNC) executor.execute(new Runnable() {
×
473
                            @Override
474
                            public void run() {
UNCOV
475
                                sendQuietly("REPLCONF".getBytes(), "ACK".getBytes(), String.valueOf(configuration.getReplOffset()).getBytes());
×
UNCOV
476
                            }
×
477
                        });
478
                    } else {
479
                        // include ping command
480
                        submitEvent(parser.parse(raw), of(st, ed));
1✔
481
                    }
482
                } else {
1✔
UNCOV
483
                    logger.warn("unexpected redis reply:{}", obj);
×
484
                }
485
                configuration.addOffset(offset[0]);
1✔
486
                offset[0] = 0L;
1✔
487
            }
1✔
488
            if (getStatus() == CONNECTED) {
1✔
489
                // should not reach here. add this line for code idempotent.
UNCOV
490
                submitEvent(new PostCommandSyncEvent());
×
491
            }
492
            return true;
1✔
493
        }
494
    }
495
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc