• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

alibaba / jetcache / #431

28 Apr 2025 05:27AM UTC coverage: 88.727% (-0.08%) from 88.802%
#431

push

areyouok
ci: change version to 2.7.9-SNAPSHOT

4754 of 5358 relevant lines covered (88.73%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.39
/jetcache-core/src/main/java/com/alicp/jetcache/support/BroadcastManager.java
1
/**
2
 * Created on 2019/6/10.
3
 */
4
package com.alicp.jetcache.support;
5

6
import com.alicp.jetcache.Cache;
7
import com.alicp.jetcache.CacheManager;
8
import com.alicp.jetcache.CacheResult;
9
import com.alicp.jetcache.CacheUtil;
10
import com.alicp.jetcache.MultiLevelCache;
11
import com.alicp.jetcache.embedded.AbstractEmbeddedCache;
12
import com.alicp.jetcache.external.ExternalCacheConfig;
13
import com.alicp.jetcache.CacheConfigException;
14
import org.slf4j.Logger;
15
import org.slf4j.LoggerFactory;
16

17
import java.util.Set;
18
import java.util.UUID;
19
import java.util.function.Function;
20
import java.util.stream.Collectors;
21
import java.util.stream.Stream;
22

23
/**
24
 * @author huangli
25
 */
26
public abstract class BroadcastManager implements AutoCloseable {
27
    private static Logger logger = LoggerFactory.getLogger(BroadcastManager.class);
1✔
28

29
    private final String sourceId = UUID.randomUUID().toString();
1✔
30
    private final CacheManager cacheManager;
31

32
    public BroadcastManager(CacheManager cacheManager) {
1✔
33
        this.cacheManager = cacheManager;
1✔
34
    }
1✔
35

36
    protected void checkConfig(ExternalCacheConfig config) {
37
        if (config.getBroadcastChannel() == null) {
1✔
38
            throw new CacheConfigException("BroadcastChannel not set");
×
39
        }
40
        if (config.getValueEncoder() == null) {
1✔
41
            throw new CacheConfigException("no value encoder");
×
42
        }
43
        if (config.getValueDecoder() == null) {
1✔
44
            throw new CacheConfigException("no value decoder");
×
45
        }
46
    }
1✔
47

48
    public abstract CacheResult publish(CacheMessage cacheMessage);
49

50
    public abstract void startSubscribe();
51

52
    @Override
53
    public void close() throws Exception {
54
    }
×
55

56
    public String getSourceId() {
57
        return sourceId;
1✔
58
    }
59

60
    public CacheManager getCacheManager() {
61
        return cacheManager;
1✔
62
    }
63

64
    protected void processNotification(byte[] message, Function<byte[], Object> decoder) {
65
        try {
66
            if (message == null) {
1✔
67
                logger.error("notify message is null");
×
68
                return;
×
69
            }
70
            Object value = decoder.apply(message);
1✔
71
            if (value == null) {
1✔
72
                logger.error("notify message is null");
×
73
                return;
×
74
            }
75
            if (value instanceof CacheMessage) {
1✔
76
                processCacheMessage((CacheMessage) value);
1✔
77
            } else {
78
                logger.error("the message is not instance of CacheMessage, class={}", value.getClass());
×
79
            }
80
        } catch (Throwable e) {
×
81
            SquashedLogger.getLogger(logger).error("receive cache notify error", e);
×
82
        }
1✔
83
    }
1✔
84

85
    private void processCacheMessage(CacheMessage cacheMessage) {
86
        if (sourceId.equals(cacheMessage.getSourceId())) {
1✔
87
            return;
1✔
88
        }
89
        Cache cache = cacheManager.getCache(cacheMessage.getArea(), cacheMessage.getCacheName());
1✔
90
        if (cache == null) {
1✔
91
            logger.warn("Cache instance not exists: {},{}", cacheMessage.getArea(), cacheMessage.getCacheName());
×
92
            return;
×
93
        }
94
        Cache absCache = CacheUtil.getAbstractCache(cache);
1✔
95
        if (!(absCache instanceof MultiLevelCache)) {
1✔
96
            logger.warn("Cache instance is not MultiLevelCache: {},{}", cacheMessage.getArea(), cacheMessage.getCacheName());
×
97
            return;
×
98
        }
99
        Cache[] caches = ((MultiLevelCache) absCache).caches();
1✔
100
        Set<Object> keys = Stream.of(cacheMessage.getKeys()).collect(Collectors.toSet());
1✔
101
        for (Cache c : caches) {
1✔
102
            Cache localCache = CacheUtil.getAbstractCache(c);
1✔
103
            if (localCache instanceof AbstractEmbeddedCache) {
1✔
104
                ((AbstractEmbeddedCache) localCache).__removeAll(keys);
1✔
105
            } else {
106
                break;
107
            }
108
        }
109
    }
1✔
110

111
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc