• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

alibaba / jetcache / #421

12 Sep 2024 08:19AM UTC coverage: 88.785% (-0.06%) from 88.841%
#421

push

areyouok
ci: change pom version to 2.7.8-SNAPSHOT

4750 of 5350 relevant lines covered (88.79%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.61
/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisBroadcastManager.java
1
package com.alicp.jetcache.redis;
2

3
import com.alicp.jetcache.CacheConfigException;
4
import com.alicp.jetcache.CacheManager;
5
import com.alicp.jetcache.CacheResult;
6
import com.alicp.jetcache.support.BroadcastManager;
7
import com.alicp.jetcache.support.CacheMessage;
8
import com.alicp.jetcache.support.SquashedLogger;
9
import org.slf4j.Logger;
10
import org.slf4j.LoggerFactory;
11
import redis.clients.jedis.BinaryJedisPubSub;
12
import redis.clients.jedis.Jedis;
13
import redis.clients.jedis.UnifiedJedis;
14

15
import java.nio.charset.StandardCharsets;
16
import java.util.concurrent.locks.ReentrantLock;
17

18
/**
19
 * Created on 2022-05-03
20
 *
21
 * @author huangli
22
 */
23
public class RedisBroadcastManager extends BroadcastManager {
24

25
    private static final Logger logger = LoggerFactory.getLogger(RedisBroadcastManager.class);
1✔
26

27
    private final byte[] channel;
28
    private final String channelStr;
29
    private final RedisCacheConfig<Object, Object> config;
30

31
    private volatile CacheMessagePubSub cacheMessagePubSub;
32
    private volatile boolean closed;
33
    private volatile boolean subscribe;
34
    private boolean subscribeThreadStart;
35

36
    private final ReentrantLock reentrantLock = new ReentrantLock();
1✔
37

38
    public RedisBroadcastManager(CacheManager cacheManager, RedisCacheConfig<Object, Object> config) {
39
        super(cacheManager);
1✔
40
        this.channelStr = config.getBroadcastChannel();
1✔
41
        this.channel = channelStr.getBytes(StandardCharsets.UTF_8);
1✔
42
        this.config = config;
1✔
43

44
        checkConfig(config);
1✔
45
        if (config.getJedis() == null && config.getJedisPool() == null) {
1✔
46
            throw new CacheConfigException("no jedis");
×
47
        }
48
        if (config.getJedis() != null && config.getJedisPool() != null) {
1✔
49
            throw new CacheConfigException("'jedis' and 'jedisPool' can't set simultaneously");
×
50
        }
51
    }
1✔
52

53
    @Override
54
    public void startSubscribe() {
55
        reentrantLock.lock();
1✔
56
        try {
57
            if (subscribeThreadStart) {
1✔
58
                throw new IllegalStateException("subscribe thread is started");
×
59
            }
60
            this.cacheMessagePubSub = new CacheMessagePubSub();
1✔
61
            Thread subThread;
62
            subThread = new Thread(this::runSubThread, "Sub_" + channelStr);
1✔
63
            subThread.setDaemon(true);
1✔
64
            subThread.start();
1✔
65
            this.subscribeThreadStart = true;
1✔
66
        }finally {
67
            reentrantLock.unlock();
1✔
68
        }
69
    }
1✔
70

71
    private void runSubThread() {
72
        while (!closed) {
1✔
73
            runSubThread0();
1✔
74
        }
75
    }
1✔
76

77
    private void runSubThread0() {
78
        Object jedisObj = null;
1✔
79
        try {
80
            jedisObj = writeCommands();
1✔
81
            if (jedisObj instanceof Jedis) {
1✔
82
                subscribe = true;
1✔
83
                ((Jedis) jedisObj).subscribe(cacheMessagePubSub, channel);
×
84
            } else if (jedisObj instanceof UnifiedJedis) {
1✔
85
                subscribe = true;
1✔
86
                ((UnifiedJedis) jedisObj).subscribe(cacheMessagePubSub, channel);
1✔
87
            }
88
        } catch (Throwable e) {
×
89
            SquashedLogger.getLogger(logger).error("run jedis subscribe thread error: {}", e);
×
90
            try {
91
                Thread.sleep(1000);
×
92
            } catch (InterruptedException ex) {
×
93
                // ignore
94
            }
×
95
        } finally {
96
            subscribe = false;
1✔
97
            RedisCache.closeJedis(jedisObj);
1✔
98
        }
99
    }
1✔
100

101
    Object writeCommands() {
102
        return config.getJedis() != null ? config.getJedis() : config.getJedisPool().getResource();
1✔
103
    }
104

105
    @Override
106
    public CacheResult publish(CacheMessage message) {
107
        Object jedisObj = null;
1✔
108
        try {
109
            jedisObj = writeCommands();
1✔
110
            byte[] value = config.getValueEncoder().apply(message);
1✔
111
            if (jedisObj instanceof Jedis) {
1✔
112
                ((Jedis) jedisObj).publish(channel, value);
1✔
113
            } else {
114
                ((UnifiedJedis) jedisObj).publish(channel, value);
1✔
115
            }
116
            return CacheResult.SUCCESS_WITHOUT_MSG;
1✔
117
        } catch (Exception ex) {
×
118
            SquashedLogger.getLogger(logger).error("jetcache publish error", ex);
×
119
            return new CacheResult(ex);
×
120
        } finally {
121
            RedisCache.closeJedis(jedisObj);
1✔
122
        }
123
    }
124

125

126
    @Override
127
    public void close() {
128
        reentrantLock.lock();
1✔
129
        try {
130
            if (this.closed) {
1✔
131
                return;
×
132
            }
133
            this.closed = true;
1✔
134
            if (subscribe) {
1✔
135
                try {
136
                    this.cacheMessagePubSub.unsubscribe(channel);
1✔
137
                } catch (Exception e) {
×
138
                    logger.warn("unsubscribe {} fail", channelStr, e);
×
139
                }
1✔
140
            }
141
        }finally {
142
            reentrantLock.unlock();
1✔
143
        }
144
    }
1✔
145

146
    class CacheMessagePubSub extends BinaryJedisPubSub {
1✔
147

148
        @Override
149
        public void onMessage(byte[] channel, byte[] message) {
150
            processNotification(message, config.getValueDecoder());
1✔
151
        }
1✔
152
    }
153
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc