• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

alibaba / jetcache / #405

16 Apr 2024 05:58AM UTC coverage: 0.0% (-88.9%) from 88.866%
#405

push

areyouok
add encoding to fix coverage report

0 of 5353 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisCache.java
1
package com.alicp.jetcache.redis;
2

3
import com.alicp.jetcache.CacheConfig;
4
import com.alicp.jetcache.CacheConfigException;
5
import com.alicp.jetcache.CacheException;
6
import com.alicp.jetcache.CacheGetResult;
7
import com.alicp.jetcache.CacheResult;
8
import com.alicp.jetcache.CacheResultCode;
9
import com.alicp.jetcache.CacheValueHolder;
10
import com.alicp.jetcache.MultiGetResult;
11
import com.alicp.jetcache.external.AbstractExternalCache;
12
import org.slf4j.Logger;
13
import org.slf4j.LoggerFactory;
14
import redis.clients.jedis.ClusterPipeline;
15
import redis.clients.jedis.Connection;
16
import redis.clients.jedis.Jedis;
17
import redis.clients.jedis.JedisCluster;
18
import redis.clients.jedis.JedisPooled;
19
import redis.clients.jedis.Pipeline;
20
import redis.clients.jedis.Response;
21
import redis.clients.jedis.UnifiedJedis;
22
import redis.clients.jedis.commands.KeyBinaryCommands;
23
import redis.clients.jedis.commands.KeyPipelineBinaryCommands;
24
import redis.clients.jedis.commands.StringBinaryCommands;
25
import redis.clients.jedis.commands.StringPipelineBinaryCommands;
26
import redis.clients.jedis.params.SetParams;
27
import redis.clients.jedis.providers.ClusterConnectionProvider;
28
import redis.clients.jedis.util.Pool;
29

30
import java.io.Closeable;
31
import java.lang.reflect.Field;
32
import java.util.ArrayList;
33
import java.util.Arrays;
34
import java.util.Collections;
35
import java.util.HashMap;
36
import java.util.List;
37
import java.util.Map;
38
import java.util.Set;
39
import java.util.concurrent.ThreadLocalRandom;
40
import java.util.concurrent.TimeUnit;
41
import java.util.function.Function;
42
import java.util.stream.Collectors;
43

44
/**
45
 * Created on 2016/10/7.
46
 *
47
 * @author huangli
48
 */
49
public class RedisCache<K, V> extends AbstractExternalCache<K, V> {
50

51
    private static Logger logger = LoggerFactory.getLogger(RedisCache.class);
×
52

53
    protected RedisCacheConfig<K, V> config;
54

55
    Function<Object, byte[]> valueEncoder;
56
    Function<byte[], Object> valueDecoder;
57
    ClusterConnectionProvider provider = null;
×
58

59
    private static ThreadLocalRandom random = ThreadLocalRandom.current();
×
60

61
    /**
     * Builds a Redis-backed cache from the supplied configuration.
     *
     * <p>The configuration is validated in a fixed order: exactly one of {@code jedis} /
     * {@code jedisPool} must be set, slave settings must match the chosen client style,
     * read-from-slave requires at least one slave, and {@code expireAfterAccess} is rejected.
     * When the client is a {@link JedisCluster}, its connection provider is read reflectively
     * from {@link UnifiedJedis} so that cluster pipelines can be created later.
     *
     * @param config cache configuration
     * @throws CacheConfigException  if the configuration is inconsistent or unsupported
     * @throws IllegalStateException if the cluster connection provider cannot be read
     */
    public RedisCache(RedisCacheConfig<K, V> config) {
        super(config);
        this.config = config;
        this.valueEncoder = config.getValueEncoder();
        this.valueDecoder = config.getValueDecoder();

        final UnifiedJedis jedis = config.getJedis();
        final boolean hasJedis = jedis != null;
        final boolean hasPool = config.getJedisPool() != null;

        if (!hasJedis && !hasPool) {
            throw new CacheConfigException("no jedis");
        }
        if (hasJedis && hasPool) {
            throw new CacheConfigException("'jedis' and 'jedisPool' can't set simultaneously");
        }
        if (hasJedis && config.getJedisSlavePools() != null) {
            throw new CacheConfigException("'jedisSlavePools' should work with 'jedisPool' in RedisCacheConfig");
        }
        if (hasPool && config.getSlaves() != null) {
            throw new CacheConfigException("'slaves' should work with 'jedis' in RedisCacheConfig");
        }

        if (config.isReadFromSlave()) {
            final int slaves = slaveCount();
            if (slaves == 0) {
                throw new CacheConfigException("slaves not config");
            }
            final int[] configuredWeights = config.getSlaveReadWeights();
            if (configuredWeights == null) {
                initDefaultWeights();
            } else if (configuredWeights.length != slaves) {
                // A mismatching weight array is not fatal: report it and fall back to equal weights.
                logger.error("length of slaveReadWeights and jedisSlavePools not equals, using default weights");
                initDefaultWeights();
            }
        }

        if (config.isExpireAfterAccess()) {
            throw new CacheConfigException("expireAfterAccess is not supported");
        }

        // instanceof is false for null, so no separate null check is needed.
        if (jedis instanceof JedisCluster) {
            try {
                Field providerField = UnifiedJedis.class.getDeclaredField("provider");
                boolean wasAccessible = providerField.isAccessible();
                providerField.setAccessible(true);
                provider = (ClusterConnectionProvider) providerField.get(jedis);
                providerField.setAccessible(wasAccessible);
            } catch (Exception ex) {
                throw new IllegalStateException("can not get ConnectionProvider from JedisClient", ex);
            }
        }
    }
×
106

107
    private int slaveCount() {
108
        if (config.getSlaves() != null) {
×
109
            return config.getSlaves().length;
×
110
        }
111
        if (config.getJedisSlavePools() != null) {
×
112
            return config.getJedisSlavePools().length;
×
113
        }
114
        return 0;
×
115
    }
116

117
    private void initDefaultWeights() {
118
        int len = slaveCount();
×
119
        int[] weights = new int[len];
×
120
        Arrays.fill(weights, 100);
×
121
        config.setSlaveReadWeights(weights);
×
122
    }
×
123

124
    @Override
125
    public CacheConfig<K, V> config() {
126
        return config;
×
127
    }
128

129
    @Override
130
    public <T> T unwrap(Class<T> clazz) {
131
        if (UnifiedJedis.class.isAssignableFrom(clazz)) {
×
132
            return (T) config.getJedis();
×
133
        }
134
        if (Pool.class.isAssignableFrom(clazz)) {
×
135
            return (T) config.getJedisPool();
×
136
        }
137
        throw new IllegalArgumentException(clazz.getName());
×
138
    }
139

140
    Object writeCommands() {
141
        return config.getJedis() != null ? config.getJedis() : config.getJedisPool().getResource();
×
142
    }
143

144
    Object readCommands() {
145
        if (!config.isReadFromSlave()) {
×
146
            return writeCommands();
×
147
        }
148
        int[] weights = config.getSlaveReadWeights();
×
149
        int index = randomIndex(weights);
×
150
        if (config.getSlaves() != null) {
×
151
            return config.getSlaves()[index];
×
152
        } else {
153
            return config.getJedisSlavePools()[index].getResource();
×
154
        }
155
    }
156

157
    static int randomIndex(int[] weights) {
158
        int sumOfWeights = 0;
×
159
        for (int w : weights) {
×
160
            sumOfWeights += w;
×
161
        }
162
        int r = random.nextInt(sumOfWeights);
×
163
        int x = 0;
×
164
        for (int i = 0; i < weights.length; i++) {
×
165
            x += weights[i];
×
166
            if (r < x) {
×
167
                return i;
×
168
            }
169
        }
170
        throw new CacheException("assert false");
×
171
    }
172

173
    static void closeJedis(Object maybeJedis) {
174
        if (maybeJedis instanceof Jedis) {
×
175
            close((Jedis) maybeJedis);
×
176
        }
177
    }
×
178

179
    private static void close(Closeable closeable) {
180
        if (closeable == null) {
×
181
            return;
×
182
        }
183
        try {
184
            closeable.close();
×
185
        } catch (Exception e) {
×
186
            logger.warn("close jedis resource error: {}", e.toString());
×
187
        }
×
188
    }
×
189

190
    @Override
191
    protected CacheGetResult<V> do_GET(K key) {
192
        StringBinaryCommands commands = null;
×
193
        try {
194
            byte[] newKey = buildKey(key);
×
195
            commands = (StringBinaryCommands) readCommands();
×
196
            byte[] bytes = commands.get(newKey);
×
197
            if (bytes != null) {
×
198
                CacheValueHolder<V> holder = (CacheValueHolder<V>) valueDecoder.apply(bytes);
×
199
                if (System.currentTimeMillis() >= holder.getExpireTime()) {
×
200
                    return CacheGetResult.EXPIRED_WITHOUT_MSG;
×
201
                }
202
                return new CacheGetResult(CacheResultCode.SUCCESS, null, holder);
×
203
            } else {
204
                return CacheGetResult.NOT_EXISTS_WITHOUT_MSG;
×
205
            }
206
        } catch (Exception ex) {
×
207
            logError("GET", key, ex);
×
208
            return new CacheGetResult(ex);
×
209
        } finally {
210
            closeJedis(commands);
×
211
        }
212
    }
213

214
    @Override
215
    protected MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys) {
216
        if (keys == null || keys.isEmpty()) {
×
217
            return new MultiGetResult<K, V>(CacheResultCode.SUCCESS, null, Collections.emptyMap());
×
218
        }
219
        // define the result object early to gain statefulFunction feature.
220
        Map<K, CacheGetResult<V>> resultMap = new HashMap<>();
×
221

222
        try {
223
            StringBinaryCommands readCommands = (StringBinaryCommands) readCommands();
×
224
            ArrayList<K> keyList = new ArrayList<K>(keys);
×
225
            byte[][] newKeys = keyList.stream().map(this::buildKey).toArray(byte[][]::new);
×
226

227
            return this.<StringBinaryCommands, StringPipelineBinaryCommands, MultiGetResult<K, V>>doWithPipeline(readCommands, false, (pipeline) -> {
×
228
                List<byte[]> results;
229
                if (pipeline != null) {
×
230
                    List<Response<byte[]>> responseList = new ArrayList<>();
×
231

232
                    for (byte[] newKey : newKeys) {
×
233
                        Response<byte[]> response = pipeline.get(newKey);
×
234
                        responseList.add(response);
×
235
                    }
236

237
                    sync(pipeline);
×
238

239
                    results = responseList.stream().map(Response::get).collect(Collectors.toList());
×
240
                } else {
×
241
                    results = readCommands.mget(newKeys);
×
242
                }
243

244
                for (int i = 0; i < results.size(); i++) {
×
245
                    Object value = results.get(i);
×
246
                    K key = keyList.get(i);
×
247
                    if (value != null) {
×
248
                        CacheValueHolder<V> holder = (CacheValueHolder<V>) valueDecoder.apply((byte[]) value);
×
249
                        if (System.currentTimeMillis() >= holder.getExpireTime()) {
×
250
                            resultMap.put(key, CacheGetResult.EXPIRED_WITHOUT_MSG);
×
251
                        } else {
252
                            CacheGetResult<V> r = new CacheGetResult<V>(CacheResultCode.SUCCESS, null, holder);
×
253
                            resultMap.put(key, r);
×
254
                        }
255
                    } else {
×
256
                        resultMap.put(key, CacheGetResult.NOT_EXISTS_WITHOUT_MSG);
×
257
                    }
258
                }
259

260
                return new MultiGetResult<K, V>(CacheResultCode.SUCCESS, null, resultMap);
×
261
            });
262
        } catch (Exception ex) {
×
263
            logError("GET_ALL", "keys(" + keys.size() + ")", ex);
×
264
            if (!resultMap.isEmpty()) {
×
265
                return new MultiGetResult<K, V>(CacheResultCode.PART_SUCCESS, ex.toString(), resultMap);
×
266
            } else {
267
                return new MultiGetResult<K, V>(ex);
×
268
            }
269
        }
270
    }
271

272
    @Override
273
    protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
274
        StringBinaryCommands commands = null;
×
275
        try {
276
            CacheValueHolder<V> holder = new CacheValueHolder(value, timeUnit.toMillis(expireAfterWrite));
×
277
            byte[] newKey = buildKey(key);
×
278
            commands = (StringBinaryCommands) writeCommands();
×
279
            String rt = commands.psetex(newKey, timeUnit.toMillis(expireAfterWrite), valueEncoder.apply(holder));
×
280
            if ("OK".equals(rt)) {
×
281
                return CacheResult.SUCCESS_WITHOUT_MSG;
×
282
            } else {
283
                return new CacheResult(CacheResultCode.FAIL, rt);
×
284
            }
285
        } catch (Exception ex) {
×
286
            logError("PUT", key, ex);
×
287
            return new CacheResult(ex);
×
288
        } finally {
289
            closeJedis(commands);
×
290
        }
291
    }
292

293
    @Override
294
    protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
295
        if (map == null || map.isEmpty()) {
×
296
            return CacheResult.SUCCESS_WITHOUT_MSG;
×
297
        }
298
        try {
299
            StringBinaryCommands writeCommands = (StringBinaryCommands) writeCommands();
×
300
            return this.<StringBinaryCommands, StringPipelineBinaryCommands, CacheResult>doWithPipeline(writeCommands, true, pipeline -> {
×
301
                int failCount = 0;
×
302
                List<Response<String>> responses = new ArrayList<>();
×
303
                for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
×
304
                    CacheValueHolder<V> holder = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite));
×
305
                    Response<String> resp = pipeline.psetex(buildKey(en.getKey()), timeUnit.toMillis(expireAfterWrite), valueEncoder.apply(holder));
×
306
                    responses.add(resp);
×
307
                }
×
308

309
                sync(pipeline);
×
310

311
                for (Response<String> resp : responses) {
×
312
                    if (!"OK".equals(resp.get())) {
×
313
                        failCount++;
×
314
                    }
315
                }
×
316
                return failCount == 0 ? CacheResult.SUCCESS_WITHOUT_MSG :
×
317
                        failCount == map.size() ? CacheResult.FAIL_WITHOUT_MSG : CacheResult.PART_SUCCESS_WITHOUT_MSG;
×
318
            });
319
        } catch (Exception ex) {
×
320
            logError("PUT_ALL", "map(" + map.size() + ")", ex);
×
321
            return new CacheResult(ex);
×
322
        }
323
    }
324

325

326
    @Override
327
    protected CacheResult do_REMOVE(K key) {
328
        return REMOVE_impl(key, buildKey(key));
×
329
    }
330

331
    private CacheResult REMOVE_impl(Object key, byte[] newKey) {
332
        KeyBinaryCommands commands = null;
×
333
        try {
334
            commands = (KeyBinaryCommands) writeCommands();
×
335
            Long rt = commands.del(newKey);
×
336
            if (rt == null) {
×
337
                return CacheResult.FAIL_WITHOUT_MSG;
×
338
            } else if (rt == 1) {
×
339
                return CacheResult.SUCCESS_WITHOUT_MSG;
×
340
            } else if (rt == 0) {
×
341
                return new CacheResult(CacheResultCode.NOT_EXISTS, null);
×
342
            } else {
343
                return CacheResult.FAIL_WITHOUT_MSG;
×
344
            }
345
        } catch (Exception ex) {
×
346
            logError("REMOVE", key, ex);
×
347
            return new CacheResult(ex);
×
348
        } finally {
349
            closeJedis(commands);
×
350
        }
351
    }
352

353
    @Override
354
    protected CacheResult do_REMOVE_ALL(Set<? extends K> keys) {
355
        if (keys == null || keys.isEmpty()) {
×
356
            return CacheResult.SUCCESS_WITHOUT_MSG;
×
357
        }
358
        long[] count = new long[1];
×
359
        try {
360
            KeyBinaryCommands writeCommands = (KeyBinaryCommands) writeCommands();
×
361
            byte[][] newKeys = keys.stream().map((k) -> buildKey(k)).toArray((len) -> new byte[keys.size()][]);
×
362
            return this.<KeyBinaryCommands, KeyPipelineBinaryCommands, CacheResult>doWithPipeline(writeCommands, false, (pipeline) -> {
×
363

364
                if (pipeline != null) {
×
365
                    for (byte[] newKey : newKeys) {
×
366
                        pipeline.del(newKey);
×
367
                        count[0]++;
×
368
                    }
369

370
                    sync(pipeline);
×
371
                } else {
372
                    writeCommands.del(newKeys);
×
373
                }
374

375
                return CacheResult.SUCCESS_WITHOUT_MSG;
×
376
            });
377
        } catch (Exception ex) {
×
378
            logError("REMOVE_ALL", "keys(" + keys.size() + ")", ex);
×
379
            if (count[0] > 0) {
×
380
                return new CacheResult(CacheResultCode.PART_SUCCESS, ex.toString());
×
381
            } else {
382
                return new CacheResult(ex);
×
383
            }
384
        }
385
    }
386

387
    @Override
388
    protected CacheResult do_PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
389
        StringBinaryCommands commands = null;
×
390
        try {
391
            CacheValueHolder<V> holder = new CacheValueHolder(value, timeUnit.toMillis(expireAfterWrite));
×
392
            byte[] newKey = buildKey(key);
×
393
            SetParams params = new SetParams();
×
394
            params.nx()
×
395
                    .px(timeUnit.toMillis(expireAfterWrite));
×
396
            commands = (StringBinaryCommands) writeCommands();
×
397
            String rt = commands.set(newKey, valueEncoder.apply(holder), params);
×
398
            if ("OK".equals(rt)) {
×
399
                return CacheResult.SUCCESS_WITHOUT_MSG;
×
400
            } else if (rt == null) {
×
401
                return CacheResult.EXISTS_WITHOUT_MSG;
×
402
            } else {
403
                return new CacheResult(CacheResultCode.FAIL, rt);
×
404
            }
405
        } catch (Exception ex) {
×
406
            logError("PUT_IF_ABSENT", key, ex);
×
407
            return new CacheResult(ex);
×
408
        } finally {
409
            closeJedis(commands);
×
410
        }
411
    }
412

413
    /**
414
     * Useful function for redis interaction via Pipeline mode. Resources will be auto close. <br/>
415
     * For batch command, such as {@link StringBinaryCommands#mget(byte[]...)}, {@link KeyBinaryCommands#del(byte[]...)}, the performance of pipeline will be worse,
416
     * so choose them instead of pipeline as much as possible in non-cluster clients, See:
417
     * - https://medium.com/@jychen7/redis-get-pipeline-vs-mget-6e41aeaecef
418
     * - https://stackoverflow.com/questions/73992769/redis-del-many-keys-vs-pipeline-are-both-non-blocking
419
     *
420
     * @param client        redisClient
421
     * @param pipelineFirst set as false when only want to use the pipeline on cluster clients.
422
     * @param function      operator function
423
     * @param <C>           client type
424
     * @param <P>           pipeline type
425
     * @param <R>           result type
426
     * @return result
427
     */
428
    @SuppressWarnings("unchecked")
429
    private <C, P, R> R doWithPipeline(C client, boolean pipelineFirst, Function<P, R> function) {
430
        C commands = null;
×
431
        Closeable closeable = null;
×
432
        try {
433
            commands = client;
×
434
            P pipeline = null;
×
435
            // The connection from JedisPooled or JedisCluster needs to be returned to the pool.
436
            if (commands instanceof JedisCluster) {
×
437
                ClusterPipeline clusterPipeline = new ClusterPipeline(provider);
×
438
                closeable = clusterPipeline;
×
439
                pipeline = (P) clusterPipeline;
×
440
            } else if (pipelineFirst) {
×
441
                if (commands instanceof JedisPooled) {
×
442
                    Connection connection = ((JedisPooled) commands).getPool().getResource();
×
443
                    closeable = connection;
×
444
                    pipeline = (P) new Pipeline(connection);
×
445
                } else if (commands instanceof Jedis) {
×
446
                    pipeline = (P) new Pipeline((Jedis) commands);
×
447
                }
448
            }
449

450
            return function.apply(pipeline);
×
451
        } finally {
452
            closeJedis(commands);
×
453
            close(closeable);
×
454
        }
455
    }
456

457
    private <T> void sync(T pipeline) {
458
        if (pipeline instanceof Pipeline) {
×
459
            ((Pipeline) pipeline).sync();
×
460
        } else if (pipeline instanceof ClusterPipeline) {
×
461
            ((ClusterPipeline) pipeline).sync();
×
462
        } else {
463
            throw new UnsupportedOperationException("unrecognized pipeline type");
×
464
        }
465
    }
×
466

467
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc