• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

alibaba / jetcache / #419

12 Sep 2024 07:01AM UTC coverage: 88.879% (+0.04%) from 88.839%
#419

push

areyouok
fix: concurrent problem in LinkedHashMapCache, affect 2.7.5 and 2.7.6 (#914)

1 of 1 new or added line in 1 file covered. (100.0%)

3 existing lines in 1 file now uncovered.

4755 of 5350 relevant lines covered (88.88%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.71
/jetcache-core/src/main/java/com/alicp/jetcache/AbstractCache.java
1
package com.alicp.jetcache;
2

3
import com.alicp.jetcache.embedded.AbstractEmbeddedCache;
4
import com.alicp.jetcache.event.CacheEvent;
5
import com.alicp.jetcache.event.CacheGetAllEvent;
6
import com.alicp.jetcache.event.CacheGetEvent;
7
import com.alicp.jetcache.event.CachePutAllEvent;
8
import com.alicp.jetcache.event.CachePutEvent;
9
import com.alicp.jetcache.event.CacheRemoveAllEvent;
10
import com.alicp.jetcache.event.CacheRemoveEvent;
11
import com.alicp.jetcache.external.AbstractExternalCache;
12
import com.alicp.jetcache.support.JetCacheExecutor;
13
import com.alicp.jetcache.support.SquashedLogger;
14
import org.slf4j.Logger;
15
import org.slf4j.LoggerFactory;
16

17
import java.nio.ByteBuffer;
18
import java.time.Duration;
19
import java.util.List;
20
import java.util.Map;
21
import java.util.Set;
22
import java.util.concurrent.CompletionStage;
23
import java.util.concurrent.ConcurrentHashMap;
24
import java.util.concurrent.CountDownLatch;
25
import java.util.concurrent.TimeUnit;
26
import java.util.concurrent.locks.ReentrantLock;
27
import java.util.function.Consumer;
28
import java.util.function.Function;
29

30
/**
31
 * Created on 2016/10/7.
32
 *
33
 * @author huangli
34
 */
35
public abstract class AbstractCache<K, V> implements Cache<K, V> {
1✔
36

37
    private static Logger logger = LoggerFactory.getLogger(AbstractCache.class);
1✔
38

39
    private volatile ConcurrentHashMap<Object, LoaderLock> loaderMap;
40

41
    protected volatile boolean closed;
42
    private static final ReentrantLock reentrantLock = new ReentrantLock();
1✔
43

44
    ConcurrentHashMap<Object, LoaderLock> initOrGetLoaderMap() {
45
        if (loaderMap == null) {
1✔
46
            reentrantLock.lock();
1✔
47
            try {
48
                if (loaderMap == null) {
1✔
49
                    loaderMap = new ConcurrentHashMap<>();
1✔
50
                }
51
            } finally {
52
                reentrantLock.unlock();
1✔
53
            }
54
        }
55
        return loaderMap;
1✔
56
    }
57

58
    protected void logError(String oper, Object key, Throwable e) {
59
        StringBuilder sb = new StringBuilder(256);
1✔
60
        sb.append("jetcache(")
1✔
61
                .append(this.getClass().getSimpleName()).append(") ")
1✔
62
                .append(oper)
1✔
63
                .append(" error.");
1✔
64
        if (!(key instanceof byte[])) {
1✔
65
            try {
66
                sb.append(" key=[")
1✔
67
                        .append(config().getKeyConvertor().apply((K) key))
1✔
68
                        .append(']');
1✔
69
            } catch (Exception ex) {
1✔
70
                // ignore
71
            }
1✔
72
        }
73
        SquashedLogger.getLogger(logger).error(sb, e);
1✔
74
    }
1✔
75

76
    public void notify(CacheEvent e) {
77
        notify0(e);
1✔
78
    }
1✔
79

80
    private void notify(CacheResult r, CacheEvent e) {
81
        CompletionStage<?> f = r.future();
1✔
82
        if (f.toCompletableFuture().isDone()) {
1✔
83
            notify0(e);
1✔
84
        } else {
85
            f.thenRunAsync(() -> notify0(e), JetCacheExecutor.defaultExecutor());
1✔
86
        }
87
    }
1✔
88

89
    private void notify0(CacheEvent e) {
90
        List<CacheMonitor> monitors = config().getMonitors();
1✔
91
        for (CacheMonitor m : monitors) {
1✔
92
            m.afterOperation(e);
1✔
93
        }
1✔
94
    }
1✔
95

96
    @Override
97
    public final CacheGetResult<V> GET(K key) {
98
        long t = System.currentTimeMillis();
1✔
99
        CacheGetResult<V> result;
100
        if (key == null) {
1✔
101
            result = new CacheGetResult<V>(CacheResultCode.FAIL, CacheResult.MSG_ILLEGAL_ARGUMENT, null);
1✔
102
        } else {
103
            result = do_GET(key);
1✔
104
        }
105
        CacheGetEvent event = new CacheGetEvent(this, System.currentTimeMillis() - t, key, result);
1✔
106
        notify(result, event);
1✔
107
        return result;
1✔
108
    }
109

110
    /**
     * Subclass hook that performs the actual single-key read.
     * {@code GET} calls this only when {@code key} is non-null.
     *
     * @param key the non-null key to read
     * @return the read result
     */
    protected abstract CacheGetResult<V> do_GET(K key);
111

112
    @Override
113
    public final MultiGetResult<K, V> GET_ALL(Set<? extends K> keys) {
114
        long t = System.currentTimeMillis();
1✔
115
        MultiGetResult<K, V> result;
116
        if (keys == null) {
1✔
117
            result = new MultiGetResult<>(CacheResultCode.FAIL, CacheResult.MSG_ILLEGAL_ARGUMENT, null);
1✔
118
        } else {
119
            result = do_GET_ALL(keys);
1✔
120
        }
121
        CacheGetAllEvent event = new CacheGetAllEvent(this, System.currentTimeMillis() - t, keys, result);
1✔
122
        notify(result, event);
1✔
123
        return result;
1✔
124
    }
125

126
    /**
     * Subclass hook that performs the actual multi-key read.
     * {@code GET_ALL} calls this only when {@code keys} is non-null.
     *
     * @param keys the non-null set of keys to read
     * @return the combined read result
     */
    protected abstract MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys);
127

128
    @Override
129
    public final V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull) {
130
        return computeIfAbsentImpl(key, loader, cacheNullWhenLoaderReturnNull,
1✔
131
                0, null, this);
132
    }
133

134
    @Override
135
    public final V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
136
                                   long expireAfterWrite, TimeUnit timeUnit) {
137
        return computeIfAbsentImpl(key, loader, cacheNullWhenLoaderReturnNull,
1✔
138
                expireAfterWrite, timeUnit, this);
139
    }
140

141
    private static <K, V> boolean needUpdate(V loadedValue, boolean cacheNullWhenLoaderReturnNull, Function<K, V> loader) {
142
        if (loadedValue == null && !cacheNullWhenLoaderReturnNull) {
1✔
143
            return false;
1✔
144
        }
145
        if (loader instanceof CacheLoader && ((CacheLoader<K, V>) loader).vetoCacheUpdate()) {
1✔
146
            return false;
1✔
147
        }
148
        return true;
1✔
149
    }
150

151
    static <K, V> V computeIfAbsentImpl(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
152
                                        long expireAfterWrite, TimeUnit timeUnit, Cache<K, V> cache) {
153
        AbstractCache<K, V> abstractCache = CacheUtil.getAbstractCache(cache);
1✔
154
        CacheLoader<K, V> newLoader = CacheUtil.createProxyLoader(cache, loader, abstractCache::notify);
1✔
155
        CacheGetResult<V> r;
156
        if (cache instanceof RefreshCache) {
1✔
157
            RefreshCache<K, V> refreshCache = ((RefreshCache<K, V>) cache);
1✔
158
            r = refreshCache.GET(key);
1✔
159
            refreshCache.addOrUpdateRefreshTask(key, newLoader);
1✔
160
        } else {
1✔
161
            r = cache.GET(key);
1✔
162
        }
163
        if (r.isSuccess()) {
1✔
164
            return r.getValue();
1✔
165
        } else {
166
            Consumer<V> cacheUpdater = (loadedValue) -> {
1✔
167
                if (needUpdate(loadedValue, cacheNullWhenLoaderReturnNull, newLoader)) {
1✔
168
                    if (timeUnit != null) {
1✔
169
                        cache.PUT(key, loadedValue, expireAfterWrite, timeUnit).waitForResult();
1✔
170
                    } else {
171
                        cache.PUT(key, loadedValue).waitForResult();
1✔
172
                    }
173
                }
174
            };
1✔
175

176
            V loadedValue;
177
            if (cache.config().isCachePenetrationProtect()) {
1✔
178
                loadedValue = synchronizedLoad(cache.config(), abstractCache, key, newLoader, cacheUpdater);
1✔
179
            } else {
180
                loadedValue = newLoader.apply(key);
1✔
181
                cacheUpdater.accept(loadedValue);
1✔
182
            }
183

184
            return loadedValue;
1✔
185
        }
186
    }
187

188
    static <K, V> V synchronizedLoad(CacheConfig config, AbstractCache<K, V> abstractCache,
189
                                     K key, Function<K, V> newLoader, Consumer<V> cacheUpdater) {
190
        ConcurrentHashMap<Object, LoaderLock> loaderMap = abstractCache.initOrGetLoaderMap();
1✔
191
        Object lockKey = buildLoaderLockKey(abstractCache, key);
1✔
192
        while (true) {
193
            boolean create[] = new boolean[1];
1✔
194
            LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> {
1✔
195
                create[0] = true;
1✔
196
                LoaderLock loaderLock = new LoaderLock();
1✔
197
                loaderLock.signal = new CountDownLatch(1);
1✔
198
                loaderLock.loaderThread = Thread.currentThread();
1✔
199
                return loaderLock;
1✔
200
            });
201
            if (create[0] || ll.loaderThread == Thread.currentThread()) {
1✔
202
                try {
203
                    CacheGetResult<V> getResult = abstractCache.GET(key);
1✔
204
                    if (getResult.isSuccess()) {
1✔
UNCOV
205
                        ll.success = true;
×
UNCOV
206
                        ll.value = getResult.getValue();
×
UNCOV
207
                        return getResult.getValue();
×
208
                    } else {
209
                        V loadedValue = newLoader.apply(key);
1✔
210
                        ll.success = true;
1✔
211
                        ll.value = loadedValue;
1✔
212
                        cacheUpdater.accept(loadedValue);
1✔
213
                        return loadedValue;
1✔
214
                    }
215
                } finally {
216
                    if (create[0]) {
1✔
217
                        ll.signal.countDown();
1✔
218
                        loaderMap.remove(lockKey);
1✔
219
                    }
220
                }
221
            } else {
222
                try {
223
                    Duration timeout = config.getPenetrationProtectTimeout();
1✔
224
                    if (timeout == null) {
1✔
225
                        ll.signal.await();
1✔
226
                    } else {
227
                        boolean ok = ll.signal.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
1✔
228
                        if (!ok) {
1✔
229
                            logger.info("loader wait timeout:" + timeout);
1✔
230
                            return newLoader.apply(key);
1✔
231
                        }
232
                    }
233
                } catch (InterruptedException e) {
1✔
234
                    logger.warn("loader wait interrupted");
1✔
235
                    return newLoader.apply(key);
1✔
236
                }
1✔
237
                if (ll.success) {
1✔
238
                    return (V) ll.value;
1✔
239
                } else {
240
                    continue;
241
                }
242

243
            }
244
        }
245
    }
246

247
    private static Object buildLoaderLockKey(Cache c, Object key) {
248
        if (c instanceof AbstractEmbeddedCache) {
1✔
249
            return ((AbstractEmbeddedCache) c).buildKey(key);
1✔
250
        } else if (c instanceof AbstractExternalCache) {
1✔
251
            byte bytes[] = ((AbstractExternalCache) c).buildKey(key);
1✔
252
            return ByteBuffer.wrap(bytes);
1✔
253
        } else if (c instanceof MultiLevelCache) {
1✔
254
            c = ((MultiLevelCache) c).caches()[0];
1✔
255
            return buildLoaderLockKey(c, key);
1✔
256
        } else if (c instanceof ProxyCache) {
×
257
            c = ((ProxyCache) c).getTargetCache();
×
258
            return buildLoaderLockKey(c, key);
×
259
        } else {
260
            throw new CacheException("impossible");
×
261
        }
262
    }
263

264
    @Override
265
    public final CacheResult PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
266
        long t = System.currentTimeMillis();
1✔
267
        CacheResult result;
268
        if (key == null) {
1✔
269
            result = CacheResult.FAIL_ILLEGAL_ARGUMENT;
1✔
270
        } else {
271
            result = do_PUT(key, value, expireAfterWrite, timeUnit);
1✔
272
        }
273
        CachePutEvent event = new CachePutEvent(this, System.currentTimeMillis() - t, key, value, result);
1✔
274
        notify(result, event);
1✔
275
        return result;
1✔
276
    }
277

278
    /**
     * Subclass hook that performs the actual single-key write.
     * {@code PUT} calls this only when {@code key} is non-null.
     *
     * @param key              the non-null key to write
     * @param value            the value to store
     * @param expireAfterWrite expiry amount as passed to {@code PUT}
     * @param timeUnit         unit of {@code expireAfterWrite}
     * @return the write result
     */
    protected abstract CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit);
279

280
    @Override
281
    public final CacheResult PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
282
        long t = System.currentTimeMillis();
1✔
283
        CacheResult result;
284
        if (map == null) {
1✔
285
            result = CacheResult.FAIL_ILLEGAL_ARGUMENT;
1✔
286
        } else {
287
            result = do_PUT_ALL(map, expireAfterWrite, timeUnit);
1✔
288
        }
289
        CachePutAllEvent event = new CachePutAllEvent(this, System.currentTimeMillis() - t, map, result);
1✔
290
        notify(result, event);
1✔
291
        return result;
1✔
292
    }
293

294
    /**
     * Subclass hook that performs the actual bulk write.
     * {@code PUT_ALL} calls this only when {@code map} is non-null.
     *
     * @param map              the non-null entries to write
     * @param expireAfterWrite expiry amount as passed to {@code PUT_ALL}
     * @param timeUnit         unit of {@code expireAfterWrite}
     * @return the write result
     */
    protected abstract CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit);
295

296
    @Override
297
    public final CacheResult REMOVE(K key) {
298
        long t = System.currentTimeMillis();
1✔
299
        CacheResult result;
300
        if (key == null) {
1✔
301
            result = CacheResult.FAIL_ILLEGAL_ARGUMENT;
1✔
302
        } else {
303
            result = do_REMOVE(key);
1✔
304
        }
305
        CacheRemoveEvent event = new CacheRemoveEvent(this, System.currentTimeMillis() - t, key, result);
1✔
306
        notify(result, event);
1✔
307
        return result;
1✔
308
    }
309

310
    /**
     * Subclass hook that performs the actual single-key removal.
     * {@code REMOVE} calls this only when {@code key} is non-null.
     *
     * @param key the non-null key to remove
     * @return the removal result
     */
    protected abstract CacheResult do_REMOVE(K key);
311

312
    @Override
313
    public final CacheResult REMOVE_ALL(Set<? extends K> keys) {
314
        long t = System.currentTimeMillis();
1✔
315
        CacheResult result;
316
        if (keys == null) {
1✔
317
            result = CacheResult.FAIL_ILLEGAL_ARGUMENT;
1✔
318
        } else {
319
            result = do_REMOVE_ALL(keys);
1✔
320
        }
321
        CacheRemoveAllEvent event = new CacheRemoveAllEvent(this, System.currentTimeMillis() - t, keys, result);
1✔
322
        notify(result, event);
1✔
323
        return result;
1✔
324
    }
325

326
    /**
     * Subclass hook that performs the actual bulk removal.
     * {@code REMOVE_ALL} calls this only when {@code keys} is non-null.
     *
     * @param keys the non-null set of keys to remove
     * @return the removal result
     */
    protected abstract CacheResult do_REMOVE_ALL(Set<? extends K> keys);
327

328
    @Override
329
    public final CacheResult PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
330
        long t = System.currentTimeMillis();
1✔
331
        CacheResult result;
332
        if (key == null) {
1✔
333
            result = CacheResult.FAIL_ILLEGAL_ARGUMENT;
1✔
334
        } else {
335
            result = do_PUT_IF_ABSENT(key, value, expireAfterWrite, timeUnit);
1✔
336
        }
337
        CachePutEvent event = new CachePutEvent(this, System.currentTimeMillis() - t, key, value, result);
1✔
338
        notify(result, event);
1✔
339
        return result;
1✔
340
    }
341

342
    /**
     * Subclass hook that performs the actual conditional write.
     * {@code PUT_IF_ABSENT} calls this only when {@code key} is non-null.
     *
     * @param key              the non-null key to write
     * @param value            the value to store
     * @param expireAfterWrite expiry amount as passed to {@code PUT_IF_ABSENT}
     * @param timeUnit         unit of {@code expireAfterWrite}
     * @return the write result
     */
    protected abstract CacheResult do_PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit);
343

344
    @Override
345
    public void close() {
346
        this.closed = true;
1✔
347
    }
1✔
348

349
    public boolean isClosed() {
350
        return this.closed;
1✔
351
    }
352

353
    static class LoaderLock {
1✔
354
        CountDownLatch signal;
355
        Thread loaderThread;
356
        volatile boolean success;
357
        volatile Object value;
358
    }
359
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc