• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

leonchen83 / redis-replicator / #2153

06 Jun 2025 04:52AM UTC coverage: 71.352% (-0.3%) from 71.609%
#2153

push

chenby
redis 8.0

3 of 33 new or added lines in 14 files covered. (9.09%)

3 existing lines in 2 files now uncovered.

6630 of 9292 relevant lines covered (71.35%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.5
/src/main/java/com/moilioncircle/redis/replicator/rdb/dump/DumpRdbValueVisitor.java
1
/*
2
 * Copyright 2016-2017 Leon Chen
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package com.moilioncircle.redis.replicator.rdb.dump;
18

19
import static com.moilioncircle.redis.replicator.Constants.MODULE_SET;
20
import static com.moilioncircle.redis.replicator.Constants.QUICKLIST_NODE_CONTAINER_PACKED;
21
import static com.moilioncircle.redis.replicator.Constants.QUICKLIST_NODE_CONTAINER_PLAIN;
22
import static com.moilioncircle.redis.replicator.Constants.RDB_LOAD_NONE;
23
import static com.moilioncircle.redis.replicator.Constants.RDB_MODULE_OPCODE_EOF;
24
import static com.moilioncircle.redis.replicator.Constants.RDB_OPCODE_FUNCTION;
25
import static com.moilioncircle.redis.replicator.Constants.RDB_OPCODE_FUNCTION2;
26
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_HASH;
27
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_HASH_LISTPACK;
28
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_HASH_ZIPLIST;
29
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_HASH_ZIPMAP;
30
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_LIST;
31
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_LIST_QUICKLIST;
32
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_LIST_QUICKLIST_2;
33
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_LIST_ZIPLIST;
34
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_MODULE;
35
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_MODULE_2;
36
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_SET;
37
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_SET_INTSET;
38
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_SET_LISTPACK;
39
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_STREAM_LISTPACKS;
40
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_STREAM_LISTPACKS_2;
41
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_STREAM_LISTPACKS_3;
42
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_STRING;
43
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_ZSET;
44
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_ZSET_2;
45
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_ZSET_LISTPACK;
46
import static com.moilioncircle.redis.replicator.Constants.RDB_TYPE_ZSET_ZIPLIST;
47
import static com.moilioncircle.redis.replicator.rdb.BaseRdbParser.StringHelper.listPackEntry;
48
import static com.moilioncircle.redis.replicator.util.CRC64.crc64;
49
import static com.moilioncircle.redis.replicator.util.CRC64.longToByteArray;
50

51
import java.io.IOException;
52
import java.nio.ByteBuffer;
53
import java.util.List;
54
import java.util.NoSuchElementException;
55

56
import com.moilioncircle.redis.replicator.Replicator;
57
import com.moilioncircle.redis.replicator.io.ByteBufferOutputStream;
58
import com.moilioncircle.redis.replicator.io.RawByteListener;
59
import com.moilioncircle.redis.replicator.io.RedisInputStream;
60
import com.moilioncircle.redis.replicator.rdb.BaseRdbEncoder;
61
import com.moilioncircle.redis.replicator.rdb.BaseRdbParser;
62
import com.moilioncircle.redis.replicator.rdb.DefaultRdbValueVisitor;
63
import com.moilioncircle.redis.replicator.rdb.datatype.Module;
64
import com.moilioncircle.redis.replicator.rdb.dump.datatype.DumpFunction;
65
import com.moilioncircle.redis.replicator.rdb.module.ModuleParser;
66
import com.moilioncircle.redis.replicator.rdb.skip.SkipRdbParser;
67
import com.moilioncircle.redis.replicator.util.ByteArray;
68
import com.moilioncircle.redis.replicator.util.ByteBuilder;
69
import com.moilioncircle.redis.replicator.util.Strings;
70

71
/**
72
 * @author Leon Chen
73
 * @since 3.1.0
74
 */
75
@SuppressWarnings("unchecked")
76
public class DumpRdbValueVisitor extends DefaultRdbValueVisitor {
77

78
    private class DefaultRawByteListener implements RawByteListener {
79
        private final int version;
80
        private final ByteBuilder builder;
81

82
        private DefaultRawByteListener(byte type, int version) {
1✔
83
            this.builder = ByteBuilder.allocate(DumpRdbValueVisitor.this.size);
1✔
84
            this.builder.put(type);
1✔
85
            int ver = DumpRdbValueVisitor.this.version;
1✔
86
            this.version = ver == -1 ? version : ver;
1✔
87
        }
1✔
88

89
        @Override
90
        public void handle(byte... rawBytes) {
91
            this.builder.put(rawBytes);
1✔
92
        }
1✔
93
    
94
        public void handle(ByteBuffer buffer) {
95
            this.builder.put(buffer);
1✔
96
        }
1✔
97

98
        public byte[] getBytes() {
99
            this.builder.put((byte) version);
1✔
100
            this.builder.put((byte) 0x00);
1✔
101
            List<ByteBuffer> buffers = this.builder.buffers();
1✔
102
            byte[] crc = longToByteArray(crc64(buffers));
1✔
103
            for (byte b : crc) {
1✔
104
                this.builder.put(b);
1✔
105
            }
106
            return this.builder.array();
1✔
107
        }
108
    }
109

110
    /**
     * Size handed to {@code ByteBuilder.allocate(...)} and to the {@code ByteBufferOutputStream}
     * constructor whenever a value is captured or re-encoded.
     * NOTE(review): presumably an initial buffer capacity in bytes - confirm against those classes.
     */
    private final int size;
111
    /**
     * Target version of the produced payloads. {@code -1} means "not configured": the footer
     * then carries the version passed to each {@code apply*} call and no value is re-encoded.
     * Any other value is written to the footer as-is, and the {@code apply*} methods compare it
     * against fixed thresholds (7, 8, 10, 11) to decide whether a newer encoding must be
     * rewritten into an older one.
     */
    private final int version;
112

113
    /**
     * Creates a visitor with no target version configured (version {@code -1}).
     *
     * @param replicator replicator passed on to the superclass
     */
    public DumpRdbValueVisitor(Replicator replicator) {
        this(replicator, -1);
    }
×
116

117
    /**
     * Creates a visitor for the given target version with a buffer size of 8192.
     *
     * @param replicator replicator passed on to the superclass
     * @param version    target version, or {@code -1} to use the version supplied per call
     */
    public DumpRdbValueVisitor(Replicator replicator, int version) {
        this(replicator, version, 8192);
    }
×
120

121
    /**
     * Creates a visitor with an explicit target version and buffer size.
     *
     * @param replicator replicator passed on to the superclass
     * @param version    target version, or {@code -1} to use the version supplied per call
     * @param size       size used when allocating the buffers that collect a value
     */
    public DumpRdbValueVisitor(Replicator replicator, int version, int size) {
        super(replicator);
        this.size = size;
        this.version = version;
    }
1✔
126
    
127
    @Override
128
    public <T> T applyFunction(RedisInputStream in, int version) throws IOException {
129
        DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_OPCODE_FUNCTION, version);
1✔
130
        replicator.addRawByteListener(listener);
1✔
131
        try {
132
            SkipRdbParser parser = new SkipRdbParser(in);
1✔
133
            parser.rdbGenericLoadStringObject(); // name
1✔
134
            parser.rdbGenericLoadStringObject(); // engine name
1✔
135
            long hasDesc = parser.rdbLoadLen().len;
1✔
136
            if (hasDesc == 1) {
1✔
137
                parser.rdbGenericLoadStringObject(); // description
1✔
138
            }
139
            parser.rdbGenericLoadStringObject(); // code
1✔
140
        } finally {
141
            replicator.removeRawByteListener(listener);
1✔
142
        }
143
        DumpFunction function = new DumpFunction();
1✔
144
        function.setSerialized(listener.getBytes());
1✔
145
        return (T) function;
1✔
146
    }
147
    
148
    @Override
149
    public <T> T applyFunction2(RedisInputStream in, int version) throws IOException {
150
        DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_OPCODE_FUNCTION2, version);
1✔
151
        replicator.addRawByteListener(listener);
1✔
152
        try {
153
            SkipRdbParser parser = new SkipRdbParser(in);
1✔
154
            parser.rdbGenericLoadStringObject(); // code
1✔
155
        } finally {
156
            replicator.removeRawByteListener(listener);
1✔
157
        }
158
        DumpFunction function = new DumpFunction();
1✔
159
        function.setSerialized(listener.getBytes());
1✔
160
        return (T) function;
1✔
161
    }
162

163
    @Override
164
    public <T> T applyString(RedisInputStream in, int version) throws IOException {
165
        DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_STRING, version);
1✔
166
        replicator.addRawByteListener(listener);
1✔
167
        try {
168
            new SkipRdbParser(in).rdbLoadEncodedStringObject();
1✔
169
        } finally {
170
            replicator.removeRawByteListener(listener);
1✔
171
        }
172
        return (T) listener.getBytes();
1✔
173
    }
174

175
    @Override
176
    public <T> T applyList(RedisInputStream in, int version) throws IOException {
177
        DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_LIST, version);
1✔
178
        replicator.addRawByteListener(listener);
1✔
179
        try {
180
            SkipRdbParser skipParser = new SkipRdbParser(in);
1✔
181
            long len = skipParser.rdbLoadLen().len;
1✔
182
            while (len > 0) {
1✔
183
                skipParser.rdbLoadEncodedStringObject();
1✔
184
                len--;
1✔
185
            }
186
        } finally {
187
            replicator.removeRawByteListener(listener);
1✔
188
        }
189
        return (T) listener.getBytes();
1✔
190
    }
191

192
    @Override
193
    public <T> T applySet(RedisInputStream in, int version) throws IOException {
194
        DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_SET, version);
1✔
195
        replicator.addRawByteListener(listener);
1✔
196
        try {
197
            SkipRdbParser skipParser = new SkipRdbParser(in);
1✔
198
            long len = skipParser.rdbLoadLen().len;
1✔
199
            while (len > 0) {
1✔
200
                skipParser.rdbLoadEncodedStringObject();
1✔
201
                len--;
1✔
202
            }
203
        } finally {
204
            replicator.removeRawByteListener(listener);
1✔
205
        }
206
        return (T) listener.getBytes();
1✔
207
    }
208
    
209
    @Override
210
    public <T> T applySetListPack(RedisInputStream in, int version) throws IOException {
211
        if (this.version != -1 && this.version < 11 /* since redis rdb version 11 */) {
1✔
212
            // downgrade to RDB_TYPE_SET
213
            BaseRdbParser parser = new BaseRdbParser(in);
1✔
214
            BaseRdbEncoder encoder = new BaseRdbEncoder();
1✔
215
            DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_SET, version);
1✔
216
            try (ByteBufferOutputStream out = new ByteBufferOutputStream(size)) {
1✔
217
                RedisInputStream listPack = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
218
                listPack.skip(4); // total-bytes
1✔
219
                int len = listPack.readInt(2);
1✔
220
                listener.handle(encoder.rdbSaveLen(len));
1✔
221
                while (len > 0) {
1✔
222
                    byte[] element = listPackEntry(listPack);
1✔
223
                    encoder.rdbGenericSaveStringObject(new ByteArray(element), out);
1✔
224
                    len--;
1✔
225
                }
1✔
226
                int lpend = listPack.read(); // lp-end
1✔
227
                if (lpend != 255) {
1✔
228
                    throw new AssertionError("listpack expect 255 but " + lpend);
×
229
                }
230
                listener.handle(out.toByteBuffer());
1✔
231
                return (T) listener.getBytes();
1✔
232
            }
233
        } else {
234
            DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_SET_LISTPACK, version);
1✔
235
            replicator.addRawByteListener(listener);
1✔
236
            try {
237
                SkipRdbParser skipParser = new SkipRdbParser(in);
1✔
238
                skipParser.rdbLoadPlainStringObject();
1✔
239
            } finally {
240
                replicator.removeRawByteListener(listener);
1✔
241
            }
242
            return (T) listener.getBytes();
1✔
243
        }
244
    }
245

246
    @Override
247
    public <T> T applyZSet(RedisInputStream in, int version) throws IOException {
248
        DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_ZSET, version);
1✔
249
        replicator.addRawByteListener(listener);
1✔
250
        try {
251
            SkipRdbParser skipParser = new SkipRdbParser(in);
1✔
252
            long len = skipParser.rdbLoadLen().len;
1✔
253
            while (len > 0) {
1✔
254
                skipParser.rdbLoadEncodedStringObject();
1✔
255
                skipParser.rdbLoadDoubleValue();
1✔
256
                len--;
1✔
257
            }
258
        } finally {
259
            replicator.removeRawByteListener(listener);
1✔
260
        }
261
        return (T) listener.getBytes();
1✔
262
    }
263

264
    @Override
265
    public <T> T applyZSet2(RedisInputStream in, int version) throws IOException {
266
        if (this.version != -1 && this.version < 8 /* since redis rdb version 8 */) {
1✔
267
            // downgrade to RDB_TYPE_ZSET
268
            BaseRdbParser parser = new BaseRdbParser(in);
1✔
269
            BaseRdbEncoder encoder = new BaseRdbEncoder();
1✔
270
            DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_ZSET, version);
1✔
271
            try (ByteBufferOutputStream out = new ByteBufferOutputStream(size)) {
1✔
272
                long len = parser.rdbLoadLen().len;
1✔
273
                listener.handle(encoder.rdbSaveLen(len));
1✔
274
                while (len > 0) {
1✔
275
                    ByteArray element = parser.rdbLoadEncodedStringObject();
1✔
276
                    encoder.rdbGenericSaveStringObject(element, out);
1✔
277
                    double score = parser.rdbLoadBinaryDoubleValue();
1✔
278
                    encoder.rdbSaveDoubleValue(score, out);
1✔
279
                    len--;
1✔
280
                }
1✔
281
                listener.handle(out.toByteBuffer());
1✔
282
                return (T) listener.getBytes();
1✔
283
            }
284
        } else {
285
            DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_ZSET_2, version);
1✔
286
            replicator.addRawByteListener(listener);
1✔
287
            try {
288
                SkipRdbParser skipParser = new SkipRdbParser(in);
1✔
289
                long len = skipParser.rdbLoadLen().len;
1✔
290
                while (len > 0) {
1✔
291
                    skipParser.rdbLoadEncodedStringObject();
1✔
292
                    skipParser.rdbLoadBinaryDoubleValue();
1✔
293
                    len--;
1✔
294
                }
295
            } finally {
296
                replicator.removeRawByteListener(listener);
1✔
297
            }
298
            return (T) listener.getBytes();
1✔
299
        }
300
    }
301

302
    @Override
303
    public <T> T applyHash(RedisInputStream in, int version) throws IOException {
304
        DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_HASH, version);
1✔
305
        replicator.addRawByteListener(listener);
1✔
306
        try {
307
            SkipRdbParser skipParser = new SkipRdbParser(in);
1✔
308
            long len = skipParser.rdbLoadLen().len;
1✔
309
            while (len > 0) {
1✔
310
                skipParser.rdbLoadEncodedStringObject();
1✔
311
                skipParser.rdbLoadEncodedStringObject();
1✔
312
                len--;
1✔
313
            }
314
        } finally {
315
            replicator.removeRawByteListener(listener);
1✔
316
        }
317
        return (T) listener.getBytes();
1✔
318
    }
319

320
    @Override
321
    public <T> T applyHashZipMap(RedisInputStream in, int version) throws IOException {
322
        DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_HASH_ZIPMAP, version);
1✔
323
        replicator.addRawByteListener(listener);
1✔
324
        try {
325
            new SkipRdbParser(in).rdbLoadPlainStringObject();
1✔
326
        } finally {
327
            replicator.removeRawByteListener(listener);
1✔
328
        }
329
        return (T) listener.getBytes();
1✔
330
    }
331

332
    @Override
333
    public <T> T applyListZipList(RedisInputStream in, int version) throws IOException {
334
        DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_LIST_ZIPLIST, version);
1✔
335
        replicator.addRawByteListener(listener);
1✔
336
        try {
337
            new SkipRdbParser(in).rdbLoadPlainStringObject();
1✔
338
        } finally {
339
            replicator.removeRawByteListener(listener);
1✔
340
        }
341
        return (T) listener.getBytes();
1✔
342
    }
343

344
    @Override
345
    public <T> T applySetIntSet(RedisInputStream in, int version) throws IOException {
346
        DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_SET_INTSET, version);
1✔
347
        replicator.addRawByteListener(listener);
1✔
348
        try {
349
            new SkipRdbParser(in).rdbLoadPlainStringObject();
1✔
350
        } finally {
351
            replicator.removeRawByteListener(listener);
1✔
352
        }
353
        return (T) listener.getBytes();
1✔
354
    }
355

356
    @Override
357
    public <T> T applyZSetZipList(RedisInputStream in, int version) throws IOException {
358
        DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_ZSET_ZIPLIST, version);
1✔
359
        replicator.addRawByteListener(listener);
1✔
360
        try {
361
            new SkipRdbParser(in).rdbLoadPlainStringObject();
1✔
362
        } finally {
363
            replicator.removeRawByteListener(listener);
1✔
364
        }
365
        return (T) listener.getBytes();
1✔
366
    }
367
    
368
    @Override
369
    public <T> T applyZSetListPack(RedisInputStream in, int version) throws IOException {
370
        if (this.version != -1 && this.version < 10 /* since redis rdb version 10 */) {
1✔
371
            // downgrade to RDB_TYPE_ZSET
372
            BaseRdbParser parser = new BaseRdbParser(in);
1✔
373
            BaseRdbEncoder encoder = new BaseRdbEncoder();
1✔
374
            DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_ZSET, version);
1✔
375
            try (ByteBufferOutputStream out = new ByteBufferOutputStream(size)) {
1✔
376
                RedisInputStream listPack = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
377
                listPack.skip(4); // total-bytes
1✔
378
                int len = listPack.readInt(2);
1✔
379
                listener.handle(encoder.rdbSaveLen(len / 2));
1✔
380
                while (len > 0) {
1✔
381
                    byte[] element = listPackEntry(listPack);
1✔
382
                    encoder.rdbGenericSaveStringObject(new ByteArray(element), out);
1✔
383
                    len--;
1✔
384
                    double score = Double.valueOf(Strings.toString(listPackEntry(listPack)));
1✔
385
                    encoder.rdbSaveDoubleValue(score, out);
1✔
386
                    len--;
1✔
387
                }
1✔
388
                int lpend = listPack.read(); // lp-end
1✔
389
                if (lpend != 255) {
1✔
390
                    throw new AssertionError("listpack expect 255 but " + lpend);
×
391
                }
392
                listener.handle(out.toByteBuffer());
1✔
393
                return (T) listener.getBytes();
1✔
394
            }
395
        } else {
396
            DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_ZSET_LISTPACK, version);
1✔
397
            replicator.addRawByteListener(listener);
1✔
398
            try {
399
                SkipRdbParser skipParser = new SkipRdbParser(in);
1✔
400
                skipParser.rdbLoadPlainStringObject();
1✔
401
            } finally {
402
                replicator.removeRawByteListener(listener);
1✔
403
            }
404
            return (T) listener.getBytes();
1✔
405
        }
406
    }
407

408
    @Override
409
    public <T> T applyHashZipList(RedisInputStream in, int version) throws IOException {
410
        DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_HASH_ZIPLIST, version);
1✔
411
        replicator.addRawByteListener(listener);
1✔
412
        try {
413
            new SkipRdbParser(in).rdbLoadPlainStringObject();
1✔
414
        } finally {
415
            replicator.removeRawByteListener(listener);
1✔
416
        }
417
        return (T) listener.getBytes();
1✔
418
    }
419
    
420
    @Override
421
    public <T> T applyHashListPack(RedisInputStream in, int version) throws IOException {
422
        if (this.version != -1 && this.version < 10 /* since redis rdb version 10 */) {
1✔
423
            // downgrade to RDB_TYPE_HASH
424
            BaseRdbParser parser = new BaseRdbParser(in);
1✔
425
            BaseRdbEncoder encoder = new BaseRdbEncoder();
1✔
426
            DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_HASH, version);
1✔
427
            try (ByteBufferOutputStream out = new ByteBufferOutputStream(size)) {
1✔
428
                RedisInputStream listPack = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
429
                listPack.skip(4); // total-bytes
1✔
430
                int len = listPack.readInt(2);
1✔
431
                listener.handle(encoder.rdbSaveLen(len / 2));
1✔
432
                while (len > 0) {
1✔
433
                    byte[] field = listPackEntry(listPack);
1✔
434
                    encoder.rdbGenericSaveStringObject(new ByteArray(field), out);
1✔
435
                    len--;
1✔
436
                    byte[] value = listPackEntry(listPack);
1✔
437
                    encoder.rdbGenericSaveStringObject(new ByteArray(value), out);
1✔
438
                    len--;
1✔
439
                }
1✔
440
                int lpend = listPack.read(); // lp-end
1✔
441
                if (lpend != 255) {
1✔
442
                    throw new AssertionError("listpack expect 255 but " + lpend);
×
443
                }
444
                listener.handle(out.toByteBuffer());
1✔
445
                return (T) listener.getBytes();
1✔
446
            }
447
        } else {
448
            DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_HASH_LISTPACK, version);
1✔
449
            replicator.addRawByteListener(listener);
1✔
450
            try {
451
                SkipRdbParser skipParser = new SkipRdbParser(in);
1✔
452
                skipParser.rdbLoadPlainStringObject();
1✔
453
            } finally {
454
                replicator.removeRawByteListener(listener);
1✔
455
            }
456
            return (T) listener.getBytes();
1✔
457
        }
458
    }
459

460
    @Override
461
    public <T> T applyListQuickList(RedisInputStream in, int version) throws IOException {
462
        if (this.version != -1 && this.version < 7 /* since redis rdb version 7 */) {
1✔
463
            // downgrade to RDB_TYPE_LIST
464
            BaseRdbParser parser = new BaseRdbParser(in);
1✔
465
            BaseRdbEncoder encoder = new BaseRdbEncoder();
1✔
466
            try (ByteBufferOutputStream out = new ByteBufferOutputStream(size)) {
1✔
467
                int total = 0;
1✔
468
                long len = parser.rdbLoadLen().len;
1✔
469
                for (long i = 0; i < len; i++) {
1✔
470
                    RedisInputStream stream = new RedisInputStream(parser.rdbGenericLoadStringObject(RDB_LOAD_NONE));
1✔
471
            
472
                    BaseRdbParser.LenHelper.zlbytes(stream); // zlbytes
1✔
473
                    BaseRdbParser.LenHelper.zltail(stream); // zltail
1✔
474
                    int zllen = BaseRdbParser.LenHelper.zllen(stream);
1✔
475
                    for (int j = 0; j < zllen; j++) {
1✔
476
                        byte[] e = BaseRdbParser.StringHelper.zipListEntry(stream);
1✔
477
                        encoder.rdbGenericSaveStringObject(new ByteArray(e), out);
1✔
478
                        total++;
1✔
479
                    }
480
                    int zlend = BaseRdbParser.LenHelper.zlend(stream);
1✔
481
                    if (zlend != 255) {
1✔
482
                        throw new AssertionError("zlend expect 255 but " + zlend);
×
483
                    }
484
                }
485
        
486
                DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_LIST, version);
1✔
487
                listener.handle(encoder.rdbSaveLen(total));
1✔
488
                listener.handle(out.toByteBuffer());
1✔
489
                return (T) listener.getBytes();
1✔
490
            }
491
        } else {
492
            DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_LIST_QUICKLIST, version);
1✔
493
            replicator.addRawByteListener(listener);
1✔
494
            try {
495
                SkipRdbParser skipParser = new SkipRdbParser(in);
1✔
496
                long len = skipParser.rdbLoadLen().len;
1✔
497
                for (long i = 0; i < len; i++) {
1✔
498
                    skipParser.rdbGenericLoadStringObject();
1✔
499
                }
500
            } finally {
501
                replicator.removeRawByteListener(listener);
1✔
502
            }
503
            return (T) listener.getBytes();
1✔
504
        }
505
    }
506
    
507
    @Override
508
    public <T> T applyListQuickList2(RedisInputStream in, int version) throws IOException {
509
        if (this.version != -1 && this.version < 10 /* since redis rdb version 10 */) {
1✔
510
            // downgrade to RDB_TYPE_LIST
511
            BaseRdbParser parser = new BaseRdbParser(in);
1✔
512
            BaseRdbEncoder encoder = new BaseRdbEncoder();
1✔
513
            try (ByteBufferOutputStream out = new ByteBufferOutputStream(size)) {
1✔
514
                int total = 0;
1✔
515
                long len = parser.rdbLoadLen().len;
1✔
516
                for (long i = 0; i < len; i++) {
1✔
517
                    long container = parser.rdbLoadLen().len;
1✔
518
                    ByteArray bytes = parser.rdbLoadPlainStringObject();
1✔
519
                    if (container == QUICKLIST_NODE_CONTAINER_PLAIN) {
1✔
520
                        encoder.rdbGenericSaveStringObject(new ByteArray(bytes.first()), out);
1✔
521
                        total++;
1✔
522
                    } else if (container == QUICKLIST_NODE_CONTAINER_PACKED) {
1✔
523
                        RedisInputStream listPack = new RedisInputStream(bytes);
1✔
524
                        listPack.skip(4); // total-bytes
1✔
525
                        int innerLen = listPack.readInt(2);
1✔
526
                        for (int j = 0; j < innerLen; j++) {
1✔
527
                            byte[] e = listPackEntry(listPack);
1✔
528
                            encoder.rdbGenericSaveStringObject(new ByteArray(e), out);
1✔
529
                            total++;
1✔
530
                        }
531
                        int lpend = listPack.read(); // lp-end
1✔
532
                        if (lpend != 255) {
1✔
533
                            throw new AssertionError("listpack expect 255 but " + lpend);
×
534
                        }
535
                    } else {
1✔
536
                        throw new UnsupportedOperationException(String.valueOf(container));
×
537
                    }
538
                }
539
            
540
                DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_LIST, version);
1✔
541
                listener.handle(encoder.rdbSaveLen(total));
1✔
542
                listener.handle(out.toByteBuffer());
1✔
543
                return (T) listener.getBytes();
1✔
544
            }
545
        } else {
546
            DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_LIST_QUICKLIST_2, version);
1✔
547
            replicator.addRawByteListener(listener);
1✔
548
            try {
549
                SkipRdbParser skipParser = new SkipRdbParser(in);
1✔
550
                long len = skipParser.rdbLoadLen().len;
1✔
551
                for (long i = 0; i < len; i++) {
1✔
552
                    skipParser.rdbLoadLen();
1✔
553
                    skipParser.rdbLoadPlainStringObject();
1✔
554
                }
555
            } finally {
556
                replicator.removeRawByteListener(listener);
1✔
557
            }
558
            return (T) listener.getBytes();
1✔
559
        }
560
    }
561

562
    @Override
563
    public <T> T applyModule(RedisInputStream in, int version) throws IOException {
564
        DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_MODULE, version);
1✔
565
        replicator.addRawByteListener(listener);
1✔
566
        try {
567
            SkipRdbParser skipParser = new SkipRdbParser(in);
1✔
568
            char[] c = new char[9];
1✔
569
            long moduleid = skipParser.rdbLoadLen().len;
1✔
570
            for (int i = 0; i < c.length; i++) {
1✔
571
                c[i] = MODULE_SET[(int) (moduleid >>> (10 + (c.length - 1 - i) * 6) & 63)];
1✔
572
            }
573
            String moduleName = new String(c);
1✔
574
            int moduleVersion = (int) (moduleid & 1023);
1✔
575
            ModuleParser<? extends Module> moduleParser = lookupModuleParser(moduleName, moduleVersion);
1✔
576
            if (moduleParser == null) {
1✔
577
                throw new NoSuchElementException("module parser[" + moduleName + ", " + moduleVersion + "] not register. rdb type: [RDB_TYPE_MODULE]");
1✔
578
            }
579
            moduleParser.parse(in, 1);
1✔
580
        } finally {
581
            replicator.removeRawByteListener(listener);
1✔
582
        }
583
        return (T) listener.getBytes();
1✔
584
    }
585

586
    @Override
587
    public <T> T applyModule2(RedisInputStream in, int version) throws IOException {
588
        DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_MODULE_2, version);
1✔
589
        replicator.addRawByteListener(listener);
1✔
590
        try {
591
            BaseRdbParser parser = new BaseRdbParser(in);
1✔
592
            SkipRdbParser skipParser = new SkipRdbParser(in);
1✔
593
            char[] c = new char[9];
1✔
594
            long moduleid = skipParser.rdbLoadLen().len;
1✔
595
            for (int i = 0; i < c.length; i++) {
1✔
596
                c[i] = MODULE_SET[(int) (moduleid >>> (10 + (c.length - 1 - i) * 6) & 63)];
1✔
597
            }
598
            String moduleName = new String(c);
1✔
599
            int moduleVersion = (int) (moduleid & 1023);
1✔
600
            ModuleParser<? extends Module> moduleParser = lookupModuleParser(moduleName, moduleVersion);
1✔
601
            if (moduleParser == null) {
1✔
602
                SkipRdbParser skipRdbParser = new SkipRdbParser(in);
1✔
603
                skipRdbParser.rdbLoadCheckModuleValue();
1✔
604
            } else {
1✔
605
                moduleParser.parse(in, 2);
1✔
606
                long eof = parser.rdbLoadLen().len;
1✔
607
                if (eof != RDB_MODULE_OPCODE_EOF) {
1✔
608
                    throw new UnsupportedOperationException("The RDB file contains module data for the module '" + moduleName + "' that is not terminated by the proper module value EOF marker");
×
609
                }
610
            }
611
        } finally {
612
            replicator.removeRawByteListener(listener);
1✔
613
        }
614
        return (T) listener.getBytes();
1✔
615
    }
616

617
    @Override
618
    @SuppressWarnings("resource")
619
    public <T> T applyStreamListPacks(RedisInputStream in, int version) throws IOException {
620
        DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_STREAM_LISTPACKS, version);
1✔
621
        replicator.addRawByteListener(listener);
1✔
622
        try {
623
            SkipRdbParser skipParser = new SkipRdbParser(in);
1✔
624
            long listPacks = skipParser.rdbLoadLen().len;
1✔
625
            while (listPacks-- > 0) {
1✔
626
                skipParser.rdbLoadPlainStringObject();
1✔
627
                skipParser.rdbLoadPlainStringObject();
1✔
628
            }
629
            skipParser.rdbLoadLen();
1✔
630
            skipParser.rdbLoadLen();
1✔
631
            skipParser.rdbLoadLen();
1✔
632
            long groupCount = skipParser.rdbLoadLen().len;
1✔
633
            while (groupCount-- > 0) {
1✔
634
                skipParser.rdbLoadPlainStringObject();
1✔
635
                skipParser.rdbLoadLen();
1✔
636
                skipParser.rdbLoadLen();
1✔
637
                long groupPel = skipParser.rdbLoadLen().len;
1✔
638
                while (groupPel-- > 0) {
1✔
639
                    in.skip(16);
1✔
640
                    skipParser.rdbLoadMillisecondTime();
1✔
641
                    skipParser.rdbLoadLen();
1✔
642
                }
643
                long consumerCount = skipParser.rdbLoadLen().len;
1✔
644
                while (consumerCount-- > 0) {
1✔
645
                    skipParser.rdbLoadPlainStringObject();
1✔
646
                    skipParser.rdbLoadMillisecondTime();
1✔
647
                    long consumerPel = skipParser.rdbLoadLen().len;
1✔
648
                    while (consumerPel-- > 0) {
1✔
649
                        in.skip(16);
1✔
650
                    }
651
                }
1✔
652
            }
1✔
653
        } finally {
654
            replicator.removeRawByteListener(listener);
1✔
655
        }
656
        return (T) listener.getBytes();
1✔
657
    }
658
    
659
    @Override
660
    @SuppressWarnings("resource")
661
    public <T> T applyStreamListPacks2(RedisInputStream in, int version) throws IOException {
662
        if (this.version != -1 && this.version < 10 /* since redis rdb version 10 */) {
1✔
663
            // downgrade to RDB_TYPE_STREAM_LISTPACKS
664
            DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_STREAM_LISTPACKS, version);
1✔
665
            replicator.addRawByteListener(listener);
1✔
666
            try {
667
                SkipRdbParser skipParser = new SkipRdbParser(in);
1✔
668
                long listPacks = skipParser.rdbLoadLen().len;
1✔
669
                while (listPacks-- > 0) {
1✔
670
                    skipParser.rdbLoadPlainStringObject();
1✔
671
                    skipParser.rdbLoadPlainStringObject();
1✔
672
                }
673
                skipParser.rdbLoadLen(); // length
1✔
674
                skipParser.rdbLoadLen(); // lastId
1✔
675
                skipParser.rdbLoadLen(); // lastId
1✔
676
                replicator.removeRawByteListener(listener);
1✔
677
                skipParser.rdbLoadLen(); // firstId
1✔
678
                skipParser.rdbLoadLen(); // firstId
1✔
679
                skipParser.rdbLoadLen(); // maxDeletedEntryId
1✔
680
                skipParser.rdbLoadLen(); // maxDeletedEntryId
1✔
681
                skipParser.rdbLoadLen(); // entriesAdded
1✔
682
                replicator.addRawByteListener(listener);
1✔
683
                long groupCount = skipParser.rdbLoadLen().len;
1✔
684
                while (groupCount-- > 0) {
1✔
685
                    skipParser.rdbLoadPlainStringObject();
1✔
686
                    skipParser.rdbLoadLen();
1✔
687
                    skipParser.rdbLoadLen();
1✔
688
                    replicator.removeRawByteListener(listener);
1✔
689
                    skipParser.rdbLoadLen(); // entriesRead
1✔
690
                    replicator.addRawByteListener(listener);
1✔
691
                    long groupPel = skipParser.rdbLoadLen().len;
1✔
692
                    while (groupPel-- > 0) {
1✔
693
                        in.skip(16);
1✔
694
                        skipParser.rdbLoadMillisecondTime();
1✔
695
                        skipParser.rdbLoadLen();
1✔
696
                    }
697
                    long consumerCount = skipParser.rdbLoadLen().len;
1✔
698
                    while (consumerCount-- > 0) {
1✔
699
                        skipParser.rdbLoadPlainStringObject();
1✔
700
                        skipParser.rdbLoadMillisecondTime();
1✔
701
                        long consumerPel = skipParser.rdbLoadLen().len;
1✔
702
                        while (consumerPel-- > 0) {
1✔
703
                            in.skip(16);
1✔
704
                        }
705
                    }
1✔
706
                }
1✔
707
            } finally {
708
                replicator.removeRawByteListener(listener);
1✔
709
            }
710
            return (T) listener.getBytes();
1✔
711
        } else {
712
            DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_STREAM_LISTPACKS_2, version);
1✔
713
            replicator.addRawByteListener(listener);
1✔
714
            try {
715
                SkipRdbParser skipParser = new SkipRdbParser(in);
1✔
716
                long listPacks = skipParser.rdbLoadLen().len;
1✔
717
                while (listPacks-- > 0) {
1✔
718
                    skipParser.rdbLoadPlainStringObject();
1✔
719
                    skipParser.rdbLoadPlainStringObject();
1✔
720
                }
721
                skipParser.rdbLoadLen(); // length
1✔
722
                skipParser.rdbLoadLen(); // lastId
1✔
723
                skipParser.rdbLoadLen(); // lastId
1✔
724
                skipParser.rdbLoadLen(); // firstId
1✔
725
                skipParser.rdbLoadLen(); // firstId
1✔
726
                skipParser.rdbLoadLen(); // maxDeletedEntryId
1✔
727
                skipParser.rdbLoadLen(); // maxDeletedEntryId
1✔
728
                skipParser.rdbLoadLen(); // entriesAdded
1✔
729
                long groupCount = skipParser.rdbLoadLen().len;
1✔
730
                while (groupCount-- > 0) {
1✔
731
                    skipParser.rdbLoadPlainStringObject();
1✔
732
                    skipParser.rdbLoadLen();
1✔
733
                    skipParser.rdbLoadLen();
1✔
734
                    skipParser.rdbLoadLen(); // entriesRead
1✔
735
                    long groupPel = skipParser.rdbLoadLen().len;
1✔
736
                    while (groupPel-- > 0) {
1✔
737
                        in.skip(16);
1✔
738
                        skipParser.rdbLoadMillisecondTime();
1✔
739
                        skipParser.rdbLoadLen();
1✔
740
                    }
741
                    long consumerCount = skipParser.rdbLoadLen().len;
1✔
742
                    while (consumerCount-- > 0) {
1✔
743
                        skipParser.rdbLoadPlainStringObject();
1✔
744
                        skipParser.rdbLoadMillisecondTime();
1✔
745
                        long consumerPel = skipParser.rdbLoadLen().len;
1✔
746
                        while (consumerPel-- > 0) {
1✔
747
                            in.skip(16);
1✔
748
                        }
749
                    }
1✔
750
                }
1✔
751
            } finally {
752
                replicator.removeRawByteListener(listener);
1✔
753
            }
754
            return (T) listener.getBytes();
1✔
755
        }
756
    }
757
    
758
    @Override
759
    @SuppressWarnings("resource")
760
    public <T> T applyStreamListPacks3(RedisInputStream in, int version) throws IOException {
761
        if (this.version != -1 && this.version < 11 /* since redis rdb version 11 */) {
1✔
762
            // downgrade to RDB_TYPE_STREAM_LISTPACKS
763
            DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_STREAM_LISTPACKS, version);
1✔
764
            replicator.addRawByteListener(listener);
1✔
765
            try {
766
                SkipRdbParser skipParser = new SkipRdbParser(in);
1✔
767
                long listPacks = skipParser.rdbLoadLen().len;
1✔
768
                while (listPacks-- > 0) {
1✔
769
                    skipParser.rdbLoadPlainStringObject();
1✔
770
                    skipParser.rdbLoadPlainStringObject();
1✔
771
                }
772
                skipParser.rdbLoadLen(); // length
1✔
773
                skipParser.rdbLoadLen(); // lastId
1✔
774
                skipParser.rdbLoadLen(); // lastId
1✔
775
                replicator.removeRawByteListener(listener);
1✔
776
                skipParser.rdbLoadLen(); // firstId
1✔
777
                skipParser.rdbLoadLen(); // firstId
1✔
778
                skipParser.rdbLoadLen(); // maxDeletedEntryId
1✔
779
                skipParser.rdbLoadLen(); // maxDeletedEntryId
1✔
780
                skipParser.rdbLoadLen(); // entriesAdded
1✔
781
                replicator.addRawByteListener(listener);
1✔
782
                long groupCount = skipParser.rdbLoadLen().len;
1✔
783
                while (groupCount-- > 0) {
1✔
784
                    skipParser.rdbLoadPlainStringObject();
1✔
785
                    skipParser.rdbLoadLen();
1✔
786
                    skipParser.rdbLoadLen();
1✔
787
                    replicator.removeRawByteListener(listener);
1✔
788
                    skipParser.rdbLoadLen(); // entriesRead
1✔
789
                    replicator.addRawByteListener(listener);
1✔
790
                    long groupPel = skipParser.rdbLoadLen().len;
1✔
791
                    while (groupPel-- > 0) {
1✔
792
                        in.skip(16);
1✔
793
                        skipParser.rdbLoadMillisecondTime();
1✔
794
                        skipParser.rdbLoadLen();
1✔
795
                    }
796
                    long consumerCount = skipParser.rdbLoadLen().len;
1✔
797
                    while (consumerCount-- > 0) {
1✔
798
                        skipParser.rdbLoadPlainStringObject();
1✔
799
                        skipParser.rdbLoadMillisecondTime(); // seenTime
1✔
800
                        replicator.removeRawByteListener(listener);
1✔
801
                        skipParser.rdbLoadMillisecondTime(); // activeTime
1✔
802
                        replicator.addRawByteListener(listener);
1✔
803
                        long consumerPel = skipParser.rdbLoadLen().len;
1✔
804
                        while (consumerPel-- > 0) {
1✔
805
                            in.skip(16);
1✔
806
                        }
807
                    }
1✔
808
                }
1✔
809
            } finally {
810
                replicator.removeRawByteListener(listener);
1✔
811
            }
812
            return (T) listener.getBytes();
1✔
813
        } else {
814
            DefaultRawByteListener listener = new DefaultRawByteListener((byte) RDB_TYPE_STREAM_LISTPACKS_3, version);
1✔
815
            replicator.addRawByteListener(listener);
1✔
816
            try {
817
                SkipRdbParser skipParser = new SkipRdbParser(in);
1✔
818
                long listPacks = skipParser.rdbLoadLen().len;
1✔
819
                while (listPacks-- > 0) {
1✔
820
                    skipParser.rdbLoadPlainStringObject();
1✔
821
                    skipParser.rdbLoadPlainStringObject();
1✔
822
                }
823
                skipParser.rdbLoadLen(); // length
1✔
824
                skipParser.rdbLoadLen(); // lastId
1✔
825
                skipParser.rdbLoadLen(); // lastId
1✔
826
                skipParser.rdbLoadLen(); // firstId
1✔
827
                skipParser.rdbLoadLen(); // firstId
1✔
828
                skipParser.rdbLoadLen(); // maxDeletedEntryId
1✔
829
                skipParser.rdbLoadLen(); // maxDeletedEntryId
1✔
830
                skipParser.rdbLoadLen(); // entriesAdded
1✔
831
                long groupCount = skipParser.rdbLoadLen().len;
1✔
832
                while (groupCount-- > 0) {
1✔
833
                    skipParser.rdbLoadPlainStringObject();
1✔
834
                    skipParser.rdbLoadLen();
1✔
835
                    skipParser.rdbLoadLen();
1✔
836
                    skipParser.rdbLoadLen(); // entriesRead
1✔
837
                    long groupPel = skipParser.rdbLoadLen().len;
1✔
838
                    while (groupPel-- > 0) {
1✔
839
                        in.skip(16);
1✔
840
                        skipParser.rdbLoadMillisecondTime();
1✔
841
                        skipParser.rdbLoadLen();
1✔
842
                    }
843
                    long consumerCount = skipParser.rdbLoadLen().len;
1✔
844
                    while (consumerCount-- > 0) {
1✔
845
                        skipParser.rdbLoadPlainStringObject();
1✔
846
                        skipParser.rdbLoadMillisecondTime(); // seenTime
1✔
847
                        skipParser.rdbLoadMillisecondTime(); // activeTime
1✔
848
                        long consumerPel = skipParser.rdbLoadLen().len;
1✔
849
                        while (consumerPel-- > 0) {
1✔
850
                            in.skip(16);
1✔
851
                        }
852
                    }
1✔
853
                }
1✔
854
            } finally {
855
                replicator.removeRawByteListener(listener);
1✔
856
            }
857
            return (T) listener.getBytes();
1✔
858
        }
859
    }
860
    
861
    /**
     * Placeholder for the hash-metadata value type: not implemented yet.
     *
     * <p>Currently reads nothing from {@code in}, ignores {@code version} and returns
     * {@code null}.
     *
     * <p>NOTE(review): the other {@code apply*} methods in this class consume the encoded
     * value and return the raw-byte listener's bytes; this stub does neither. Confirm that
     * callers tolerate a {@code null} result and an unconsumed value until it is implemented.
     *
     * @param in      stream positioned at the start of the value (currently untouched)
     * @param version version supplied by the caller (currently unused)
     * @return always {@code null}
     * @throws IOException declared by the overridden signature; never thrown by this stub
     */
    @Override
862
    public <T> T applyHashMetadata(RedisInputStream in, int version) throws IOException{
863
        // TODO
NEW
864
        return null;
×
865
    }
866
    
867
    /**
     * Placeholder for the hash list-pack-ex value type: not implemented yet.
     *
     * <p>Currently reads nothing from {@code in}, ignores {@code version} and returns
     * {@code null}.
     *
     * <p>NOTE(review): the other {@code apply*} methods in this class consume the encoded
     * value and return the raw-byte listener's bytes; this stub does neither. Confirm that
     * callers tolerate a {@code null} result and an unconsumed value until it is implemented.
     *
     * @param in      stream positioned at the start of the value (currently untouched)
     * @param version version supplied by the caller (currently unused)
     * @return always {@code null}
     * @throws IOException declared by the overridden signature; never thrown by this stub
     */
    @Override
868
    public <T> T applyHashListPackEx(RedisInputStream in, int version) throws IOException {
869
        // TODO
NEW
870
        return null;
×
871
    }
872
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc