• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

alibaba / jetcache / #405

16 Apr 2024 05:58AM UTC coverage: 0.0% (-88.9%) from 88.866%
#405

push

areyouok
add encoding to fix coverage report

0 of 5353 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/jetcache-core/src/main/java/com/alicp/jetcache/RefreshCache.java
1
package com.alicp.jetcache;
2

3
import com.alicp.jetcache.embedded.AbstractEmbeddedCache;
4
import com.alicp.jetcache.external.AbstractExternalCache;
5
import com.alicp.jetcache.support.JetCacheExecutor;
6
import org.slf4j.Logger;
7
import org.slf4j.LoggerFactory;
8

9
import java.nio.ByteBuffer;
10
import java.util.ArrayList;
11
import java.util.Arrays;
12
import java.util.List;
13
import java.util.Map;
14
import java.util.Set;
15
import java.util.concurrent.ConcurrentHashMap;
16
import java.util.concurrent.ScheduledFuture;
17
import java.util.concurrent.TimeUnit;
18
import java.util.function.Function;
19

20
/**
21
 * Created on 2017/5/25.
22
 *
23
 * @author huangli
24
 */
25
public class RefreshCache<K, V> extends LoadingCache<K, V> {
26

27
    private static final Logger logger = LoggerFactory.getLogger(RefreshCache.class);
×
28

29
    public static final byte[] LOCK_KEY_SUFFIX = "_#RL#".getBytes();
×
30
    public static final byte[] TIMESTAMP_KEY_SUFFIX = "_#TS#".getBytes();
×
31

32
    private ConcurrentHashMap<Object, RefreshTask> taskMap = new ConcurrentHashMap<>();
×
33

34
    private boolean multiLevelCache;
35

36
    public RefreshCache(Cache cache) {
37
        super(cache);
×
38
        multiLevelCache = isMultiLevelCache();
×
39
    }
×
40

41
    protected void stopRefresh() {
42
        List<RefreshTask> tasks = new ArrayList<>();
×
43
        tasks.addAll(taskMap.values());
×
44
        tasks.forEach(task -> task.cancel());
×
45
    }
×
46

47
    @Override
48
    public void close() {
49
        stopRefresh();
×
50
        super.close();
×
51
    }
×
52

53

54
    private boolean hasLoader() {
55
        return config.getLoader() != null;
×
56
    }
57

58
    @Override
59
    public V computeIfAbsent(K key, Function<K, V> loader) {
60
        return computeIfAbsent(key, loader, config().isCacheNullValue());
×
61
    }
62

63
    @Override
64
    public V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull) {
65
        return AbstractCache.computeIfAbsentImpl(key, loader, cacheNullWhenLoaderReturnNull,
×
66
                0, null, this);
67
    }
68

69
    @Override
70
    public V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
71
                             long expireAfterWrite, TimeUnit timeUnit) {
72
        return AbstractCache.computeIfAbsentImpl(key, loader, cacheNullWhenLoaderReturnNull,
×
73
                expireAfterWrite, timeUnit, this);
74
    }
75

76
    protected Cache concreteCache() {
77
        Cache c = getTargetCache();
×
78
        while (true) {
79
            if (c instanceof ProxyCache) {
×
80
                c = ((ProxyCache) c).getTargetCache();
×
81
            } else if (c instanceof MultiLevelCache) {
×
82
                Cache[] caches = ((MultiLevelCache) c).caches();
×
83
                c = caches[caches.length - 1];
×
84
            } else {
×
85
                return c;
×
86
            }
87
        }
88
    }
89

90
    private boolean isMultiLevelCache() {
91
        Cache c = getTargetCache();
×
92
        while (c instanceof ProxyCache) {
×
93
            c = ((ProxyCache) c).getTargetCache();
×
94
        }
95
        return c instanceof MultiLevelCache;
×
96
    }
97

98
    private Object getTaskId(K key) {
99
        Cache c = concreteCache();
×
100
        if (c instanceof AbstractEmbeddedCache) {
×
101
            return ((AbstractEmbeddedCache) c).buildKey(key);
×
102
        } else if (c instanceof AbstractExternalCache) {
×
103
            byte[] bs = ((AbstractExternalCache) c).buildKey(key);
×
104
            return ByteBuffer.wrap(bs);
×
105
        } else {
106
            logger.error("can't getTaskId from " + c.getClass());
×
107
            return null;
×
108
        }
109
    }
110

111
    protected void addOrUpdateRefreshTask(K key, CacheLoader<K,V> loader) {
112
        RefreshPolicy refreshPolicy = config.getRefreshPolicy();
×
113
        if (refreshPolicy == null) {
×
114
            return;
×
115
        }
116
        long refreshMillis = refreshPolicy.getRefreshMillis();
×
117
        if (refreshMillis > 0) {
×
118
            Object taskId = getTaskId(key);
×
119
            RefreshTask refreshTask = taskMap.computeIfAbsent(taskId, tid -> {
×
120
                logger.debug("add refresh task. interval={},  key={}", refreshMillis , key);
×
121
                RefreshTask task = new RefreshTask(taskId, key, loader);
×
122
                task.lastAccessTime = System.currentTimeMillis();
×
123
                ScheduledFuture<?> future = JetCacheExecutor.heavyIOExecutor().scheduleWithFixedDelay(
×
124
                        task, refreshMillis, refreshMillis, TimeUnit.MILLISECONDS);
125
                task.future = future;
×
126
                return task;
×
127
            });
128
            refreshTask.lastAccessTime = System.currentTimeMillis();
×
129
        }
130
    }
×
131

132
    @Override
133
    public V get(K key) throws CacheInvokeException {
134
        if (config.getRefreshPolicy() != null && hasLoader()) {
×
135
            addOrUpdateRefreshTask(key, null);
×
136
        }
137
        return super.get(key);
×
138
    }
139

140
    @Override
141
    public Map<K, V> getAll(Set<? extends K> keys) throws CacheInvokeException {
142
        if (config.getRefreshPolicy() != null && hasLoader()) {
×
143
            for (K key : keys) {
×
144
                addOrUpdateRefreshTask(key, null);
×
145
            }
×
146
        }
147
        return super.getAll(keys);
×
148
    }
149

150
    class RefreshTask implements Runnable {
151
        private Object taskId;
152
        private K key;
153
        private CacheLoader<K, V> loader;
154

155
        private long lastAccessTime;
156
        private ScheduledFuture future;
157

158
        RefreshTask(Object taskId, K key, CacheLoader<K, V> loader) {
×
159
            this.taskId = taskId;
×
160
            this.key = key;
×
161
            this.loader = loader;
×
162
        }
×
163

164
        private void cancel() {
165
            logger.debug("cancel refresh: {}", key);
×
166
            future.cancel(false);
×
167
            taskMap.remove(taskId);
×
168
        }
×
169

170
        private void load() throws Throwable {
171
            CacheLoader<K,V> l = loader == null? config.getLoader(): loader;
×
172
            if (l != null) {
×
173
                l = CacheUtil.createProxyLoader(cache, l, eventConsumer);
×
174
                V v = l.load(key);
×
175
                if (needUpdate(v, l)) {
×
176
                    cache.PUT(key, v);
×
177
                }
178
            }
179
        }
×
180

181
        private void externalLoad(final Cache concreteCache, final long currentTime)
182
                throws Throwable {
183
            byte[] newKey = ((AbstractExternalCache) concreteCache).buildKey(key);
×
184
            byte[] lockKey = combine(newKey, LOCK_KEY_SUFFIX);
×
185
            long loadTimeOut = RefreshCache.this.config.getRefreshPolicy().getRefreshLockTimeoutMillis();
×
186
            long refreshMillis = config.getRefreshPolicy().getRefreshMillis();
×
187
            byte[] timestampKey = combine(newKey, TIMESTAMP_KEY_SUFFIX);
×
188

189
            // AbstractExternalCache buildKey method will not convert byte[]
190
            CacheGetResult refreshTimeResult = concreteCache.GET(timestampKey);
×
191
            boolean shouldLoad = false;
×
192
            if (refreshTimeResult.isSuccess()) {
×
193
                shouldLoad = currentTime >= Long.parseLong(refreshTimeResult.getValue().toString()) + refreshMillis;
×
194
            } else if (refreshTimeResult.getResultCode() == CacheResultCode.NOT_EXISTS) {
×
195
                shouldLoad = true;
×
196
            }
197

198
            if (!shouldLoad) {
×
199
                if (multiLevelCache) {
×
200
                    refreshUpperCaches(key);
×
201
                }
202
                return;
×
203
            }
204

205
            Runnable r = () -> {
×
206
                try {
207
                    load();
×
208
                    // AbstractExternalCache buildKey method will not convert byte[]
209
                    concreteCache.put(timestampKey, String.valueOf(System.currentTimeMillis()));
×
210
                } catch (Throwable e) {
×
211
                    throw new CacheException("refresh error", e);
×
212
                }
×
213
            };
×
214

215
            // AbstractExternalCache buildKey method will not convert byte[]
216
            boolean lockSuccess = concreteCache.tryLockAndRun(lockKey, loadTimeOut, TimeUnit.MILLISECONDS, r);
×
217
            if(!lockSuccess && multiLevelCache) {
×
218
                JetCacheExecutor.heavyIOExecutor().schedule(
×
219
                        () -> refreshUpperCaches(key), (long)(0.2 * refreshMillis), TimeUnit.MILLISECONDS);
×
220
            }
221
        }
×
222

223
        private void refreshUpperCaches(K key) {
224
            MultiLevelCache<K, V> targetCache = (MultiLevelCache<K, V>) getTargetCache();
×
225
            Cache[] caches = targetCache.caches();
×
226
            int len = caches.length;
×
227

228
            CacheGetResult cacheGetResult = caches[len - 1].GET(key);
×
229
            if (!cacheGetResult.isSuccess()) {
×
230
                return;
×
231
            }
232
            for (int i = 0; i < len - 1; i++) {
×
233
                caches[i].PUT(key, cacheGetResult.getValue());
×
234
            }
235
        }
×
236

237
        @Override
238
        public void run() {
239
            try {
240
                if (config.getRefreshPolicy() == null || (loader == null && !hasLoader())) {
×
241
                    cancel();
×
242
                    return;
×
243
                }
244
                long now = System.currentTimeMillis();
×
245
                long stopRefreshAfterLastAccessMillis = config.getRefreshPolicy().getStopRefreshAfterLastAccessMillis();
×
246
                if (stopRefreshAfterLastAccessMillis > 0) {
×
247
                    if (lastAccessTime + stopRefreshAfterLastAccessMillis < now) {
×
248
                        logger.debug("cancel refresh: {}", key);
×
249
                        cancel();
×
250
                        return;
×
251
                    }
252
                }
253
                logger.debug("refresh key: {}", key);
×
254
                Cache concreteCache = concreteCache();
×
255
                if (concreteCache instanceof AbstractExternalCache) {
×
256
                    externalLoad(concreteCache, now);
×
257
                } else {
258
                    load();
×
259
                }
260
            } catch (Throwable e) {
×
261
                logger.error("refresh error: key=" + key, e);
×
262
            }
×
263
        }
×
264
    }
265

266
    private byte[] combine(byte[] bs1, byte[] bs2) {
267
        byte[] newArray = Arrays.copyOf(bs1, bs1.length + bs2.length);
×
268
        System.arraycopy(bs2, 0, newArray, bs1.length, bs2.length);
×
269
        return newArray;
×
270
    }
271
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc