• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

leonchen83 / redis-replicator / #2187

07 Jun 2025 09:11AM UTC coverage: 69.535% (-0.5%) from 69.989%
#2187

push

leonchen83
redis-8.0

1 of 123 new or added lines in 13 files covered. (0.81%)

1 existing line in 1 file now uncovered.

6642 of 9552 relevant lines covered (69.54%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.97
/src/main/java/com/moilioncircle/redis/replicator/rdb/DefaultRdbValueVisitor.java
1

2
package com.moilioncircle.redis.replicator.rdb;
3

4
import static com.moilioncircle.redis.replicator.Constants.MODULE_SET;
5
import static com.moilioncircle.redis.replicator.Constants.QUICKLIST_NODE_CONTAINER_PACKED;
6
import static com.moilioncircle.redis.replicator.Constants.QUICKLIST_NODE_CONTAINER_PLAIN;
7
import static com.moilioncircle.redis.replicator.Constants.RDB_LOAD_NONE;
8
import static com.moilioncircle.redis.replicator.Constants.RDB_MODULE_OPCODE_EOF;
9
import static com.moilioncircle.redis.replicator.Constants.STREAM_ITEM_FLAG_DELETED;
10
import static com.moilioncircle.redis.replicator.Constants.STREAM_ITEM_FLAG_SAMEFIELDS;
11
import static com.moilioncircle.redis.replicator.rdb.BaseRdbParser.StringHelper.listPackEntry;
12

13
import java.io.IOException;
14
import java.util.ArrayList;
15
import java.util.LinkedHashSet;
16
import java.util.List;
17
import java.util.Map;
18
import java.util.NavigableMap;
19
import java.util.NoSuchElementException;
20
import java.util.Set;
21
import java.util.TreeMap;
22

23
import org.slf4j.Logger;
24
import org.slf4j.LoggerFactory;
25

26
import com.moilioncircle.redis.replicator.Constants;
27
import com.moilioncircle.redis.replicator.Replicator;
28
import com.moilioncircle.redis.replicator.io.RedisInputStream;
29
import com.moilioncircle.redis.replicator.rdb.datatype.Function;
30
import com.moilioncircle.redis.replicator.rdb.datatype.Module;
31
import com.moilioncircle.redis.replicator.rdb.datatype.Stream;
32
import com.moilioncircle.redis.replicator.rdb.datatype.TTLValue;
33
import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry;
34
import com.moilioncircle.redis.replicator.rdb.module.ModuleParser;
35
import com.moilioncircle.redis.replicator.rdb.skip.SkipRdbParser;
36
import com.moilioncircle.redis.replicator.util.ByteArray;
37
import com.moilioncircle.redis.replicator.util.ByteArrayList;
38
import com.moilioncircle.redis.replicator.util.ByteArrayMap;
39
import com.moilioncircle.redis.replicator.util.ByteArraySet;
40
import com.moilioncircle.redis.replicator.util.Strings;
41
import com.moilioncircle.redis.replicator.util.TTLByteArrayMap;
42

43
/**
44
 * @author Leon Chen
45
 * @since 3.1.0
46
 */
47
@SuppressWarnings({"unchecked", "deprecation"})
48
public class DefaultRdbValueVisitor extends RdbValueVisitor {
49

50
    protected static final Logger logger = LoggerFactory.getLogger(DefaultRdbValueVisitor.class);
1✔
51

52
    protected final Replicator replicator;
53

54
    /**
     * Creates a visitor bound to the given replicator.
     *
     * @param replicator stored on this visitor; {@link #lookupModuleParser(String, int)}
     *                   asks it for module parsers
     */
    public DefaultRdbValueVisitor(final Replicator replicator) {
        super();
        this.replicator = replicator;
    }
1✔
57
    
58
    @Override
59
    public <T> T applyFunction(RedisInputStream in, int version) throws IOException {
60
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
61
        Function function = new Function();
1✔
62
        function.setName(parser.rdbLoadPlainStringObject().first());
1✔
63
        function.setEngineName(parser.rdbLoadPlainStringObject().first());
1✔
64
        long hasDesc = parser.rdbLoadLen().len;
1✔
65
        if (hasDesc == 1) {
1✔
66
            function.setDescription(parser.rdbLoadPlainStringObject().first());
1✔
67
        }
68
        function.setCode(parser.rdbLoadPlainStringObject().first());
1✔
69
        return (T) function;
1✔
70
    }
71
    
72
    @Override
73
    public <T> T applyFunction2(RedisInputStream in, int version) throws IOException {
74
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
75
        Function function = new Function();
1✔
76
        function.setCode(parser.rdbLoadPlainStringObject().first());
1✔
77
        return (T) function;
1✔
78
    }
79

80
    @Override
81
    public <T> T applyString(RedisInputStream in, int version) throws IOException {
82
        /*
83
         * |       <content>       |
84
         * |    string contents    |
85
         */
86
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
87

88
        byte[] val = parser.rdbLoadEncodedStringObject().first();
1✔
89
        return (T) val;
1✔
90
    }
91

92
    @Override
93
    public <T> T applyList(RedisInputStream in, int version) throws IOException {
94
        /*
95
         * |    <len>     |       <content>       |
96
         * | 1 or 5 bytes |    string contents    |
97
         */
98
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
99

100
        long len = parser.rdbLoadLen().len;
1✔
101
        List<byte[]> list = new ByteArrayList();
1✔
102
        while (len > 0) {
1✔
103
            byte[] element = parser.rdbLoadEncodedStringObject().first();
1✔
104
            list.add(element);
1✔
105
            len--;
1✔
106
        }
1✔
107
        return (T) list;
1✔
108
    }
109

110
    @Override
111
    public <T> T applySet(RedisInputStream in, int version) throws IOException {
112
        /*
113
         * |    <len>     |       <content>       |
114
         * | 1 or 5 bytes |    string contents    |
115
         */
116
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
117

118
        long len = parser.rdbLoadLen().len;
1✔
119
        Set<byte[]> set = new ByteArraySet();
1✔
120
        while (len > 0) {
1✔
121
            byte[] element = parser.rdbLoadEncodedStringObject().first();
1✔
122
            set.add(element);
1✔
123
            len--;
1✔
124
        }
1✔
125
        return (T) set;
1✔
126
    }
127
    
128
    @Override
129
    public <T> T applySetListPack(RedisInputStream in, int version) throws IOException {
130
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
131
    
132
        RedisInputStream listPack = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
133
        Set<byte[]> set = new ByteArraySet();
1✔
134
        listPack.skip(4); // total-bytes
1✔
135
        int len = listPack.readInt(2);
1✔
136
        while (len > 0) {
1✔
137
            byte[] element = listPackEntry(listPack);
1✔
138
            set.add(element);
1✔
139
            len--;
1✔
140
        }
1✔
141
        int lpend = listPack.read(); // lp-end
1✔
142
        if (lpend != 255) {
1✔
143
            throw new AssertionError("listpack expect 255 but " + lpend);
×
144
        }
145
        return (T) set;
1✔
146
    }
147

148
    @Override
149
    public <T> T applyZSet(RedisInputStream in, int version) throws IOException {
150
        /*
151
         * |    <len>     |       <content>       |        <score>       |
152
         * | 1 or 5 bytes |    string contents    |    double content    |
153
         */
154
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
155

156
        long len = parser.rdbLoadLen().len;
1✔
157
        Set<ZSetEntry> zset = new LinkedHashSet<>();
1✔
158
        while (len > 0) {
1✔
159
            byte[] element = parser.rdbLoadEncodedStringObject().first();
1✔
160
            double score = parser.rdbLoadDoubleValue();
1✔
161
            zset.add(new ZSetEntry(element, score));
1✔
162
            len--;
1✔
163
        }
1✔
164
        return (T) zset;
1✔
165
    }
166

167
    @Override
168
    public <T> T applyZSet2(RedisInputStream in, int version) throws IOException {
169
        /*
170
         * |    <len>     |       <content>       |        <score>       |
171
         * | 1 or 5 bytes |    string contents    |    binary double     |
172
         */
173
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
174

175
        /* rdb version 8*/
176
        long len = parser.rdbLoadLen().len;
1✔
177
        Set<ZSetEntry> zset = new LinkedHashSet<>();
1✔
178
        while (len > 0) {
1✔
179
            byte[] element = parser.rdbLoadEncodedStringObject().first();
1✔
180
            double score = parser.rdbLoadBinaryDoubleValue();
1✔
181
            zset.add(new ZSetEntry(element, score));
1✔
182
            len--;
1✔
183
        }
1✔
184
        return (T) zset;
1✔
185
    }
186

187
    @Override
188
    public <T> T applyHash(RedisInputStream in, int version) throws IOException {
189
        /*
190
         * |    <len>     |       <content>       |
191
         * | 1 or 5 bytes |    string contents    |
192
         */
193
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
194

195
        long len = parser.rdbLoadLen().len;
1✔
196
        ByteArrayMap map = new ByteArrayMap();
1✔
197
        while (len > 0) {
1✔
198
            byte[] field = parser.rdbLoadEncodedStringObject().first();
1✔
199
            byte[] value = parser.rdbLoadEncodedStringObject().first();
1✔
200
            map.put(field, value);
1✔
201
            len--;
1✔
202
        }
1✔
203
        return (T) map;
1✔
204
    }
205

206
    @Override
207
    public <T> T applyHashZipMap(RedisInputStream in, int version) throws IOException {
208
        /*
209
         * |<zmlen> |   <len>     |"foo"    |    <len>   | <free> |   "bar" |<zmend> |
210
         * | 1 byte | 1 or 5 byte | content |1 or 5 byte | 1 byte | content | 1 byte |
211
         */
212
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
213

214
        RedisInputStream stream = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
215
        ByteArrayMap map = new ByteArrayMap();
1✔
216
        BaseRdbParser.LenHelper.zmlen(stream); // zmlen
1✔
217
        while (true) {
218
            int zmEleLen = BaseRdbParser.LenHelper.zmElementLen(stream);
1✔
219
            if (zmEleLen == 255) {
1✔
220
                return (T) map;
1✔
221
            }
222
            byte[] field = BaseRdbParser.StringHelper.bytes(stream, zmEleLen);
1✔
223
            zmEleLen = BaseRdbParser.LenHelper.zmElementLen(stream);
1✔
224
            if (zmEleLen == 255) {
1✔
225
                //value is null
226
                map.put(field, null);
×
227
                return (T) map;
×
228
            }
229
            int free = BaseRdbParser.LenHelper.free(stream);
1✔
230
            byte[] value = BaseRdbParser.StringHelper.bytes(stream, zmEleLen);
1✔
231
            BaseRdbParser.StringHelper.skip(stream, free);
1✔
232
            map.put(field, value);
1✔
233
        }
1✔
234
    }
235

236
    @Override
237
    public <T> T applyListZipList(RedisInputStream in, int version) throws IOException {
238
        /*
239
         * |<zlbytes>| <zltail>| <zllen>| <entry> ...<entry> | <zlend>|
240
         * | 4 bytes | 4 bytes | 2bytes | zipListEntry ...   | 1byte  |
241
         */
242
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
243

244
        RedisInputStream stream = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
245
        List<byte[]> list = new ByteArrayList();
1✔
246
        BaseRdbParser.LenHelper.zlbytes(stream); // zlbytes
1✔
247
        BaseRdbParser.LenHelper.zltail(stream); // zltail
1✔
248
        int zllen = BaseRdbParser.LenHelper.zllen(stream);
1✔
249
        for (int i = 0; i < zllen; i++) {
1✔
250
            byte[] e = BaseRdbParser.StringHelper.zipListEntry(stream);
1✔
251
            list.add(e);
1✔
252
        }
253
        int zlend = BaseRdbParser.LenHelper.zlend(stream);
1✔
254
        if (zlend != 255) {
1✔
255
            throw new AssertionError("zlend expect 255 but " + zlend);
×
256
        }
257
        return (T) list;
1✔
258
    }
259

260
    @Override
261
    public <T> T applySetIntSet(RedisInputStream in, int version) throws IOException {
262
        /*
263
         * |<encoding>| <length-of-contents>|              <contents>                            |
264
         * | 4 bytes  |            4 bytes  | 2 bytes element| 4 bytes element | 8 bytes element |
265
         */
266
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
267

268
        RedisInputStream stream = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
269
        Set<byte[]> set = new ByteArraySet();
1✔
270
        int encoding = BaseRdbParser.LenHelper.encoding(stream);
1✔
271
        long lenOfContent = BaseRdbParser.LenHelper.lenOfContent(stream);
1✔
272
        for (long i = 0; i < lenOfContent; i++) {
1✔
273
            switch (encoding) {
1✔
274
                case 2:
275
                    String element = String.valueOf(stream.readInt(2));
1✔
276
                    set.add(element.getBytes());
1✔
277
                    break;
1✔
278
                case 4:
279
                    element = String.valueOf(stream.readInt(4));
1✔
280
                    set.add(element.getBytes());
1✔
281
                    break;
1✔
282
                case 8:
283
                    element = String.valueOf(stream.readLong(8));
1✔
284
                    set.add(element.getBytes());
1✔
285
                    break;
1✔
286
                default:
287
                    throw new AssertionError("expect encoding [2,4,8] but:" + encoding);
×
288
            }
289
        }
290
        return (T) set;
1✔
291
    }
292

293
    @Override
294
    public <T> T applyZSetZipList(RedisInputStream in, int version) throws IOException {
295
        /*
296
         * |<zlbytes>| <zltail>| <zllen>| <entry> ...<entry> | <zlend>|
297
         * | 4 bytes | 4 bytes | 2bytes | zipListEntry ...   | 1byte  |
298
         */
299
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
300

301
        RedisInputStream stream = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
302
        Set<ZSetEntry> zset = new LinkedHashSet<>();
1✔
303
        BaseRdbParser.LenHelper.zlbytes(stream); // zlbytes
1✔
304
        BaseRdbParser.LenHelper.zltail(stream); // zltail
1✔
305
        int zllen = BaseRdbParser.LenHelper.zllen(stream);
1✔
306
        while (zllen > 0) {
1✔
307
            byte[] element = BaseRdbParser.StringHelper.zipListEntry(stream);
1✔
308
            zllen--;
1✔
309
            double score = Double.valueOf(Strings.toString(BaseRdbParser.StringHelper.zipListEntry(stream)));
1✔
310
            zllen--;
1✔
311
            zset.add(new ZSetEntry(element, score));
1✔
312
        }
1✔
313
        int zlend = BaseRdbParser.LenHelper.zlend(stream);
1✔
314
        if (zlend != 255) {
1✔
315
            throw new AssertionError("zlend expect 255 but " + zlend);
×
316
        }
317
        return (T) zset;
1✔
318
    }
319
    
320
    @Override
321
    public <T> T applyZSetListPack(RedisInputStream in, int version) throws IOException {
322
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
323

324
        RedisInputStream listPack = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
325
        Set<ZSetEntry> zset = new LinkedHashSet<>();
1✔
326
        listPack.skip(4); // total-bytes
1✔
327
        int len = listPack.readInt(2);
1✔
328
        while (len > 0) {
1✔
329
            byte[] element = listPackEntry(listPack);
1✔
330
            len--;
1✔
331
            double score = Double.valueOf(Strings.toString(listPackEntry(listPack)));
1✔
332
            len--;
1✔
333
            zset.add(new ZSetEntry(element, score));
1✔
334
        }
1✔
335
        int lpend = listPack.read(); // lp-end
1✔
336
        if (lpend != 255) {
1✔
337
            throw new AssertionError("listpack expect 255 but " + lpend);
×
338
        }
339
        return (T) zset;
1✔
340
    }
341

342
    @Override
343
    public <T> T applyHashZipList(RedisInputStream in, int version) throws IOException {
344
        /*
345
         * |<zlbytes>| <zltail>| <zllen>| <entry> ...<entry> | <zlend>|
346
         * | 4 bytes | 4 bytes | 2bytes | zipListEntry ...   | 1byte  |
347
         */
348
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
349

350
        RedisInputStream stream = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
351
        ByteArrayMap map = new ByteArrayMap();
1✔
352
        BaseRdbParser.LenHelper.zlbytes(stream); // zlbytes
1✔
353
        BaseRdbParser.LenHelper.zltail(stream); // zltail
1✔
354
        int zllen = BaseRdbParser.LenHelper.zllen(stream);
1✔
355
        while (zllen > 0) {
1✔
356
            byte[] field = BaseRdbParser.StringHelper.zipListEntry(stream);
1✔
357
            zllen--;
1✔
358
            byte[] value = BaseRdbParser.StringHelper.zipListEntry(stream);
1✔
359
            zllen--;
1✔
360
            map.put(field, value);
1✔
361
        }
1✔
362
        int zlend = BaseRdbParser.LenHelper.zlend(stream);
1✔
363
        if (zlend != 255) {
1✔
364
            throw new AssertionError("zlend expect 255 but " + zlend);
×
365
        }
366
        return (T) map;
1✔
367
    }
368
    
369
    @Override
370
    public <T> T applyHashListPack(RedisInputStream in, int version) throws IOException {
371
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
372

373
        RedisInputStream listPack = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
374
        ByteArrayMap map = new ByteArrayMap();
1✔
375
        listPack.skip(4); // total-bytes
1✔
376
        int len = listPack.readInt(2);
1✔
377
        while (len > 0) {
1✔
378
            byte[] field = listPackEntry(listPack);
1✔
379
            len--;
1✔
380
            byte[] value = listPackEntry(listPack);
1✔
381
            len--;
1✔
382
            map.put(field, value);
1✔
383
        }
1✔
384
        int lpend = listPack.read(); // lp-end
1✔
385
        if (lpend != 255) {
1✔
386
            throw new AssertionError("listpack expect 255 but " + lpend);
×
387
        }
388
        return (T) map;
1✔
389
    }
390

391
    @Override
392
    public <T> T applyListQuickList(RedisInputStream in, int version) throws IOException {
393
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
394

395
        long len = parser.rdbLoadLen().len;
1✔
396
        List<byte[]> list = new ByteArrayList();
1✔
397
        for (long i = 0; i < len; i++) {
1✔
398
            RedisInputStream stream = new RedisInputStream(parser.rdbGenericLoadStringObject(RDB_LOAD_NONE));
1✔
399

400
            BaseRdbParser.LenHelper.zlbytes(stream); // zlbytes
1✔
401
            BaseRdbParser.LenHelper.zltail(stream); // zltail
1✔
402
            int zllen = BaseRdbParser.LenHelper.zllen(stream);
1✔
403
            for (int j = 0; j < zllen; j++) {
1✔
404
                byte[] e = BaseRdbParser.StringHelper.zipListEntry(stream);
1✔
405
                list.add(e);
1✔
406
            }
407
            int zlend = BaseRdbParser.LenHelper.zlend(stream);
1✔
408
            if (zlend != 255) {
1✔
409
                throw new AssertionError("zlend expect 255 but " + zlend);
×
410
            }
411
        }
412
        return (T) list;
1✔
413
    }
414
    
415
    @Override
416
    public <T> T applyListQuickList2(RedisInputStream in, int version) throws IOException {
417
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
418

419
        long len = parser.rdbLoadLen().len;
1✔
420
        List<byte[]> list = new ByteArrayList();
1✔
421
        for (long i = 0; i < len; i++) {
1✔
422
            long container = parser.rdbLoadLen().len;
1✔
423
            ByteArray bytes = parser.rdbLoadPlainStringObject();
1✔
424
            if (container == QUICKLIST_NODE_CONTAINER_PLAIN) {
1✔
425
                list.add(bytes.first());
1✔
426
            } else if (container == QUICKLIST_NODE_CONTAINER_PACKED) {
1✔
427
                RedisInputStream listPack = new RedisInputStream(bytes);
1✔
428
                listPack.skip(4); // total-bytes
1✔
429
                int innerLen = listPack.readInt(2);
1✔
430
                for (int j = 0; j < innerLen; j++) {
1✔
431
                    byte[] e = listPackEntry(listPack);
1✔
432
                    list.add(e);
1✔
433
                }
434
                int lpend = listPack.read(); // lp-end
1✔
435
                if (lpend != 255) {
1✔
436
                    throw new AssertionError("listpack expect 255 but " + lpend);
×
437
                }
438
            } else {
1✔
439
                throw new UnsupportedOperationException(String.valueOf(container));
×
440
            }
441
        }
442
        return (T) list;
1✔
443
    }
444
    
445
    @Override
446
    public <T> T applyHashMetadata(RedisInputStream in, int version) throws IOException{
NEW
447
        BaseRdbParser parser = new BaseRdbParser(in);
×
NEW
448
        long minExpire = parser.rdbLoadMillisecondTime();
×
NEW
449
        long len = parser.rdbLoadLen().len;
×
NEW
450
        TTLByteArrayMap map = new TTLByteArrayMap();
×
NEW
451
        while (len > 0) {
×
NEW
452
            long ttl = parser.rdbLoadLen().len;
×
NEW
453
            byte[] field = parser.rdbLoadEncodedStringObject().first();
×
NEW
454
            byte[] value = parser.rdbLoadEncodedStringObject().first();
×
NEW
455
            map.put(field, new TTLValue(ttl != 0 ? ttl + minExpire - 1 : null, value));
×
NEW
456
            len--;
×
NEW
457
        }
×
NEW
458
        return (T) map;
×
459
    }
460
    
461
    @Override
462
    public <T> T applyHashListPackEx(RedisInputStream in, int version) throws IOException {
NEW
463
        BaseRdbParser parser = new BaseRdbParser(in);
×
NEW
464
        long minExpire = parser.rdbLoadMillisecondTime();
×
NEW
465
        RedisInputStream listPack = new RedisInputStream(parser.rdbLoadPlainStringObject());
×
NEW
466
        TTLByteArrayMap map = new TTLByteArrayMap();
×
NEW
467
        listPack.skip(4); // total-bytes
×
NEW
468
        int len = listPack.readInt(2);
×
NEW
469
        while (len > 0) {
×
NEW
470
            byte[] field = listPackEntry(listPack);
×
NEW
471
            len--;
×
NEW
472
            byte[] value = listPackEntry(listPack);
×
NEW
473
            len--;
×
NEW
474
            long ttl = Long.parseLong(new String(listPackEntry(listPack)));
×
NEW
475
            len--;
×
NEW
476
            map.put(field, ttl != 0 ? new TTLValue(ttl, value) : new TTLValue(value));
×
NEW
477
        }
×
NEW
478
        int lpend = listPack.read(); // lp-end
×
NEW
479
        if (lpend != 255) {
×
NEW
480
            throw new AssertionError("listpack expect 255 but " + lpend);
×
481
        }
NEW
482
        return (T) map;
×
483
    }
484

485
    @Override
486
    public <T> T applyModule(RedisInputStream in, int version) throws IOException {
487
        //|6|6|6|6|6|6|6|6|6|10|
488
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
489

490
        char[] c = new char[9];
1✔
491
        long moduleid = parser.rdbLoadLen().len;
1✔
492
        for (int i = 0; i < c.length; i++) {
1✔
493
            c[i] = MODULE_SET[(int) (moduleid >>> (10 + (c.length - 1 - i) * 6) & 63)];
1✔
494
        }
495
        String moduleName = new String(c);
1✔
496
        int moduleVersion = (int) (moduleid & 1023);
1✔
497
        ModuleParser<? extends Module> moduleParser = lookupModuleParser(moduleName, moduleVersion);
1✔
498
        if (moduleParser == null) {
1✔
499
            throw new NoSuchElementException("module parser[" + moduleName + ", " + moduleVersion + "] not register. rdb type: [RDB_TYPE_MODULE]");
1✔
500
        }
501
        Module module = moduleParser.parse(in, 1);
1✔
502
        return (T) module;
1✔
503
    }
504

505
    @Override
506
    public <T> T applyModule2(RedisInputStream in, int version) throws IOException {
507
        //|6|6|6|6|6|6|6|6|6|10|
508
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
509

510
        char[] c = new char[9];
1✔
511
        long moduleid = parser.rdbLoadLen().len;
1✔
512
        for (int i = 0; i < c.length; i++) {
1✔
513
            c[i] = MODULE_SET[(int) (moduleid >>> (10 + (c.length - 1 - i) * 6) & 63)];
1✔
514
        }
515
        String moduleName = new String(c);
1✔
516
        int moduleVersion = (int) (moduleid & 1023);
1✔
517
        ModuleParser<? extends Module> moduleParser = lookupModuleParser(moduleName, moduleVersion);
1✔
518
        Module module = null;
1✔
519
        if (moduleParser == null) {
1✔
520
            logger.warn("module parser[{}, {}] not register. rdb type: [RDB_TYPE_MODULE_2]. module parse skipped.", moduleName, moduleVersion);
1✔
521
            SkipRdbParser skipRdbParser = new SkipRdbParser(in);
1✔
522
            skipRdbParser.rdbLoadCheckModuleValue();
1✔
523
        } else {
1✔
524
            module = moduleParser.parse(in, 2);
1✔
525
            long eof = parser.rdbLoadLen().len;
1✔
526
            if (eof != RDB_MODULE_OPCODE_EOF) {
1✔
527
                throw new UnsupportedOperationException("The RDB file contains module data for the module '" + moduleName + "' that is not terminated by the proper module value EOF marker");
×
528
            }
529
        }
530
        return (T) module;
1✔
531
    }
532

533
    protected ModuleParser<? extends Module> lookupModuleParser(String moduleName, int moduleVersion) {
534
        return replicator.getModuleParser(moduleName, moduleVersion);
1✔
535
    }
536

537
    @Override
538
    public <T> T applyStreamListPacks(RedisInputStream in, int version) throws IOException {
539
        return applyStreamListPacks(in, version, Constants.RDB_TYPE_STREAM_LISTPACKS);
1✔
540
    }
541
    
542
    @Override
543
    public <T> T applyStreamListPacks2(RedisInputStream in, int version) throws IOException {
544
        return applyStreamListPacks(in, version, Constants.RDB_TYPE_STREAM_LISTPACKS_2);
1✔
545
    }
546
    
547
    @Override
548
    public <T> T applyStreamListPacks3(RedisInputStream in, int version) throws IOException {
549
        return applyStreamListPacks(in, version, Constants.RDB_TYPE_STREAM_LISTPACKS_3);
1✔
550
    }
551
    
552
    @SuppressWarnings("resource")
553
    protected <T> T applyStreamListPacks(RedisInputStream in, int version, int type) throws IOException {
554
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
555
    
556
        Stream stream = new Stream();
1✔
557
    
558
        // Entries
559
        NavigableMap<Stream.ID, Stream.Entry> entries = new TreeMap<>(Stream.ID.COMPARATOR);
1✔
560
        long listPacks = parser.rdbLoadLen().len;
1✔
561
        while (listPacks-- > 0) {
1✔
562
            RedisInputStream rawId = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
563
            Stream.ID baseId = new Stream.ID(rawId.readLong(8, false), rawId.readLong(8, false));
1✔
564
            RedisInputStream listPack = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
565
            listPack.skip(4); // total-bytes
1✔
566
            listPack.skip(2); // num-elements
1✔
567
            /*
568
             * Master entry
569
             * +-------+---------+------------+---------+--/--+---------+---------+-+
570
             * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0|
571
             * +-------+---------+------------+---------+--/--+---------+---------+-+
572
             */
573
            long count = Long.parseLong(Strings.toString(listPackEntry(listPack))); // count
1✔
574
            long deleted = Long.parseLong(Strings.toString(listPackEntry(listPack))); // deleted
1✔
575
            int numFields = Integer.parseInt(Strings.toString(listPackEntry(listPack))); // num-fields
1✔
576
            byte[][] tempFields = new byte[numFields][];
1✔
577
            for (int i = 0; i < numFields; i++) {
1✔
578
                tempFields[i] = listPackEntry(listPack);
1✔
579
            }
580
            listPackEntry(listPack); // 0
1✔
581
        
582
            long total = count + deleted;
1✔
583
            while (total-- > 0) {
1✔
584
                Map<byte[], byte[]> fields = new ByteArrayMap();
1✔
585
                /*
586
                 * FLAG
587
                 * +-----+--------+
588
                 * |flags|entry-id|
589
                 * +-----+--------+
590
                 */
591
                int flag = Integer.parseInt(Strings.toString(listPackEntry(listPack)));
1✔
592
                long ms = Long.parseLong(Strings.toString(listPackEntry(listPack)));
1✔
593
                long seq = Long.parseLong(Strings.toString(listPackEntry(listPack)));
1✔
594
                Stream.ID id = baseId.delta(ms, seq);
1✔
595
                boolean delete = (flag & STREAM_ITEM_FLAG_DELETED) != 0;
1✔
596
                if ((flag & STREAM_ITEM_FLAG_SAMEFIELDS) != 0) {
1✔
597
                    /*
598
                     * SAMEFIELD
599
                     * +-------+-/-+-------+--------+
600
                     * |value-1|...|value-N|lp-count|
601
                     * +-------+-/-+-------+--------+
602
                     */
603
                    for (int i = 0; i < numFields; i++) {
1✔
604
                        byte[] value = listPackEntry(listPack);
1✔
605
                        byte[] field = tempFields[i];
1✔
606
                        fields.put(field, value);
1✔
607
                    }
608
                    entries.put(id, new Stream.Entry(id, delete, fields));
1✔
609
                } else {
610
                    /*
611
                     * NONEFIELD
612
                     * +----------+-------+-------+-/-+-------+-------+--------+
613
                     * |num-fields|field-1|value-1|...|field-N|value-N|lp-count|
614
                     * +----------+-------+-------+-/-+-------+-------+--------+
615
                     */
616
                    numFields = Integer.parseInt(Strings.toString(listPackEntry(listPack)));
1✔
617
                    for (int i = 0; i < numFields; i++) {
1✔
618
                        byte[] field = listPackEntry(listPack);
1✔
619
                        byte[] value = listPackEntry(listPack);
1✔
620
                        fields.put(field, value);
1✔
621
                    }
622
                    entries.put(id, new Stream.Entry(id, delete, fields));
1✔
623
                }
624
                listPackEntry(listPack); // lp-count
1✔
625
            }
1✔
626
            int lpend = listPack.read(); // lp-end
1✔
627
            if (lpend != 255) {
1✔
628
                throw new AssertionError("listpack expect 255 but " + lpend);
×
629
            }
630
        }
1✔
631
    
632
        long length = parser.rdbLoadLen().len;
1✔
633
        Stream.ID lastId = new Stream.ID(parser.rdbLoadLen().len, parser.rdbLoadLen().len);
1✔
634
    
635
        Stream.ID firstId = null;
1✔
636
        Stream.ID maxDeletedEntryId = null;
1✔
637
        Long entriesAdded = null;
1✔
638
        if (type >= Constants.RDB_TYPE_STREAM_LISTPACKS_2) {
1✔
639
            firstId = new Stream.ID(parser.rdbLoadLen().len, parser.rdbLoadLen().len);
1✔
640
            maxDeletedEntryId = new Stream.ID(parser.rdbLoadLen().len, parser.rdbLoadLen().len);
1✔
641
            entriesAdded = parser.rdbLoadLen().len;
1✔
642
        }
643
    
644
        // Group
645
        List<Stream.Group> groups = new ArrayList<>();
1✔
646
        long groupCount = parser.rdbLoadLen().len;
1✔
647
        while (groupCount-- > 0) {
1✔
648
            Stream.Group group = new Stream.Group();
1✔
649
            byte[] groupName = parser.rdbLoadPlainStringObject().first();
1✔
650
            Stream.ID groupLastId = new Stream.ID(parser.rdbLoadLen().len, parser.rdbLoadLen().len);
1✔
651
            Long entriesRead = null;
1✔
652
            if (type >= Constants.RDB_TYPE_STREAM_LISTPACKS_2) {
1✔
653
                entriesRead = parser.rdbLoadLen().len;
1✔
654
            }
655
            
656
            // Group PEL
657
            NavigableMap<Stream.ID, Stream.Nack> groupPendingEntries = new TreeMap<>(Stream.ID.COMPARATOR);
1✔
658
            long globalPel = parser.rdbLoadLen().len;
1✔
659
            while (globalPel-- > 0) {
1✔
660
                Stream.ID rawId = new Stream.ID(in.readLong(8, false), in.readLong(8, false));
1✔
661
                long deliveryTime = parser.rdbLoadMillisecondTime();
1✔
662
                long deliveryCount = parser.rdbLoadLen().len;
1✔
663
                groupPendingEntries.put(rawId, new Stream.Nack(rawId, null, deliveryTime, deliveryCount));
1✔
664
            }
1✔
665
        
666
            // Consumer
667
            List<Stream.Consumer> consumers = new ArrayList<>();
1✔
668
            long consumerCount = parser.rdbLoadLen().len;
1✔
669
            while (consumerCount-- > 0) {
1✔
670
                Stream.Consumer consumer = new Stream.Consumer();
1✔
671
                byte[] consumerName = parser.rdbLoadPlainStringObject().first();
1✔
672
                long seenTime = parser.rdbLoadMillisecondTime();
1✔
673
                long activeTime = -1;
1✔
674
                if (type >= Constants.RDB_TYPE_STREAM_LISTPACKS_3) {
1✔
675
                    activeTime = parser.rdbLoadMillisecondTime();
1✔
676
                }
677
            
678
                // Consumer PEL
679
                NavigableMap<Stream.ID, Stream.Nack> consumerPendingEntries = new TreeMap<>(Stream.ID.COMPARATOR);
1✔
680
                long pel = parser.rdbLoadLen().len;
1✔
681
                while (pel-- > 0) {
1✔
682
                    Stream.ID rawId = new Stream.ID(in.readLong(8, false), in.readLong(8, false));
1✔
683
                    Stream.Nack nack = groupPendingEntries.get(rawId);
1✔
684
                    nack.setConsumer(consumer);
1✔
685
                    consumerPendingEntries.put(rawId, nack);
1✔
686
                }
1✔
687
            
688
                consumer.setName(consumerName);
1✔
689
                consumer.setSeenTime(seenTime);
1✔
690
                if (type >= Constants.RDB_TYPE_STREAM_LISTPACKS_3) {
1✔
691
                    consumer.setActiveTime(activeTime);
1✔
692
                }
693
                consumer.setPendingEntries(consumerPendingEntries);
1✔
694
                consumers.add(consumer);
1✔
695
            }
1✔
696
        
697
            group.setName(groupName);
1✔
698
            group.setLastId(groupLastId);
1✔
699
            if (type >= Constants.RDB_TYPE_STREAM_LISTPACKS_2) {
1✔
700
                group.setEntriesRead(entriesRead);
1✔
701
            }
702
            group.setPendingEntries(groupPendingEntries);
1✔
703
            group.setConsumers(consumers);
1✔
704
            groups.add(group);
1✔
705
        }
1✔
706
    
707
        stream.setLastId(lastId);
1✔
708
        if (type >= Constants.RDB_TYPE_STREAM_LISTPACKS_2) {
1✔
709
            stream.setFirstId(firstId);
1✔
710
            stream.setMaxDeletedEntryId(maxDeletedEntryId);
1✔
711
            stream.setEntriesAdded(entriesAdded);
1✔
712
        }
713
        stream.setEntries(entries);
1✔
714
        stream.setLength(length);
1✔
715
        stream.setGroups(groups);
1✔
716
    
717
        return (T) stream;
1✔
718
    }
719
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc