• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

leonchen83 / redis-replicator / #2013

pending completion
#2013

push

leonchen83
redis 7.2-RC

1 of 1 new or added line in 1 file covered. (100.0%)

6630 of 9264 relevant lines covered (71.57%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.68
/src/main/java/com/moilioncircle/redis/replicator/rdb/ScanRdbGenerator.java
1
/*
2
 * Copyright 2016-2017 Leon Chen
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package com.moilioncircle.redis.replicator.rdb;
18

19
import static com.moilioncircle.redis.replicator.Constants.RDB_OPCODE_AUX;
20
import static com.moilioncircle.redis.replicator.Constants.RDB_OPCODE_EOF;
21
import static com.moilioncircle.redis.replicator.Constants.RDB_OPCODE_EXPIRETIME_MS;
22
import static com.moilioncircle.redis.replicator.Constants.RDB_OPCODE_RESIZEDB;
23
import static com.moilioncircle.redis.replicator.Constants.RDB_OPCODE_SELECTDB;
24
import static com.moilioncircle.redis.replicator.util.Strings.lappend;
25

26
import java.io.BufferedOutputStream;
27
import java.io.EOFException;
28
import java.io.IOException;
29
import java.io.OutputStream;
30
import java.util.HashMap;
31
import java.util.Map;
32
import java.util.Queue;
33

34
import com.moilioncircle.redis.replicator.Configuration;
35
import com.moilioncircle.redis.replicator.client.RESP2;
36
import com.moilioncircle.redis.replicator.client.RESP2Client;
37
import com.moilioncircle.redis.replicator.io.CRCOutputStream;
38
import com.moilioncircle.redis.replicator.rdb.datatype.DB;
39
import com.moilioncircle.redis.replicator.util.ByteArray;
40
import com.moilioncircle.redis.replicator.util.type.Tuple2;
41

42
/**
43
 * @author Leon Chen
44
 * @since 3.7.0
45
 */
46
public class ScanRdbGenerator {
47
    
48
    /** Database number of the last successful SELECT; passed along when the client is recreated. */
    private int db = 0;
    /** Connection to the source server; created in generate() and replaced after a reconnect. */
    private RESP2Client client;
    
    protected final int port;
    protected final String host;
    /** Destination stream: the caller's stream wrapped in a buffer and a CRC64-tracking stream. */
    private final CRCOutputStream out;
    private final Configuration configuration;
    
    /**
     * Maps a redis "major.minor" version string to the RDB format version that is
     * written into the generated file header. Populated once in the static
     * initializer below and only read afterwards.
     */
    private static final Map<String, Integer> VERSIONS = new HashMap<>();
    
    static {
        VERSIONS.put("2.6", 6);
        VERSIONS.put("2.8", 6);
        VERSIONS.put("3.0", 6);
        VERSIONS.put("3.2", 7);
        VERSIONS.put("4.0", 8);
        VERSIONS.put("5.0", 9);
        VERSIONS.put("6.0", 9);
        VERSIONS.put("6.2", 9);
        VERSIONS.put("7.0", 10);
        VERSIONS.put("7.2", 11);
    }
1✔
70
    
71
    public ScanRdbGenerator(String host, int port, Configuration configuration, OutputStream out) {
1✔
72
        this.host = host;
1✔
73
        this.port = port;
1✔
74
        this.configuration = configuration;
1✔
75
        this.out = new CRCOutputStream(new BufferedOutputStream(out, this.configuration.getBufferSize()));
1✔
76
    }
1✔
77
    
78
    public void generate() throws IOException {
79
        try {
80
            this.client = new RESP2Client(host, port, configuration);
1✔
81
            /*
82
             * rdb version
83
             */
84
            int version = 0;
1✔
85
            String ver = null;
1✔
86
            String bits = null;
1✔
87
            
88
            RESP2.Node server = retry(client -> {
1✔
89
                RESP2Client.Command r = client.newCommand();
1✔
90
                return r.invoke("info", "server");
1✔
91
            });
92
            
93
            if (server.type == RESP2.Type.ERROR) {
1✔
94
                throw new IOException(server.getError());
×
95
            } else {
96
                String value = server.getString();
1✔
97
                String[] lines = value.split("\r\n");
1✔
98
                for (int i = 1; i < lines.length; i++) {
1✔
99
                    String[] kv = lines[i].split(":");
1✔
100
                    String key = kv[0];
1✔
101
                    
102
                    if (key.equals("redis_version")) {
1✔
103
                        String val = kv[1];
1✔
104
                        ver = val;
1✔
105
                        
106
                        val = val.substring(0, val.lastIndexOf('.'));
1✔
107
                        if (!VERSIONS.containsKey(val)) {
1✔
108
                            throw new AssertionError("unsupported redis version :" + val);
×
109
                        }
110
                        
111
                        version = VERSIONS.get(val);
1✔
112
                    } else if (key.equals("arch_bits")) {
1✔
113
                        String val = kv[1];
1✔
114
                        bits = val;
1✔
115
                    }
116
                }
117
            }
118
            
119
            /*
120
             * version
121
             */
122
            out.write("REDIS".getBytes());
1✔
123
            out.write(lappend(version, 4, '0').getBytes());
1✔
124
            
125
            /*
126
             * aux
127
             */
128
            if (version >= 7) {
1✔
129
                generateAux("redis-ver", ver);
1✔
130
                generateAux("redis-bits", bits);
1✔
131
                generateAux("ctime", String.valueOf(System.currentTimeMillis() / 1000L));
1✔
132
                
133
                // used-memory
134
                RESP2.Node memory = retry(client -> {
1✔
135
                    RESP2Client.Command r = client.newCommand();
1✔
136
                    return r.invoke("info", "memory");
1✔
137
                });
138
                
139
                if (memory.type == RESP2.Type.STRING) {
1✔
140
                    String value = memory.getString();
1✔
141
                    String[] lines = value.split("\r\n");
1✔
142
                    for (int i = 1; i < lines.length; i++) {
1✔
143
                        String[] kv = lines[i].split(":");
1✔
144
                        String key = kv[0];
1✔
145
                        
146
                        if (key.equals("used_memory")) {
1✔
147
                            String val = kv[1];
1✔
148
                            generateAux("used-mem", val);
1✔
149
                        }
150
                    }
151
                }
152
            }
153
            
154
            if (version >= 10) {
1✔
155
                /*
156
                 * rdb function
157
                 */
158
                RESP2.Node functions = retry(client -> {
×
159
                    RESP2Client.Command r = client.newCommand();
×
160
                    return r.invoke("function", "dump");
×
161
                });
162
                
163
                if (functions.type == RESP2.Type.ERROR) {
×
164
                    throw new IOException(functions.getError());
×
165
                } else {
166
                    ByteArray funcs = functions.getBytes();
×
167
                    if (funcs != null) {
×
168
                        funcs.writeTo(out, 0, funcs.length() - 10);
×
169
                    }
170
                }
171
            }
172
            
173
            /*
174
             * rdb db info
175
             */
176
            RESP2.Node keyspace = retry(client -> {
1✔
177
                RESP2Client.Command r = client.newCommand();
1✔
178
                return r.invoke("info", "keyspace");
1✔
179
            });
180
            
181
            String[] line = keyspace.getString().split("\r\n");
1✔
182
            for (int i = 1; i < line.length; i++) {
1✔
183
                // db{dbnum}:keys={dbsize},expires={expires},avg_ttl=0
184
                String[] ary = line[i].split(":");
1✔
185
                Integer dbnum = Integer.parseInt(ary[0].substring(2));
1✔
186
                ary = ary[1].split(",");
1✔
187
                long dbsize = Long.parseLong(ary[0].split("=")[1]);
1✔
188
                long expires = Long.parseLong(ary[1].split("=")[1]);
1✔
189
                DB db = new DB(dbnum, dbsize, expires);
1✔
190
                generateDB(db, version);
1✔
191
            }
192
            
193
            out.write(RDB_OPCODE_EOF);
1✔
194
            out.write(out.getCRC64());
1✔
195
            out.flush();
1✔
196
        } finally {
197
            close();
1✔
198
        }
199
    }
1✔
200
    
201
    private void generateAux(String key, String val) throws IOException {
202
        if (val == null) return;
1✔
203
        BaseRdbEncoder encoder = new BaseRdbEncoder();
1✔
204
        out.write(RDB_OPCODE_AUX);
1✔
205
        encoder.rdbGenericSaveStringObject(new ByteArray(key.getBytes()), out);
1✔
206
        encoder.rdbGenericSaveStringObject(new ByteArray(val.getBytes()), out);
1✔
207
    }
1✔
208
    
209
    private void generateDB(DB db, int version) throws IOException {
210
        BaseRdbEncoder encoder = new BaseRdbEncoder();
1✔
211
        RESP2.Node select = retry(client -> {
1✔
212
            RESP2Client.Command r = client.newCommand();
1✔
213
            return r.invoke("select", String.valueOf(db.getDbNumber()));
1✔
214
        });
215
        
216
        /*
217
         * select
218
         */
219
        if (select.type == RESP2.Type.ERROR) {
1✔
220
            throw new IOException(select.getError());
×
221
        } else {
222
            this.db = (int) db.getDbNumber();
1✔
223
        }
224
        
225
        /*
226
         * db
227
         */
228
        out.write(RDB_OPCODE_SELECTDB);
1✔
229
        encoder.rdbSaveLen(db.getDbNumber(), out);
1✔
230
        if (version >= 7) {
1✔
231
            out.write(RDB_OPCODE_RESIZEDB);
1✔
232
            encoder.rdbSaveLen(db.getDbsize(), out);
1✔
233
            encoder.rdbSaveLen(db.getExpires(), out);
1✔
234
        }
235
        
236
        /*
237
         * scan
238
         */
239
        String cursor = "0";
1✔
240
        String step = String.valueOf(configuration.getScanStep());
1✔
241
        do {
242
            String temp = cursor;
1✔
243
            RESP2.Node scan = retry(client -> {
1✔
244
                RESP2Client.Command r = client.newCommand();
1✔
245
                return r.invoke("scan", temp, "count", step);
1✔
246
            });
247
            if (scan.type == RESP2.Type.ERROR) {
1✔
248
                throw new IOException(scan.getError());
×
249
            }
250
            
251
            RESP2.Node[] ary = scan.getArray();
1✔
252
            cursor = ary[0].getString();
1✔
253
            
254
            // key value pipeline
255
            RESP2Client.Command command = retry(client -> {
1✔
256
                RESP2Client.Command r = client.newCommand();
1✔
257
                RESP2.Node[] nodes = ary[1].getArray();
1✔
258
                for (int i = 0; i < nodes.length; i++) {
1✔
259
                    byte[] key = nodes[i].getBytes().first();
1✔
260
                    if (version >= 10) {
1✔
261
                        PExpireTimeNodeConsumer context = new PExpireTimeNodeConsumer();
×
262
                        r.post(context, "pexpiretime".getBytes(), key);
×
263
                        r.post(new DumpNodeConsumer(key, out, context), "dump".getBytes(), key);
×
264
                    } else {
×
265
                        PTTLNodeConsumer context = new PTTLNodeConsumer();
1✔
266
                        r.post(context, "pttl".getBytes(), key);
1✔
267
                        r.post(new DumpNodeConsumer(key, out, context), "dump".getBytes(), key);
1✔
268
                    }
269
                }
270
                return r;
1✔
271
            });
272
            retry(command);
1✔
273
        } while (!cursor.equals("0"));
1✔
274
    }
1✔
275
    
276
    private static interface TTLContext {
277
        Long getTTL();
278
    }
279
    
280
    private static class PTTLNodeConsumer implements RESP2Client.NodeConsumer, TTLContext {
281
        
282
        private Long ttl;
283
        
284
        @Override
285
        public Long getTTL() {
286
            return this.ttl;
1✔
287
        }
288
        
289
        @Override
290
        public void accept(RESP2.Node node) throws IOException {
291
            if (node.type == RESP2.Type.ERROR) {
1✔
292
                throw new IOException(node.getError());
×
293
            }
294
            Long ttl = node.getNumber();
1✔
295
            if (ttl >= 0) {
1✔
296
                this.ttl = System.currentTimeMillis() + ttl;
×
297
            }
298
        }
1✔
299
    }
300
    
301
    private static class PExpireTimeNodeConsumer implements RESP2Client.NodeConsumer, TTLContext {
302
        
303
        private Long ttl;
304
        
305
        @Override
306
        public Long getTTL() {
307
            return this.ttl;
×
308
        }
309
        
310
        @Override
311
        public void accept(RESP2.Node node) throws IOException {
312
            if (node.type == RESP2.Type.ERROR) {
×
313
                throw new IOException(node.getError());
×
314
            }
315
            Long ttl = node.getNumber();
×
316
            if (ttl >= 0) {
×
317
                this.ttl = ttl;
×
318
            }
319
        }
×
320
    }
321
    
322
    private static class DumpNodeConsumer implements RESP2Client.NodeConsumer {
323
        
324
        private byte[] key;
325
        private OutputStream out;
326
        private TTLContext context;
327
        private BaseRdbEncoder encoder = new BaseRdbEncoder();
1✔
328
        
329
        public DumpNodeConsumer(byte[] key, OutputStream out, TTLContext context) {
1✔
330
            this.key = key;
1✔
331
            this.out = out;
1✔
332
            this.context = context;
1✔
333
        }
1✔
334
        
335
        @Override
336
        public void accept(RESP2.Node node) throws IOException {
337
            if (node.type == RESP2.Type.ERROR) {
1✔
338
                throw new IOException(node.getError());
×
339
            }
340
            
341
            if (node.value != null) {
1✔
342
                Long ttl = context.getTTL();
1✔
343
                if (ttl != null) {
1✔
344
                    out.write(RDB_OPCODE_EXPIRETIME_MS);
×
345
                    encoder.rdbSaveMillisecondTime(ttl, out);
×
346
                }
347
                ByteArray value = node.getBytes();
1✔
348
                byte type = value.get(0);
1✔
349
                out.write(type);
1✔
350
                encoder.rdbGenericSaveStringObject(new ByteArray(key), out);
1✔
351
                value.writeTo(out, 1, value.length() - 11);
1✔
352
            }
353
        }
1✔
354
    }
355
    
356
    private RESP2Client recreate(RESP2Client prev, int db, IOException reason) throws IOException {
357
        IOException exception = reason;
×
358
        for (int i = 0; i < configuration.getRetries() || configuration.getRetries() <= 0; i++) {
×
359
            try {
360
                return RESP2Client.valueOf(prev, db, exception, i + 1);
×
361
            } catch (IOException e) {
×
362
                exception = e;
×
363
            }
364
        }
365
        throw exception;
×
366
    }
367
    
368
    private <T> T retry(RESP2Client.Function<RESP2Client, T> function) throws IOException {
369
        IOException exception = null;
1✔
370
        for (int i = 0; i < configuration.getRetries() || configuration.getRetries() <= 0; i++) {
1✔
371
            try {
372
                return function.apply(client);
1✔
373
            } catch (EOFException e) {
×
374
                throw e;
×
375
            } catch (IOException e) {
×
376
                exception = e;
×
377
                this.client = recreate(this.client, this.db, e);
×
378
            }
379
        }
380
        throw exception;
×
381
    }
382
    
383
    private void retry(RESP2Client.Command prev) throws IOException {
384
        IOException exception = null;
1✔
385
        for (int i = 0; i < configuration.getRetries() || configuration.getRetries() <= 0; i++) {
1✔
386
            try {
387
                prev.get();
1✔
388
                return;
1✔
389
            } catch (EOFException e) {
×
390
                throw e;
×
391
            } catch (IOException e) {
×
392
                exception = e;
×
393
                this.client = recreate(this.client, this.db, e);
×
394
                Queue<Tuple2<RESP2Client.NodeConsumer, byte[][]>> commands = prev.getCommands();
×
395
                RESP2Client.Command next = retry(client -> {
×
396
                    RESP2Client.Command r = client.newCommand();
×
397
                    while (!commands.isEmpty()) {
×
398
                        Tuple2<RESP2Client.NodeConsumer, byte[][]> tuple = commands.poll();
×
399
                        r.post(tuple.getV1(), tuple.getV2());
×
400
                    }
×
401
                    return r;
×
402
                });
403
                prev = next;
×
404
            }
405
        }
406
        throw exception;
×
407
    }
408
    
409
    private void close() {
410
        if (client != null) {
1✔
411
            try {
412
                client.close();
1✔
413
            } catch (IOException e) {
×
414
            }
1✔
415
        }
416
    }
1✔
417
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc