• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

leonchen83 / redis-replicator / #2155

06 Jun 2025 04:52AM UTC coverage: 71.406% (-0.2%) from 71.609%
#2155

push

chenby
redis 8.0

3 of 33 new or added lines in 14 files covered. (9.09%)

3 existing lines in 2 files now uncovered.

6635 of 9292 relevant lines covered (71.41%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.78
/src/main/java/com/moilioncircle/redis/replicator/rdb/DefaultRdbValueVisitor.java
1

2
package com.moilioncircle.redis.replicator.rdb;
3

4
import static com.moilioncircle.redis.replicator.Constants.MODULE_SET;
5
import static com.moilioncircle.redis.replicator.Constants.QUICKLIST_NODE_CONTAINER_PACKED;
6
import static com.moilioncircle.redis.replicator.Constants.QUICKLIST_NODE_CONTAINER_PLAIN;
7
import static com.moilioncircle.redis.replicator.Constants.RDB_LOAD_NONE;
8
import static com.moilioncircle.redis.replicator.Constants.RDB_MODULE_OPCODE_EOF;
9
import static com.moilioncircle.redis.replicator.Constants.STREAM_ITEM_FLAG_DELETED;
10
import static com.moilioncircle.redis.replicator.Constants.STREAM_ITEM_FLAG_SAMEFIELDS;
11
import static com.moilioncircle.redis.replicator.rdb.BaseRdbParser.StringHelper.listPackEntry;
12

13
import java.io.IOException;
14
import java.util.ArrayList;
15
import java.util.LinkedHashSet;
16
import java.util.List;
17
import java.util.Map;
18
import java.util.NavigableMap;
19
import java.util.NoSuchElementException;
20
import java.util.Set;
21
import java.util.TreeMap;
22

23
import org.slf4j.Logger;
24
import org.slf4j.LoggerFactory;
25

26
import com.moilioncircle.redis.replicator.Constants;
27
import com.moilioncircle.redis.replicator.Replicator;
28
import com.moilioncircle.redis.replicator.io.RedisInputStream;
29
import com.moilioncircle.redis.replicator.rdb.datatype.Function;
30
import com.moilioncircle.redis.replicator.rdb.datatype.Module;
31
import com.moilioncircle.redis.replicator.rdb.datatype.Stream;
32
import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry;
33
import com.moilioncircle.redis.replicator.rdb.module.ModuleParser;
34
import com.moilioncircle.redis.replicator.rdb.skip.SkipRdbParser;
35
import com.moilioncircle.redis.replicator.util.ByteArray;
36
import com.moilioncircle.redis.replicator.util.ByteArrayList;
37
import com.moilioncircle.redis.replicator.util.ByteArrayMap;
38
import com.moilioncircle.redis.replicator.util.ByteArraySet;
39
import com.moilioncircle.redis.replicator.util.Strings;
40

41
/**
42
 * @author Leon Chen
43
 * @since 3.1.0
44
 */
45
@SuppressWarnings({"unchecked", "deprecation"})
46
public class DefaultRdbValueVisitor extends RdbValueVisitor {
47

48
    protected static final Logger logger = LoggerFactory.getLogger(DefaultRdbValueVisitor.class);
1✔
49

50
    protected final Replicator replicator;
51

52
    public DefaultRdbValueVisitor(final Replicator replicator) {
1✔
53
        this.replicator = replicator;
1✔
54
    }
1✔
55
    
56
    @Override
57
    public <T> T applyFunction(RedisInputStream in, int version) throws IOException {
58
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
59
        Function function = new Function();
1✔
60
        function.setName(parser.rdbLoadPlainStringObject().first());
1✔
61
        function.setEngineName(parser.rdbLoadPlainStringObject().first());
1✔
62
        long hasDesc = parser.rdbLoadLen().len;
1✔
63
        if (hasDesc == 1) {
1✔
64
            function.setDescription(parser.rdbLoadPlainStringObject().first());
1✔
65
        }
66
        function.setCode(parser.rdbLoadPlainStringObject().first());
1✔
67
        return (T) function;
1✔
68
    }
69
    
70
    @Override
71
    public <T> T applyFunction2(RedisInputStream in, int version) throws IOException {
72
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
73
        Function function = new Function();
1✔
74
        function.setCode(parser.rdbLoadPlainStringObject().first());
1✔
75
        return (T) function;
1✔
76
    }
77

78
    @Override
79
    public <T> T applyString(RedisInputStream in, int version) throws IOException {
80
        /*
81
         * |       <content>       |
82
         * |    string contents    |
83
         */
84
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
85

86
        byte[] val = parser.rdbLoadEncodedStringObject().first();
1✔
87
        return (T) val;
1✔
88
    }
89

90
    @Override
91
    public <T> T applyList(RedisInputStream in, int version) throws IOException {
92
        /*
93
         * |    <len>     |       <content>       |
94
         * | 1 or 5 bytes |    string contents    |
95
         */
96
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
97

98
        long len = parser.rdbLoadLen().len;
1✔
99
        List<byte[]> list = new ByteArrayList();
1✔
100
        while (len > 0) {
1✔
101
            byte[] element = parser.rdbLoadEncodedStringObject().first();
1✔
102
            list.add(element);
1✔
103
            len--;
1✔
104
        }
1✔
105
        return (T) list;
1✔
106
    }
107

108
    @Override
109
    public <T> T applySet(RedisInputStream in, int version) throws IOException {
110
        /*
111
         * |    <len>     |       <content>       |
112
         * | 1 or 5 bytes |    string contents    |
113
         */
114
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
115

116
        long len = parser.rdbLoadLen().len;
1✔
117
        Set<byte[]> set = new ByteArraySet();
1✔
118
        while (len > 0) {
1✔
119
            byte[] element = parser.rdbLoadEncodedStringObject().first();
1✔
120
            set.add(element);
1✔
121
            len--;
1✔
122
        }
1✔
123
        return (T) set;
1✔
124
    }
125
    
126
    @Override
127
    public <T> T applySetListPack(RedisInputStream in, int version) throws IOException {
128
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
129
    
130
        RedisInputStream listPack = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
131
        Set<byte[]> set = new ByteArraySet();
1✔
132
        listPack.skip(4); // total-bytes
1✔
133
        int len = listPack.readInt(2);
1✔
134
        while (len > 0) {
1✔
135
            byte[] element = listPackEntry(listPack);
1✔
136
            set.add(element);
1✔
137
            len--;
1✔
138
        }
1✔
139
        int lpend = listPack.read(); // lp-end
1✔
140
        if (lpend != 255) {
1✔
141
            throw new AssertionError("listpack expect 255 but " + lpend);
×
142
        }
143
        return (T) set;
1✔
144
    }
145

146
    @Override
147
    public <T> T applyZSet(RedisInputStream in, int version) throws IOException {
148
        /*
149
         * |    <len>     |       <content>       |        <score>       |
150
         * | 1 or 5 bytes |    string contents    |    double content    |
151
         */
152
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
153

154
        long len = parser.rdbLoadLen().len;
1✔
155
        Set<ZSetEntry> zset = new LinkedHashSet<>();
1✔
156
        while (len > 0) {
1✔
157
            byte[] element = parser.rdbLoadEncodedStringObject().first();
1✔
158
            double score = parser.rdbLoadDoubleValue();
1✔
159
            zset.add(new ZSetEntry(element, score));
1✔
160
            len--;
1✔
161
        }
1✔
162
        return (T) zset;
1✔
163
    }
164

165
    @Override
166
    public <T> T applyZSet2(RedisInputStream in, int version) throws IOException {
167
        /*
168
         * |    <len>     |       <content>       |        <score>       |
169
         * | 1 or 5 bytes |    string contents    |    binary double     |
170
         */
171
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
172

173
        /* rdb version 8*/
174
        long len = parser.rdbLoadLen().len;
1✔
175
        Set<ZSetEntry> zset = new LinkedHashSet<>();
1✔
176
        while (len > 0) {
1✔
177
            byte[] element = parser.rdbLoadEncodedStringObject().first();
1✔
178
            double score = parser.rdbLoadBinaryDoubleValue();
1✔
179
            zset.add(new ZSetEntry(element, score));
1✔
180
            len--;
1✔
181
        }
1✔
182
        return (T) zset;
1✔
183
    }
184

185
    @Override
186
    public <T> T applyHash(RedisInputStream in, int version) throws IOException {
187
        /*
188
         * |    <len>     |       <content>       |
189
         * | 1 or 5 bytes |    string contents    |
190
         */
191
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
192

193
        long len = parser.rdbLoadLen().len;
1✔
194
        ByteArrayMap map = new ByteArrayMap();
1✔
195
        while (len > 0) {
1✔
196
            byte[] field = parser.rdbLoadEncodedStringObject().first();
1✔
197
            byte[] value = parser.rdbLoadEncodedStringObject().first();
1✔
198
            map.put(field, value);
1✔
199
            len--;
1✔
200
        }
1✔
201
        return (T) map;
1✔
202
    }
203

204
    @Override
205
    public <T> T applyHashZipMap(RedisInputStream in, int version) throws IOException {
206
        /*
207
         * |<zmlen> |   <len>     |"foo"    |    <len>   | <free> |   "bar" |<zmend> |
208
         * | 1 byte | 1 or 5 byte | content |1 or 5 byte | 1 byte | content | 1 byte |
209
         */
210
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
211

212
        RedisInputStream stream = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
213
        ByteArrayMap map = new ByteArrayMap();
1✔
214
        BaseRdbParser.LenHelper.zmlen(stream); // zmlen
1✔
215
        while (true) {
216
            int zmEleLen = BaseRdbParser.LenHelper.zmElementLen(stream);
1✔
217
            if (zmEleLen == 255) {
1✔
218
                return (T) map;
1✔
219
            }
220
            byte[] field = BaseRdbParser.StringHelper.bytes(stream, zmEleLen);
1✔
221
            zmEleLen = BaseRdbParser.LenHelper.zmElementLen(stream);
1✔
222
            if (zmEleLen == 255) {
1✔
223
                //value is null
224
                map.put(field, null);
×
225
                return (T) map;
×
226
            }
227
            int free = BaseRdbParser.LenHelper.free(stream);
1✔
228
            byte[] value = BaseRdbParser.StringHelper.bytes(stream, zmEleLen);
1✔
229
            BaseRdbParser.StringHelper.skip(stream, free);
1✔
230
            map.put(field, value);
1✔
231
        }
1✔
232
    }
233

234
    @Override
235
    public <T> T applyListZipList(RedisInputStream in, int version) throws IOException {
236
        /*
237
         * |<zlbytes>| <zltail>| <zllen>| <entry> ...<entry> | <zlend>|
238
         * | 4 bytes | 4 bytes | 2bytes | zipListEntry ...   | 1byte  |
239
         */
240
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
241

242
        RedisInputStream stream = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
243
        List<byte[]> list = new ByteArrayList();
1✔
244
        BaseRdbParser.LenHelper.zlbytes(stream); // zlbytes
1✔
245
        BaseRdbParser.LenHelper.zltail(stream); // zltail
1✔
246
        int zllen = BaseRdbParser.LenHelper.zllen(stream);
1✔
247
        for (int i = 0; i < zllen; i++) {
1✔
248
            byte[] e = BaseRdbParser.StringHelper.zipListEntry(stream);
1✔
249
            list.add(e);
1✔
250
        }
251
        int zlend = BaseRdbParser.LenHelper.zlend(stream);
1✔
252
        if (zlend != 255) {
1✔
253
            throw new AssertionError("zlend expect 255 but " + zlend);
×
254
        }
255
        return (T) list;
1✔
256
    }
257

258
    @Override
259
    public <T> T applySetIntSet(RedisInputStream in, int version) throws IOException {
260
        /*
261
         * |<encoding>| <length-of-contents>|              <contents>                            |
262
         * | 4 bytes  |            4 bytes  | 2 bytes element| 4 bytes element | 8 bytes element |
263
         */
264
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
265

266
        RedisInputStream stream = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
267
        Set<byte[]> set = new ByteArraySet();
1✔
268
        int encoding = BaseRdbParser.LenHelper.encoding(stream);
1✔
269
        long lenOfContent = BaseRdbParser.LenHelper.lenOfContent(stream);
1✔
270
        for (long i = 0; i < lenOfContent; i++) {
1✔
271
            switch (encoding) {
1✔
272
                case 2:
273
                    String element = String.valueOf(stream.readInt(2));
1✔
274
                    set.add(element.getBytes());
1✔
275
                    break;
1✔
276
                case 4:
277
                    element = String.valueOf(stream.readInt(4));
1✔
278
                    set.add(element.getBytes());
1✔
279
                    break;
1✔
280
                case 8:
281
                    element = String.valueOf(stream.readLong(8));
1✔
282
                    set.add(element.getBytes());
1✔
283
                    break;
1✔
284
                default:
285
                    throw new AssertionError("expect encoding [2,4,8] but:" + encoding);
×
286
            }
287
        }
288
        return (T) set;
1✔
289
    }
290

291
    @Override
292
    public <T> T applyZSetZipList(RedisInputStream in, int version) throws IOException {
293
        /*
294
         * |<zlbytes>| <zltail>| <zllen>| <entry> ...<entry> | <zlend>|
295
         * | 4 bytes | 4 bytes | 2bytes | zipListEntry ...   | 1byte  |
296
         */
297
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
298

299
        RedisInputStream stream = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
300
        Set<ZSetEntry> zset = new LinkedHashSet<>();
1✔
301
        BaseRdbParser.LenHelper.zlbytes(stream); // zlbytes
1✔
302
        BaseRdbParser.LenHelper.zltail(stream); // zltail
1✔
303
        int zllen = BaseRdbParser.LenHelper.zllen(stream);
1✔
304
        while (zllen > 0) {
1✔
305
            byte[] element = BaseRdbParser.StringHelper.zipListEntry(stream);
1✔
306
            zllen--;
1✔
307
            double score = Double.valueOf(Strings.toString(BaseRdbParser.StringHelper.zipListEntry(stream)));
1✔
308
            zllen--;
1✔
309
            zset.add(new ZSetEntry(element, score));
1✔
310
        }
1✔
311
        int zlend = BaseRdbParser.LenHelper.zlend(stream);
1✔
312
        if (zlend != 255) {
1✔
313
            throw new AssertionError("zlend expect 255 but " + zlend);
×
314
        }
315
        return (T) zset;
1✔
316
    }
317
    
318
    @Override
319
    public <T> T applyZSetListPack(RedisInputStream in, int version) throws IOException {
320
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
321

322
        RedisInputStream listPack = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
323
        Set<ZSetEntry> zset = new LinkedHashSet<>();
1✔
324
        listPack.skip(4); // total-bytes
1✔
325
        int len = listPack.readInt(2);
1✔
326
        while (len > 0) {
1✔
327
            byte[] element = listPackEntry(listPack);
1✔
328
            len--;
1✔
329
            double score = Double.valueOf(Strings.toString(listPackEntry(listPack)));
1✔
330
            len--;
1✔
331
            zset.add(new ZSetEntry(element, score));
1✔
332
        }
1✔
333
        int lpend = listPack.read(); // lp-end
1✔
334
        if (lpend != 255) {
1✔
335
            throw new AssertionError("listpack expect 255 but " + lpend);
×
336
        }
337
        return (T) zset;
1✔
338
    }
339

340
    @Override
341
    public <T> T applyHashZipList(RedisInputStream in, int version) throws IOException {
342
        /*
343
         * |<zlbytes>| <zltail>| <zllen>| <entry> ...<entry> | <zlend>|
344
         * | 4 bytes | 4 bytes | 2bytes | zipListEntry ...   | 1byte  |
345
         */
346
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
347

348
        RedisInputStream stream = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
349
        ByteArrayMap map = new ByteArrayMap();
1✔
350
        BaseRdbParser.LenHelper.zlbytes(stream); // zlbytes
1✔
351
        BaseRdbParser.LenHelper.zltail(stream); // zltail
1✔
352
        int zllen = BaseRdbParser.LenHelper.zllen(stream);
1✔
353
        while (zllen > 0) {
1✔
354
            byte[] field = BaseRdbParser.StringHelper.zipListEntry(stream);
1✔
355
            zllen--;
1✔
356
            byte[] value = BaseRdbParser.StringHelper.zipListEntry(stream);
1✔
357
            zllen--;
1✔
358
            map.put(field, value);
1✔
359
        }
1✔
360
        int zlend = BaseRdbParser.LenHelper.zlend(stream);
1✔
361
        if (zlend != 255) {
1✔
362
            throw new AssertionError("zlend expect 255 but " + zlend);
×
363
        }
364
        return (T) map;
1✔
365
    }
366
    
367
    @Override
368
    public <T> T applyHashListPack(RedisInputStream in, int version) throws IOException {
369
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
370

371
        RedisInputStream listPack = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
372
        ByteArrayMap map = new ByteArrayMap();
1✔
373
        listPack.skip(4); // total-bytes
1✔
374
        int len = listPack.readInt(2);
1✔
375
        while (len > 0) {
1✔
376
            byte[] field = listPackEntry(listPack);
1✔
377
            len--;
1✔
378
            byte[] value = listPackEntry(listPack);
1✔
379
            len--;
1✔
380
            map.put(field, value);
1✔
381
        }
1✔
382
        int lpend = listPack.read(); // lp-end
1✔
383
        if (lpend != 255) {
1✔
384
            throw new AssertionError("listpack expect 255 but " + lpend);
×
385
        }
386
        return (T) map;
1✔
387
    }
388

389
    @Override
390
    public <T> T applyListQuickList(RedisInputStream in, int version) throws IOException {
391
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
392

393
        long len = parser.rdbLoadLen().len;
1✔
394
        List<byte[]> list = new ByteArrayList();
1✔
395
        for (long i = 0; i < len; i++) {
1✔
396
            RedisInputStream stream = new RedisInputStream(parser.rdbGenericLoadStringObject(RDB_LOAD_NONE));
1✔
397

398
            BaseRdbParser.LenHelper.zlbytes(stream); // zlbytes
1✔
399
            BaseRdbParser.LenHelper.zltail(stream); // zltail
1✔
400
            int zllen = BaseRdbParser.LenHelper.zllen(stream);
1✔
401
            for (int j = 0; j < zllen; j++) {
1✔
402
                byte[] e = BaseRdbParser.StringHelper.zipListEntry(stream);
1✔
403
                list.add(e);
1✔
404
            }
405
            int zlend = BaseRdbParser.LenHelper.zlend(stream);
1✔
406
            if (zlend != 255) {
1✔
407
                throw new AssertionError("zlend expect 255 but " + zlend);
×
408
            }
409
        }
410
        return (T) list;
1✔
411
    }
412
    
413
    @Override
414
    public <T> T applyListQuickList2(RedisInputStream in, int version) throws IOException {
415
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
416

417
        long len = parser.rdbLoadLen().len;
1✔
418
        List<byte[]> list = new ByteArrayList();
1✔
419
        for (long i = 0; i < len; i++) {
1✔
420
            long container = parser.rdbLoadLen().len;
1✔
421
            ByteArray bytes = parser.rdbLoadPlainStringObject();
1✔
422
            if (container == QUICKLIST_NODE_CONTAINER_PLAIN) {
1✔
423
                list.add(bytes.first());
1✔
424
            } else if (container == QUICKLIST_NODE_CONTAINER_PACKED) {
1✔
425
                RedisInputStream listPack = new RedisInputStream(bytes);
1✔
426
                listPack.skip(4); // total-bytes
1✔
427
                int innerLen = listPack.readInt(2);
1✔
428
                for (int j = 0; j < innerLen; j++) {
1✔
429
                    byte[] e = listPackEntry(listPack);
1✔
430
                    list.add(e);
1✔
431
                }
432
                int lpend = listPack.read(); // lp-end
1✔
433
                if (lpend != 255) {
1✔
434
                    throw new AssertionError("listpack expect 255 but " + lpend);
×
435
                }
436
            } else {
1✔
437
                throw new UnsupportedOperationException(String.valueOf(container));
×
438
            }
439
        }
440
        return (T) list;
1✔
441
    }
442

443
    @Override
444
    public <T> T applyModule(RedisInputStream in, int version) throws IOException {
445
        //|6|6|6|6|6|6|6|6|6|10|
446
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
447

448
        char[] c = new char[9];
1✔
449
        long moduleid = parser.rdbLoadLen().len;
1✔
450
        for (int i = 0; i < c.length; i++) {
1✔
451
            c[i] = MODULE_SET[(int) (moduleid >>> (10 + (c.length - 1 - i) * 6) & 63)];
1✔
452
        }
453
        String moduleName = new String(c);
1✔
454
        int moduleVersion = (int) (moduleid & 1023);
1✔
455
        ModuleParser<? extends Module> moduleParser = lookupModuleParser(moduleName, moduleVersion);
1✔
456
        if (moduleParser == null) {
1✔
457
            throw new NoSuchElementException("module parser[" + moduleName + ", " + moduleVersion + "] not register. rdb type: [RDB_TYPE_MODULE]");
1✔
458
        }
459
        Module module = moduleParser.parse(in, 1);
1✔
460
        return (T) module;
1✔
461
    }
462

463
    @Override
464
    public <T> T applyModule2(RedisInputStream in, int version) throws IOException {
465
        //|6|6|6|6|6|6|6|6|6|10|
466
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
467

468
        char[] c = new char[9];
1✔
469
        long moduleid = parser.rdbLoadLen().len;
1✔
470
        for (int i = 0; i < c.length; i++) {
1✔
471
            c[i] = MODULE_SET[(int) (moduleid >>> (10 + (c.length - 1 - i) * 6) & 63)];
1✔
472
        }
473
        String moduleName = new String(c);
1✔
474
        int moduleVersion = (int) (moduleid & 1023);
1✔
475
        ModuleParser<? extends Module> moduleParser = lookupModuleParser(moduleName, moduleVersion);
1✔
476
        Module module = null;
1✔
477
        if (moduleParser == null) {
1✔
478
            logger.warn("module parser[{}, {}] not register. rdb type: [RDB_TYPE_MODULE_2]. module parse skipped.", moduleName, moduleVersion);
1✔
479
            SkipRdbParser skipRdbParser = new SkipRdbParser(in);
1✔
480
            skipRdbParser.rdbLoadCheckModuleValue();
1✔
481
        } else {
1✔
482
            module = moduleParser.parse(in, 2);
1✔
483
            long eof = parser.rdbLoadLen().len;
1✔
484
            if (eof != RDB_MODULE_OPCODE_EOF) {
1✔
485
                throw new UnsupportedOperationException("The RDB file contains module data for the module '" + moduleName + "' that is not terminated by the proper module value EOF marker");
×
486
            }
487
        }
488
        return (T) module;
1✔
489
    }
490

491
    protected ModuleParser<? extends Module> lookupModuleParser(String moduleName, int moduleVersion) {
492
        return replicator.getModuleParser(moduleName, moduleVersion);
1✔
493
    }
494

495
    @Override
496
    public <T> T applyStreamListPacks(RedisInputStream in, int version) throws IOException {
497
        return applyStreamListPacks(in, version, Constants.RDB_TYPE_STREAM_LISTPACKS);
1✔
498
    }
499
    
500
    @Override
501
    public <T> T applyStreamListPacks2(RedisInputStream in, int version) throws IOException {
502
        return applyStreamListPacks(in, version, Constants.RDB_TYPE_STREAM_LISTPACKS_2);
1✔
503
    }
504
    
505
    @Override
506
    public <T> T applyStreamListPacks3(RedisInputStream in, int version) throws IOException {
507
        return applyStreamListPacks(in, version, Constants.RDB_TYPE_STREAM_LISTPACKS_3);
1✔
508
    }
509
    
510
    @SuppressWarnings("resource")
511
    protected <T> T applyStreamListPacks(RedisInputStream in, int version, int type) throws IOException {
512
        BaseRdbParser parser = new BaseRdbParser(in);
1✔
513
    
514
        Stream stream = new Stream();
1✔
515
    
516
        // Entries
517
        NavigableMap<Stream.ID, Stream.Entry> entries = new TreeMap<>(Stream.ID.COMPARATOR);
1✔
518
        long listPacks = parser.rdbLoadLen().len;
1✔
519
        while (listPacks-- > 0) {
1✔
520
            RedisInputStream rawId = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
521
            Stream.ID baseId = new Stream.ID(rawId.readLong(8, false), rawId.readLong(8, false));
1✔
522
            RedisInputStream listPack = new RedisInputStream(parser.rdbLoadPlainStringObject());
1✔
523
            listPack.skip(4); // total-bytes
1✔
524
            listPack.skip(2); // num-elements
1✔
525
            /*
526
             * Master entry
527
             * +-------+---------+------------+---------+--/--+---------+---------+-+
528
             * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0|
529
             * +-------+---------+------------+---------+--/--+---------+---------+-+
530
             */
531
            long count = Long.parseLong(Strings.toString(listPackEntry(listPack))); // count
1✔
532
            long deleted = Long.parseLong(Strings.toString(listPackEntry(listPack))); // deleted
1✔
533
            int numFields = Integer.parseInt(Strings.toString(listPackEntry(listPack))); // num-fields
1✔
534
            byte[][] tempFields = new byte[numFields][];
1✔
535
            for (int i = 0; i < numFields; i++) {
1✔
536
                tempFields[i] = listPackEntry(listPack);
1✔
537
            }
538
            listPackEntry(listPack); // 0
1✔
539
        
540
            long total = count + deleted;
1✔
541
            while (total-- > 0) {
1✔
542
                Map<byte[], byte[]> fields = new ByteArrayMap();
1✔
543
                /*
544
                 * FLAG
545
                 * +-----+--------+
546
                 * |flags|entry-id|
547
                 * +-----+--------+
548
                 */
549
                int flag = Integer.parseInt(Strings.toString(listPackEntry(listPack)));
1✔
550
                long ms = Long.parseLong(Strings.toString(listPackEntry(listPack)));
1✔
551
                long seq = Long.parseLong(Strings.toString(listPackEntry(listPack)));
1✔
552
                Stream.ID id = baseId.delta(ms, seq);
1✔
553
                boolean delete = (flag & STREAM_ITEM_FLAG_DELETED) != 0;
1✔
554
                if ((flag & STREAM_ITEM_FLAG_SAMEFIELDS) != 0) {
1✔
555
                    /*
556
                     * SAMEFIELD
557
                     * +-------+-/-+-------+--------+
558
                     * |value-1|...|value-N|lp-count|
559
                     * +-------+-/-+-------+--------+
560
                     */
561
                    for (int i = 0; i < numFields; i++) {
1✔
562
                        byte[] value = listPackEntry(listPack);
1✔
563
                        byte[] field = tempFields[i];
1✔
564
                        fields.put(field, value);
1✔
565
                    }
566
                    entries.put(id, new Stream.Entry(id, delete, fields));
1✔
567
                } else {
568
                    /*
569
                     * NONEFIELD
570
                     * +----------+-------+-------+-/-+-------+-------+--------+
571
                     * |num-fields|field-1|value-1|...|field-N|value-N|lp-count|
572
                     * +----------+-------+-------+-/-+-------+-------+--------+
573
                     */
574
                    numFields = Integer.parseInt(Strings.toString(listPackEntry(listPack)));
1✔
575
                    for (int i = 0; i < numFields; i++) {
1✔
576
                        byte[] field = listPackEntry(listPack);
1✔
577
                        byte[] value = listPackEntry(listPack);
1✔
578
                        fields.put(field, value);
1✔
579
                    }
580
                    entries.put(id, new Stream.Entry(id, delete, fields));
1✔
581
                }
582
                listPackEntry(listPack); // lp-count
1✔
583
            }
1✔
584
            int lpend = listPack.read(); // lp-end
1✔
585
            if (lpend != 255) {
1✔
586
                throw new AssertionError("listpack expect 255 but " + lpend);
×
587
            }
588
        }
1✔
589
    
590
        long length = parser.rdbLoadLen().len;
1✔
591
        Stream.ID lastId = new Stream.ID(parser.rdbLoadLen().len, parser.rdbLoadLen().len);
1✔
592
    
593
        Stream.ID firstId = null;
1✔
594
        Stream.ID maxDeletedEntryId = null;
1✔
595
        Long entriesAdded = null;
1✔
596
        if (type >= Constants.RDB_TYPE_STREAM_LISTPACKS_2) {
1✔
597
            firstId = new Stream.ID(parser.rdbLoadLen().len, parser.rdbLoadLen().len);
1✔
598
            maxDeletedEntryId = new Stream.ID(parser.rdbLoadLen().len, parser.rdbLoadLen().len);
1✔
599
            entriesAdded = parser.rdbLoadLen().len;
1✔
600
        }
601
    
602
        // Group
603
        List<Stream.Group> groups = new ArrayList<>();
1✔
604
        long groupCount = parser.rdbLoadLen().len;
1✔
605
        while (groupCount-- > 0) {
1✔
606
            Stream.Group group = new Stream.Group();
1✔
607
            byte[] groupName = parser.rdbLoadPlainStringObject().first();
1✔
608
            Stream.ID groupLastId = new Stream.ID(parser.rdbLoadLen().len, parser.rdbLoadLen().len);
1✔
609
            Long entriesRead = null;
1✔
610
            if (type >= Constants.RDB_TYPE_STREAM_LISTPACKS_2) {
1✔
611
                entriesRead = parser.rdbLoadLen().len;
1✔
612
            }
613
            
614
            // Group PEL
615
            NavigableMap<Stream.ID, Stream.Nack> groupPendingEntries = new TreeMap<>(Stream.ID.COMPARATOR);
1✔
616
            long globalPel = parser.rdbLoadLen().len;
1✔
617
            while (globalPel-- > 0) {
1✔
618
                Stream.ID rawId = new Stream.ID(in.readLong(8, false), in.readLong(8, false));
1✔
619
                long deliveryTime = parser.rdbLoadMillisecondTime();
1✔
620
                long deliveryCount = parser.rdbLoadLen().len;
1✔
621
                groupPendingEntries.put(rawId, new Stream.Nack(rawId, null, deliveryTime, deliveryCount));
1✔
622
            }
1✔
623
        
624
            // Consumer
625
            List<Stream.Consumer> consumers = new ArrayList<>();
1✔
626
            long consumerCount = parser.rdbLoadLen().len;
1✔
627
            while (consumerCount-- > 0) {
1✔
628
                Stream.Consumer consumer = new Stream.Consumer();
1✔
629
                byte[] consumerName = parser.rdbLoadPlainStringObject().first();
1✔
630
                long seenTime = parser.rdbLoadMillisecondTime();
1✔
631
                long activeTime = -1;
1✔
632
                if (type >= Constants.RDB_TYPE_STREAM_LISTPACKS_3) {
1✔
633
                    activeTime = parser.rdbLoadMillisecondTime();
1✔
634
                }
635
            
636
                // Consumer PEL
637
                NavigableMap<Stream.ID, Stream.Nack> consumerPendingEntries = new TreeMap<>(Stream.ID.COMPARATOR);
1✔
638
                long pel = parser.rdbLoadLen().len;
1✔
639
                while (pel-- > 0) {
1✔
640
                    Stream.ID rawId = new Stream.ID(in.readLong(8, false), in.readLong(8, false));
1✔
641
                    Stream.Nack nack = groupPendingEntries.get(rawId);
1✔
642
                    nack.setConsumer(consumer);
1✔
643
                    consumerPendingEntries.put(rawId, nack);
1✔
644
                }
1✔
645
            
646
                consumer.setName(consumerName);
1✔
647
                consumer.setSeenTime(seenTime);
1✔
648
                if (type >= Constants.RDB_TYPE_STREAM_LISTPACKS_3) {
1✔
649
                    consumer.setActiveTime(activeTime);
1✔
650
                }
651
                consumer.setPendingEntries(consumerPendingEntries);
1✔
652
                consumers.add(consumer);
1✔
653
            }
1✔
654
        
655
            group.setName(groupName);
1✔
656
            group.setLastId(groupLastId);
1✔
657
            if (type >= Constants.RDB_TYPE_STREAM_LISTPACKS_2) {
1✔
658
                group.setEntriesRead(entriesRead);
1✔
659
            }
660
            group.setPendingEntries(groupPendingEntries);
1✔
661
            group.setConsumers(consumers);
1✔
662
            groups.add(group);
1✔
663
        }
1✔
664
    
665
        stream.setLastId(lastId);
1✔
666
        if (type >= Constants.RDB_TYPE_STREAM_LISTPACKS_2) {
1✔
667
            stream.setFirstId(firstId);
1✔
668
            stream.setMaxDeletedEntryId(maxDeletedEntryId);
1✔
669
            stream.setEntriesAdded(entriesAdded);
1✔
670
        }
671
        stream.setEntries(entries);
1✔
672
        stream.setLength(length);
1✔
673
        stream.setGroups(groups);
1✔
674
    
675
        return (T) stream;
1✔
676
    }
677
    
678
    @Override
679
    public <T> T applyHashMetadata(RedisInputStream in, int version) throws IOException{
680
        // TODO
NEW
681
        return null;
×
682
    }
683
    
684
    @Override
685
    public <T> T applyHashListPackEx(RedisInputStream in, int version) throws IOException {
686
        // TODO
NEW
687
        return null;
×
688
    }
689
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc