• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

leonchen83 / redis-replicator / #2570

11 Oct 2025 04:55AM UTC coverage: 72.201% (-0.005%) from 72.206%
#2570

push

chenby
redis-8.2

7241 of 10029 relevant lines covered (72.2%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.2
/src/main/java/com/moilioncircle/redis/replicator/AbstractReplicator.java
1
/*
2
 * Copyright 2016-2018 Leon Chen
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package com.moilioncircle.redis.replicator;
18

19
import static com.moilioncircle.redis.replicator.Status.CONNECTED;
20
import static com.moilioncircle.redis.replicator.Status.DISCONNECTED;
21
import static com.moilioncircle.redis.replicator.Status.DISCONNECTING;
22
import static com.moilioncircle.redis.replicator.util.Tuples.of;
23

24
import java.io.EOFException;
25
import java.io.IOException;
26
import java.io.UncheckedIOException;
27
import java.util.Map;
28
import java.util.concurrent.ConcurrentHashMap;
29
import java.util.concurrent.atomic.AtomicReference;
30

31
import com.moilioncircle.redis.replicator.cmd.Command;
32
import com.moilioncircle.redis.replicator.cmd.CommandName;
33
import com.moilioncircle.redis.replicator.cmd.CommandParser;
34
import com.moilioncircle.redis.replicator.cmd.parser.AppendParser;
35
import com.moilioncircle.redis.replicator.cmd.parser.BLMoveParser;
36
import com.moilioncircle.redis.replicator.cmd.parser.BRPopLPushParser;
37
import com.moilioncircle.redis.replicator.cmd.parser.BitFieldParser;
38
import com.moilioncircle.redis.replicator.cmd.parser.BitOpParser;
39
import com.moilioncircle.redis.replicator.cmd.parser.CopyParser;
40
import com.moilioncircle.redis.replicator.cmd.parser.DecrByParser;
41
import com.moilioncircle.redis.replicator.cmd.parser.DecrParser;
42
import com.moilioncircle.redis.replicator.cmd.parser.DelParser;
43
import com.moilioncircle.redis.replicator.cmd.parser.EvalParser;
44
import com.moilioncircle.redis.replicator.cmd.parser.EvalShaParser;
45
import com.moilioncircle.redis.replicator.cmd.parser.ExecParser;
46
import com.moilioncircle.redis.replicator.cmd.parser.ExpireAtParser;
47
import com.moilioncircle.redis.replicator.cmd.parser.ExpireParser;
48
import com.moilioncircle.redis.replicator.cmd.parser.FlushAllParser;
49
import com.moilioncircle.redis.replicator.cmd.parser.FlushDBParser;
50
import com.moilioncircle.redis.replicator.cmd.parser.FunctionParser;
51
import com.moilioncircle.redis.replicator.cmd.parser.GeoAddParser;
52
import com.moilioncircle.redis.replicator.cmd.parser.GeoSearchStoreParser;
53
import com.moilioncircle.redis.replicator.cmd.parser.GetSetParser;
54
import com.moilioncircle.redis.replicator.cmd.parser.HDelParser;
55
import com.moilioncircle.redis.replicator.cmd.parser.HIncrByParser;
56
import com.moilioncircle.redis.replicator.cmd.parser.HMSetParser;
57
import com.moilioncircle.redis.replicator.cmd.parser.HPExpireAtParser;
58
import com.moilioncircle.redis.replicator.cmd.parser.HPersistParser;
59
import com.moilioncircle.redis.replicator.cmd.parser.HSetExParser;
60
import com.moilioncircle.redis.replicator.cmd.parser.HSetNxParser;
61
import com.moilioncircle.redis.replicator.cmd.parser.HSetParser;
62
import com.moilioncircle.redis.replicator.cmd.parser.IncrByParser;
63
import com.moilioncircle.redis.replicator.cmd.parser.IncrParser;
64
import com.moilioncircle.redis.replicator.cmd.parser.LInsertParser;
65
import com.moilioncircle.redis.replicator.cmd.parser.LMoveParser;
66
import com.moilioncircle.redis.replicator.cmd.parser.LPopParser;
67
import com.moilioncircle.redis.replicator.cmd.parser.LPushParser;
68
import com.moilioncircle.redis.replicator.cmd.parser.LPushXParser;
69
import com.moilioncircle.redis.replicator.cmd.parser.LRemParser;
70
import com.moilioncircle.redis.replicator.cmd.parser.LSetParser;
71
import com.moilioncircle.redis.replicator.cmd.parser.LTrimParser;
72
import com.moilioncircle.redis.replicator.cmd.parser.MSetNxParser;
73
import com.moilioncircle.redis.replicator.cmd.parser.MSetParser;
74
import com.moilioncircle.redis.replicator.cmd.parser.MoveParser;
75
import com.moilioncircle.redis.replicator.cmd.parser.MultiParser;
76
import com.moilioncircle.redis.replicator.cmd.parser.PExpireAtParser;
77
import com.moilioncircle.redis.replicator.cmd.parser.PExpireParser;
78
import com.moilioncircle.redis.replicator.cmd.parser.PFAddParser;
79
import com.moilioncircle.redis.replicator.cmd.parser.PFCountParser;
80
import com.moilioncircle.redis.replicator.cmd.parser.PFMergeParser;
81
import com.moilioncircle.redis.replicator.cmd.parser.PSetExParser;
82
import com.moilioncircle.redis.replicator.cmd.parser.PersistParser;
83
import com.moilioncircle.redis.replicator.cmd.parser.PingParser;
84
import com.moilioncircle.redis.replicator.cmd.parser.PublishParser;
85
import com.moilioncircle.redis.replicator.cmd.parser.RPopLPushParser;
86
import com.moilioncircle.redis.replicator.cmd.parser.RPopParser;
87
import com.moilioncircle.redis.replicator.cmd.parser.RPushParser;
88
import com.moilioncircle.redis.replicator.cmd.parser.RPushXParser;
89
import com.moilioncircle.redis.replicator.cmd.parser.RenameNxParser;
90
import com.moilioncircle.redis.replicator.cmd.parser.RenameParser;
91
import com.moilioncircle.redis.replicator.cmd.parser.ReplConfParser;
92
import com.moilioncircle.redis.replicator.cmd.parser.RestoreParser;
93
import com.moilioncircle.redis.replicator.cmd.parser.SAddParser;
94
import com.moilioncircle.redis.replicator.cmd.parser.SDiffStoreParser;
95
import com.moilioncircle.redis.replicator.cmd.parser.SInterStoreParser;
96
import com.moilioncircle.redis.replicator.cmd.parser.SMoveParser;
97
import com.moilioncircle.redis.replicator.cmd.parser.SPublishParser;
98
import com.moilioncircle.redis.replicator.cmd.parser.SRemParser;
99
import com.moilioncircle.redis.replicator.cmd.parser.SUnionStoreParser;
100
import com.moilioncircle.redis.replicator.cmd.parser.ScriptParser;
101
import com.moilioncircle.redis.replicator.cmd.parser.SelectParser;
102
import com.moilioncircle.redis.replicator.cmd.parser.SetBitParser;
103
import com.moilioncircle.redis.replicator.cmd.parser.SetExParser;
104
import com.moilioncircle.redis.replicator.cmd.parser.SetNxParser;
105
import com.moilioncircle.redis.replicator.cmd.parser.SetParser;
106
import com.moilioncircle.redis.replicator.cmd.parser.SetRangeParser;
107
import com.moilioncircle.redis.replicator.cmd.parser.SortParser;
108
import com.moilioncircle.redis.replicator.cmd.parser.SwapDBParser;
109
import com.moilioncircle.redis.replicator.cmd.parser.UnLinkParser;
110
import com.moilioncircle.redis.replicator.cmd.parser.XAckDelParser;
111
import com.moilioncircle.redis.replicator.cmd.parser.XAckParser;
112
import com.moilioncircle.redis.replicator.cmd.parser.XAddParser;
113
import com.moilioncircle.redis.replicator.cmd.parser.XClaimParser;
114
import com.moilioncircle.redis.replicator.cmd.parser.XDelExParser;
115
import com.moilioncircle.redis.replicator.cmd.parser.XDelParser;
116
import com.moilioncircle.redis.replicator.cmd.parser.XGroupParser;
117
import com.moilioncircle.redis.replicator.cmd.parser.XSetIdParser;
118
import com.moilioncircle.redis.replicator.cmd.parser.XTrimParser;
119
import com.moilioncircle.redis.replicator.cmd.parser.ZAddParser;
120
import com.moilioncircle.redis.replicator.cmd.parser.ZDiffStoreParser;
121
import com.moilioncircle.redis.replicator.cmd.parser.ZIncrByParser;
122
import com.moilioncircle.redis.replicator.cmd.parser.ZInterStoreParser;
123
import com.moilioncircle.redis.replicator.cmd.parser.ZPopMaxParser;
124
import com.moilioncircle.redis.replicator.cmd.parser.ZPopMinParser;
125
import com.moilioncircle.redis.replicator.cmd.parser.ZRemParser;
126
import com.moilioncircle.redis.replicator.cmd.parser.ZRemRangeByLexParser;
127
import com.moilioncircle.redis.replicator.cmd.parser.ZRemRangeByRankParser;
128
import com.moilioncircle.redis.replicator.cmd.parser.ZRemRangeByScoreParser;
129
import com.moilioncircle.redis.replicator.cmd.parser.ZUnionStoreParser;
130
import com.moilioncircle.redis.replicator.event.AbstractEvent;
131
import com.moilioncircle.redis.replicator.event.Event;
132
import com.moilioncircle.redis.replicator.io.RedisInputStream;
133
import com.moilioncircle.redis.replicator.rdb.DefaultRdbVisitor;
134
import com.moilioncircle.redis.replicator.rdb.RdbVisitor;
135
import com.moilioncircle.redis.replicator.rdb.datatype.Module;
136
import com.moilioncircle.redis.replicator.rdb.module.ModuleKey;
137
import com.moilioncircle.redis.replicator.rdb.module.ModuleParser;
138
import com.moilioncircle.redis.replicator.util.type.Tuple2;
139

140
/**
141
 * @author Leon Chen
142
 * @version 2.1.1
143
 * @since 2.1.0
144
 */
145
public abstract class AbstractReplicator extends AbstractReplicatorListener implements Replicator {
1✔
146
    protected Configuration configuration;
147
    protected RedisInputStream inputStream;
148
    protected RdbVisitor rdbVisitor = new DefaultRdbVisitor(this);
1✔
149
    protected final AtomicReference<Status> connected = new AtomicReference<>(DISCONNECTED);
1✔
150
    protected final Map<ModuleKey, ModuleParser<? extends Module>> modules = new ConcurrentHashMap<>();
1✔
151
    protected final Map<CommandName, CommandParser<? extends Command>> commands = new ConcurrentHashMap<>();
1✔
152
    
153
    @Override
154
    public CommandParser<? extends Command> getCommandParser(CommandName command) {
155
        return commands.get(command);
×
156
    }
157
    
158
    @Override
159
    public <T extends Command> void addCommandParser(CommandName command, CommandParser<T> parser) {
160
        commands.put(command, parser);
1✔
161
    }
1✔
162
    
163
    @Override
164
    public CommandParser<? extends Command> removeCommandParser(CommandName command) {
165
        return commands.remove(command);
1✔
166
    }
167
    
168
    @Override
169
    public ModuleParser<? extends Module> getModuleParser(String moduleName, int moduleVersion) {
170
        return modules.get(ModuleKey.key(moduleName, moduleVersion));
1✔
171
    }
172
    
173
    @Override
174
    public <T extends Module> void addModuleParser(String moduleName, int moduleVersion, ModuleParser<T> parser) {
175
        modules.put(ModuleKey.key(moduleName, moduleVersion), parser);
1✔
176
    }
1✔
177
    
178
    @Override
179
    public ModuleParser<? extends Module> removeModuleParser(String moduleName, int moduleVersion) {
180
        return modules.remove(ModuleKey.key(moduleName, moduleVersion));
×
181
    }
182

183
    public void submitEvent(Event event) {
184
        long offset = configuration.getReplOffset();
1✔
185
        submitEvent(event, of(offset, offset));
1✔
186
    }
1✔
187

188
    public void submitEvent(Event event, Tuple2<Long, Long> offsets) {
189
        try {
190
            dress(event, offsets);
1✔
191
            doEventListener(this, event);
1✔
192
        } catch (UncheckedIOException e) {
×
193
            throw e;
×
194
            //ignore UncheckedIOException so that to propagate to caller.
195
        } catch (Throwable e) {
×
196
            doExceptionListener(this, e, event);
×
197
        }
1✔
198
    }
1✔
199

200
    protected void dress(Event event, Tuple2<Long, Long> offsets) {
201
        if (event instanceof AbstractEvent) {
1✔
202
            ((AbstractEvent) event).getContext().setOffsets(offsets);
1✔
203
        }
204
    }
1✔
205

206
    protected boolean compareAndSet(Status prev, Status next) {
207
        boolean result = connected.compareAndSet(prev, next);
1✔
208
        if (result) doStatusListener(this, next);
1✔
209
        return result;
1✔
210
    }
211
    
212
    protected void setStatus(Status next) {
213
        connected.set(next);
1✔
214
        doStatusListener(this, next);
1✔
215
    }
1✔
216
    
217
    @Override
218
    public boolean verbose() {
219
        return configuration != null && configuration.isVerbose();
1✔
220
    }
221
    
222
    @Override
223
    public Status getStatus() {
224
        return connected.get();
1✔
225
    }
226
    
227
    @Override
228
    public Configuration getConfiguration() {
229
        return configuration;
1✔
230
    }
231
    
232
    @Override
233
    public void setRdbVisitor(RdbVisitor rdbVisitor) {
234
        this.rdbVisitor = rdbVisitor;
1✔
235
    }
1✔
236
    
237
    @Override
238
    public RdbVisitor getRdbVisitor() {
239
        return this.rdbVisitor;
1✔
240
    }
241
    
242
    @Override
243
    public void builtInCommandParserRegister() {
244
        addCommandParser(CommandName.name("PING"), new PingParser());
1✔
245
        addCommandParser(CommandName.name("APPEND"), new AppendParser());
1✔
246
        addCommandParser(CommandName.name("SET"), new SetParser());
1✔
247
        addCommandParser(CommandName.name("SETEX"), new SetExParser());
1✔
248
        addCommandParser(CommandName.name("MSET"), new MSetParser());
1✔
249
        addCommandParser(CommandName.name("DEL"), new DelParser());
1✔
250
        addCommandParser(CommandName.name("SADD"), new SAddParser());
1✔
251
        addCommandParser(CommandName.name("HMSET"), new HMSetParser());
1✔
252
        addCommandParser(CommandName.name("HSET"), new HSetParser());
1✔
253
        addCommandParser(CommandName.name("LSET"), new LSetParser());
1✔
254
        addCommandParser(CommandName.name("EXPIRE"), new ExpireParser());
1✔
255
        addCommandParser(CommandName.name("EXPIREAT"), new ExpireAtParser());
1✔
256
        addCommandParser(CommandName.name("GETSET"), new GetSetParser());
1✔
257
        addCommandParser(CommandName.name("HSETNX"), new HSetNxParser());
1✔
258
        addCommandParser(CommandName.name("MSETNX"), new MSetNxParser());
1✔
259
        addCommandParser(CommandName.name("PSETEX"), new PSetExParser());
1✔
260
        addCommandParser(CommandName.name("SETNX"), new SetNxParser());
1✔
261
        addCommandParser(CommandName.name("SETRANGE"), new SetRangeParser());
1✔
262
        addCommandParser(CommandName.name("HDEL"), new HDelParser());
1✔
263
        addCommandParser(CommandName.name("LPOP"), new LPopParser());
1✔
264
        addCommandParser(CommandName.name("LPUSH"), new LPushParser());
1✔
265
        addCommandParser(CommandName.name("LPUSHX"), new LPushXParser());
1✔
266
        addCommandParser(CommandName.name("LRem"), new LRemParser());
1✔
267
        addCommandParser(CommandName.name("RPOP"), new RPopParser());
1✔
268
        addCommandParser(CommandName.name("RPUSH"), new RPushParser());
1✔
269
        addCommandParser(CommandName.name("RPUSHX"), new RPushXParser());
1✔
270
        addCommandParser(CommandName.name("ZREM"), new ZRemParser());
1✔
271
        addCommandParser(CommandName.name("RENAME"), new RenameParser());
1✔
272
        addCommandParser(CommandName.name("INCR"), new IncrParser());
1✔
273
        addCommandParser(CommandName.name("DECR"), new DecrParser());
1✔
274
        addCommandParser(CommandName.name("INCRBY"), new IncrByParser());
1✔
275
        addCommandParser(CommandName.name("DECRBY"), new DecrByParser());
1✔
276
        addCommandParser(CommandName.name("PERSIST"), new PersistParser());
1✔
277
        addCommandParser(CommandName.name("SELECT"), new SelectParser());
1✔
278
        addCommandParser(CommandName.name("FLUSHALL"), new FlushAllParser());
1✔
279
        addCommandParser(CommandName.name("FLUSHDB"), new FlushDBParser());
1✔
280
        addCommandParser(CommandName.name("HINCRBY"), new HIncrByParser());
1✔
281
        addCommandParser(CommandName.name("ZINCRBY"), new ZIncrByParser());
1✔
282
        addCommandParser(CommandName.name("MOVE"), new MoveParser());
1✔
283
        addCommandParser(CommandName.name("SMOVE"), new SMoveParser());
1✔
284
        addCommandParser(CommandName.name("PFADD"), new PFAddParser());
1✔
285
        addCommandParser(CommandName.name("PFCOUNT"), new PFCountParser());
1✔
286
        addCommandParser(CommandName.name("PFMERGE"), new PFMergeParser());
1✔
287
        addCommandParser(CommandName.name("SDIFFSTORE"), new SDiffStoreParser());
1✔
288
        addCommandParser(CommandName.name("SINTERSTORE"), new SInterStoreParser());
1✔
289
        addCommandParser(CommandName.name("SUNIONSTORE"), new SUnionStoreParser());
1✔
290
        addCommandParser(CommandName.name("ZADD"), new ZAddParser());
1✔
291
        addCommandParser(CommandName.name("ZINTERSTORE"), new ZInterStoreParser());
1✔
292
        addCommandParser(CommandName.name("ZUNIONSTORE"), new ZUnionStoreParser());
1✔
293
        addCommandParser(CommandName.name("BRPOPLPUSH"), new BRPopLPushParser());
1✔
294
        addCommandParser(CommandName.name("LINSERT"), new LInsertParser());
1✔
295
        addCommandParser(CommandName.name("RENAMENX"), new RenameNxParser());
1✔
296
        addCommandParser(CommandName.name("RESTORE"), new RestoreParser());
1✔
297
        addCommandParser(CommandName.name("PEXPIRE"), new PExpireParser());
1✔
298
        addCommandParser(CommandName.name("PEXPIREAT"), new PExpireAtParser());
1✔
299
        addCommandParser(CommandName.name("GEOADD"), new GeoAddParser());
1✔
300
        addCommandParser(CommandName.name("EVAL"), new EvalParser());
1✔
301
        addCommandParser(CommandName.name("EVALSHA"), new EvalShaParser());
1✔
302
        addCommandParser(CommandName.name("SCRIPT"), new ScriptParser());
1✔
303
        addCommandParser(CommandName.name("PUBLISH"), new PublishParser());
1✔
304
        addCommandParser(CommandName.name("BITOP"), new BitOpParser());
1✔
305
        addCommandParser(CommandName.name("BITFIELD"), new BitFieldParser());
1✔
306
        addCommandParser(CommandName.name("SETBIT"), new SetBitParser());
1✔
307
        addCommandParser(CommandName.name("SREM"), new SRemParser());
1✔
308
        addCommandParser(CommandName.name("UNLINK"), new UnLinkParser());
1✔
309
        addCommandParser(CommandName.name("SWAPDB"), new SwapDBParser());
1✔
310
        addCommandParser(CommandName.name("MULTI"), new MultiParser());
1✔
311
        addCommandParser(CommandName.name("EXEC"), new ExecParser());
1✔
312
        addCommandParser(CommandName.name("ZREMRANGEBYSCORE"), new ZRemRangeByScoreParser());
1✔
313
        addCommandParser(CommandName.name("ZREMRANGEBYRANK"), new ZRemRangeByRankParser());
1✔
314
        addCommandParser(CommandName.name("ZREMRANGEBYLEX"), new ZRemRangeByLexParser());
1✔
315
        addCommandParser(CommandName.name("LTRIM"), new LTrimParser());
1✔
316
        addCommandParser(CommandName.name("SORT"), new SortParser());
1✔
317
        addCommandParser(CommandName.name("RPOPLPUSH"), new RPopLPushParser());
1✔
318
        addCommandParser(CommandName.name("ZPOPMIN"), new ZPopMinParser());
1✔
319
        addCommandParser(CommandName.name("ZPOPMAX"), new ZPopMaxParser());
1✔
320
        addCommandParser(CommandName.name("REPLCONF"), new ReplConfParser());
1✔
321
        addCommandParser(CommandName.name("XACK"), new XAckParser());
1✔
322
        addCommandParser(CommandName.name("XADD"), new XAddParser());
1✔
323
        addCommandParser(CommandName.name("XCLAIM"), new XClaimParser());
1✔
324
        addCommandParser(CommandName.name("XDEL"), new XDelParser());
1✔
325
        addCommandParser(CommandName.name("XGROUP"), new XGroupParser());
1✔
326
        addCommandParser(CommandName.name("XTRIM"), new XTrimParser());
1✔
327
        addCommandParser(CommandName.name("XSETID"), new XSetIdParser());
1✔
328
        // since redis 6.2
329
        addCommandParser(CommandName.name("COPY"), new CopyParser());
1✔
330
        addCommandParser(CommandName.name("LMOVE"), new LMoveParser());
1✔
331
        addCommandParser(CommandName.name("BLMOVE"), new BLMoveParser());
1✔
332
        addCommandParser(CommandName.name("ZDIFFSTORE"), new ZDiffStoreParser());
1✔
333
        addCommandParser(CommandName.name("GEOSEARCHSTORE"), new GeoSearchStoreParser());
1✔
334
        // since redis 7.0
335
        addCommandParser(CommandName.name("SPUBLISH"), new SPublishParser());
1✔
336
        addCommandParser(CommandName.name("FUNCTION"), new FunctionParser());
1✔
337
        // since redis 7.4
338
        addCommandParser(CommandName.name("HSETEX"), new HSetExParser());
1✔
339
        addCommandParser(CommandName.name("HPEXPIREAT"), new HPExpireAtParser());
1✔
340
        addCommandParser(CommandName.name("HPERSIST"), new HPersistParser());
1✔
341
        // since redis 8.2
342
        addCommandParser(CommandName.name("XACKDEL"), new XAckDelParser());
1✔
343
        addCommandParser(CommandName.name("XDELEX"), new XDelExParser());
1✔
344
    }
1✔
345
    
346
    @Override
347
    public void open() throws IOException {
348
        if (!compareAndSet(DISCONNECTED, CONNECTED)) return;
1✔
349
        try {
350
            doOpen();
1✔
351
        } catch (UncheckedIOException e) {
×
352
            if (!(e.getCause() instanceof EOFException)) throw e.getCause();
×
353
        } finally {
354
            doClose();
1✔
355
            doCloseListener(this);
1✔
356
        }
357
    }
1✔
358
    
359
    @Override
360
    public void close() throws IOException {
361
        compareAndSet(CONNECTED, DISCONNECTING);
1✔
362
    }
1✔
363
    
364
    protected void doOpen() throws IOException {
365
        // NOP
366
    }
×
367
    
368
    protected void doClose() throws IOException {
369
        // NOP
370
    }
×
371
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc