• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

alibaba / jetcache / #405

16 Apr 2024 05:58AM UTC coverage: 0.0% (-88.9%) from 88.866%
#405

push

areyouok
add encoding to fix coverage report

0 of 5353 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisBroadcastManager.java
1
package com.alicp.jetcache.redis;
2

3
import com.alicp.jetcache.CacheConfigException;
4
import com.alicp.jetcache.CacheManager;
5
import com.alicp.jetcache.CacheResult;
6
import com.alicp.jetcache.support.BroadcastManager;
7
import com.alicp.jetcache.support.CacheMessage;
8
import com.alicp.jetcache.support.SquashedLogger;
9
import org.slf4j.Logger;
10
import org.slf4j.LoggerFactory;
11
import redis.clients.jedis.BinaryJedisPubSub;
12
import redis.clients.jedis.Jedis;
13
import redis.clients.jedis.UnifiedJedis;
14

15
import java.nio.charset.StandardCharsets;
16
import java.util.concurrent.locks.ReentrantLock;
17

18
/**
19
 * Created on 2022-05-03
20
 *
21
 * @author huangli
22
 */
23
public class RedisBroadcastManager extends BroadcastManager {
24

25
    private static final Logger logger = LoggerFactory.getLogger(RedisBroadcastManager.class);
×
26

27
    private final byte[] channel;
28
    private final String channelStr;
29
    private final RedisCacheConfig<Object, Object> config;
30

31
    private volatile CacheMessagePubSub cacheMessagePubSub;
32
    private volatile boolean closed;
33
    private volatile boolean subscribe;
34
    private boolean subscribeThreadStart;
35

36
    private final ReentrantLock reentrantLock = new ReentrantLock();
×
37

38
    public RedisBroadcastManager(CacheManager cacheManager, RedisCacheConfig<Object, Object> config) {
39
        super(cacheManager);
×
40
        this.channelStr = config.getBroadcastChannel();
×
41
        this.channel = channelStr.getBytes(StandardCharsets.UTF_8);
×
42
        this.config = config;
×
43

44
        checkConfig(config);
×
45
        if (config.getJedis() == null && config.getJedisPool() == null) {
×
46
            throw new CacheConfigException("no jedis");
×
47
        }
48
        if (config.getJedis() != null && config.getJedisPool() != null) {
×
49
            throw new CacheConfigException("'jedis' and 'jedisPool' can't set simultaneously");
×
50
        }
51
    }
×
52

53
    @Override
54
    public void startSubscribe() {
55
        reentrantLock.lock();
×
56
        try {
57
            if (subscribeThreadStart) {
×
58
                throw new IllegalStateException("subscribe thread is started");
×
59
            }
60
            this.cacheMessagePubSub = new CacheMessagePubSub();
×
61
            Thread subThread;
62
            subThread = new Thread(this::runSubThread, "Sub_" + channelStr);
×
63
            subThread.setDaemon(true);
×
64
            subThread.start();
×
65
            this.subscribeThreadStart = true;
×
66
        }finally {
67
            reentrantLock.unlock();
×
68
        }
69
    }
×
70

71
    private void runSubThread() {
72
        while (!closed) {
×
73
            runSubThread0();
×
74
        }
75
    }
×
76

77
    private void runSubThread0() {
78
        Object jedisObj = null;
×
79
        try {
80
            jedisObj = writeCommands();
×
81
            if (jedisObj instanceof Jedis) {
×
82
                subscribe = true;
×
83
                ((Jedis) jedisObj).subscribe(cacheMessagePubSub, channel);
×
84
            } else if (jedisObj instanceof UnifiedJedis) {
×
85
                subscribe = true;
×
86
                ((UnifiedJedis) jedisObj).subscribe(cacheMessagePubSub, channel);
×
87
            }
88
        } catch (Throwable e) {
×
89
            SquashedLogger.getLogger(logger).error("run jedis subscribe thread error: {}", e);
×
90
            try {
91
                Thread.sleep(1000);
×
92
            } catch (InterruptedException ex) {
×
93
                // ignore
94
            }
×
95
        } finally {
96
            subscribe = false;
×
97
            RedisCache.closeJedis(jedisObj);
×
98
        }
99
    }
×
100

101
    Object writeCommands() {
102
        return config.getJedis() != null ? config.getJedis() : config.getJedisPool().getResource();
×
103
    }
104

105
    @Override
106
    public CacheResult publish(CacheMessage message) {
107
        Object jedisObj = null;
×
108
        try {
109
            jedisObj = writeCommands();
×
110
            byte[] value = config.getValueEncoder().apply(message);
×
111
            if (jedisObj instanceof Jedis) {
×
112
                ((Jedis) jedisObj).publish(channel, value);
×
113
            } else {
114
                ((UnifiedJedis) jedisObj).publish(channel, value);
×
115
            }
116
            return CacheResult.SUCCESS_WITHOUT_MSG;
×
117
        } catch (Exception ex) {
×
118
            SquashedLogger.getLogger(logger).error("jetcache publish error", ex);
×
119
            return new CacheResult(ex);
×
120
        } finally {
121
            RedisCache.closeJedis(jedisObj);
×
122
        }
123
    }
124

125

126
    @Override
127
    public void close() {
128
        reentrantLock.lock();
×
129
        try {
130
            if (this.closed) {
×
131
                return;
×
132
            }
133
            this.closed = true;
×
134
            if (subscribe) {
×
135
                try {
136
                    this.cacheMessagePubSub.unsubscribe(channel);
×
137
                } catch (Exception e) {
×
138
                    logger.warn("unsubscribe {} fail", channelStr, e);
×
139
                }
×
140
            }
141
        }finally {
142
            reentrantLock.unlock();
×
143
        }
144
    }
×
145

146
    class CacheMessagePubSub extends BinaryJedisPubSub {
×
147

148
        @Override
149
        public void onMessage(byte[] channel, byte[] message) {
150
            processNotification(message, config.getValueDecoder());
×
151
        }
×
152
    }
153
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc