• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ben-manes / caffeine / #3896

pending completion
#3896

push

github-actions

ben-manes
upgrade jamm library (memory meter)

7542 of 7616 relevant lines covered (99.03%)

0.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.53
/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java
1
/*
2
 * Copyright 2015 Ben Manes. All Rights Reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package com.github.benmanes.caffeine.cache;
17

18
import static com.github.benmanes.caffeine.cache.Caffeine.calculateHashMapCapacity;
19
import static com.github.benmanes.caffeine.cache.LocalAsyncCache.composeResult; // NOPMD
20
import static java.util.Objects.requireNonNull;
21

22
import java.lang.System.Logger;
23
import java.lang.System.Logger.Level;
24
import java.lang.reflect.Method;
25
import java.util.LinkedHashMap;
26
import java.util.Map;
27
import java.util.Set;
28
import java.util.concurrent.CancellationException;
29
import java.util.concurrent.CompletableFuture;
30
import java.util.concurrent.CompletionException;
31
import java.util.concurrent.Executor;
32
import java.util.concurrent.TimeoutException;
33
import java.util.function.BiFunction;
34
import java.util.function.Function;
35

36
import org.checkerframework.checker.nullness.qual.Nullable;
37

38
/**
39
 * This class provides a skeletal implementation of the {@link AsyncLoadingCache} interface to
40
 * minimize the effort required to implement a {@link LocalCache}.
41
 *
42
 * @author ben.manes@gmail.com (Ben Manes)
43
 */
44
abstract class LocalAsyncLoadingCache<K, V>
45
    implements LocalAsyncCache<K, V>, AsyncLoadingCache<K, V> {
46
  static final Logger logger = System.getLogger(LocalAsyncLoadingCache.class.getName());
1✔
47

48
  final @Nullable BiFunction<? super Set<? extends K>, ? super Executor,
49
      ? extends CompletableFuture<? extends Map<? extends K, ? extends V>>> bulkMappingFunction;
50
  final BiFunction<? super K, ? super Executor,
51
      ? extends CompletableFuture<? extends V>> mappingFunction;
52
  final AsyncCacheLoader<K, V> cacheLoader;
53

54
  @Nullable LoadingCacheView<K, V> cacheView;
55

56
  @SuppressWarnings("unchecked")
57
  LocalAsyncLoadingCache(AsyncCacheLoader<? super K, V> cacheLoader) {
1✔
58
    this.bulkMappingFunction = newBulkMappingFunction(cacheLoader);
1✔
59
    this.cacheLoader = (AsyncCacheLoader<K, V>) cacheLoader;
1✔
60
    this.mappingFunction = newMappingFunction(cacheLoader);
1✔
61
  }
1✔
62

63
  /** Returns a mapping function that adapts to {@link AsyncCacheLoader#asyncLoad}. */
64
  BiFunction<? super K, ? super Executor, ? extends CompletableFuture<? extends V>> newMappingFunction(
65
      AsyncCacheLoader<? super K, V> cacheLoader) {
66
    return (key, executor) -> {
1✔
67
      try {
68
        return cacheLoader.asyncLoad(key, executor);
1✔
69
      } catch (RuntimeException e) {
1✔
70
        throw e;
1✔
71
      } catch (InterruptedException e) {
1✔
72
        Thread.currentThread().interrupt();
1✔
73
        throw new CompletionException(e);
1✔
74
      } catch (Exception e) {
1✔
75
        throw new CompletionException(e);
1✔
76
      }
77
    };
78
  }
79

80
  /**
81
   * Returns a mapping function that adapts to {@link AsyncCacheLoader#asyncLoadAll}, if
82
   * implemented.
83
   */
84
  @Nullable
85
  BiFunction<Set<? extends K>, Executor, CompletableFuture<Map<K, V>>> newBulkMappingFunction(
86
      AsyncCacheLoader<? super K, V> cacheLoader) {
87
    if (!canBulkLoad(cacheLoader)) {
1✔
88
      return null;
1✔
89
    }
90
    return (keysToLoad, executor) -> {
1✔
91
      try {
92
        @SuppressWarnings("unchecked")
93
        var loaded = (CompletableFuture<Map<K, V>>) cacheLoader.asyncLoadAll(keysToLoad, executor);
1✔
94
        return loaded;
1✔
95
      } catch (RuntimeException e) {
1✔
96
        throw e;
1✔
97
      } catch (InterruptedException e) {
1✔
98
        Thread.currentThread().interrupt();
1✔
99
        throw new CompletionException(e);
1✔
100
      } catch (Exception e) {
1✔
101
        throw new CompletionException(e);
1✔
102
      }
103
    };
104
  }
105

106
  /** Returns whether the supplied cache loader has bulk load functionality. */
107
  boolean canBulkLoad(AsyncCacheLoader<?, ?> loader) {
108
    try {
109
      Class<?> defaultLoaderClass = AsyncCacheLoader.class;
1✔
110
      if (loader instanceof CacheLoader<?, ?>) {
1✔
111
        defaultLoaderClass = CacheLoader.class;
1✔
112

113
        Method classLoadAll = loader.getClass().getMethod("loadAll", Set.class);
1✔
114
        Method defaultLoadAll = CacheLoader.class.getMethod("loadAll", Set.class);
1✔
115
        if (!classLoadAll.equals(defaultLoadAll)) {
1✔
116
          return true;
1✔
117
        }
118
      }
119

120
      Method classAsyncLoadAll = loader.getClass().getMethod(
1✔
121
          "asyncLoadAll", Set.class, Executor.class);
122
      Method defaultAsyncLoadAll = defaultLoaderClass.getMethod(
1✔
123
          "asyncLoadAll", Set.class, Executor.class);
124
      return !classAsyncLoadAll.equals(defaultAsyncLoadAll);
1✔
125
    } catch (NoSuchMethodException | SecurityException e) {
×
126
      logger.log(Level.WARNING, "Cannot determine if CacheLoader can bulk load", e);
×
127
      return false;
×
128
    }
129
  }
130

131
  @Override
132
  public CompletableFuture<V> get(K key) {
133
    return get(key, mappingFunction);
1✔
134
  }
135

136
  @Override
137
  public CompletableFuture<Map<K, V>> getAll(Iterable<? extends K> keys) {
138
    if (bulkMappingFunction != null) {
1✔
139
      return getAll(keys, bulkMappingFunction);
1✔
140
    }
141

142
    Function<K, CompletableFuture<V>> mappingFunction = this::get;
1✔
143
    var result = new LinkedHashMap<K, CompletableFuture<V>>(calculateHashMapCapacity(keys));
1✔
144
    for (K key : keys) {
1✔
145
      var future = result.computeIfAbsent(key, mappingFunction);
1✔
146
      requireNonNull(future);
1✔
147
    }
1✔
148
    return composeResult(result);
1✔
149
  }
150

151
  @Override
152
  public LoadingCache<K, V> synchronous() {
153
    return (cacheView == null) ? (cacheView = new LoadingCacheView<>(this)) : cacheView;
1✔
154
  }
155

156
  /* --------------- Synchronous views --------------- */
157

158
  static final class LoadingCacheView<K, V>
159
      extends AbstractCacheView<K, V> implements LoadingCache<K, V> {
160
    private static final long serialVersionUID = 1L;
161

162
    @SuppressWarnings("serial")
163
    final LocalAsyncLoadingCache<K, V> asyncCache;
164

165
    LoadingCacheView(LocalAsyncLoadingCache<K, V> asyncCache) {
1✔
166
      this.asyncCache = requireNonNull(asyncCache);
1✔
167
    }
1✔
168

169
    @Override
170
    LocalAsyncLoadingCache<K, V> asyncCache() {
171
      return asyncCache;
1✔
172
    }
173

174
    @Override
175
    @SuppressWarnings("PMD.PreserveStackTrace")
176
    public V get(K key) {
177
      return resolve(asyncCache.get(key));
1✔
178
    }
179

180
    @Override
181
    @SuppressWarnings("PMD.PreserveStackTrace")
182
    public Map<K, V> getAll(Iterable<? extends K> keys) {
183
      return resolve(asyncCache.getAll(keys));
1✔
184
    }
185

186
    @Override
187
    public CompletableFuture<V> refresh(K key) {
188
      requireNonNull(key);
1✔
189

190
      Object keyReference = asyncCache.cache().referenceKey(key);
1✔
191
      for (;;) {
192
        var future = tryOptimisticRefresh(key, keyReference);
1✔
193
        if (future == null) {
1✔
194
          future = tryComputeRefresh(key, keyReference);
1✔
195
        }
196
        if (future != null) {
1✔
197
          return future;
1✔
198
        }
199
      }
1✔
200
    }
201

202
    @Override
203
    public CompletableFuture<Map<K, V>> refreshAll(Iterable<? extends K> keys) {
204
      var result = new LinkedHashMap<K, CompletableFuture<V>>(calculateHashMapCapacity(keys));
1✔
205
      for (K key : keys) {
1✔
206
        result.computeIfAbsent(key, this::refresh);
1✔
207
      }
1✔
208
      return composeResult(result);
1✔
209
    }
210

211
    /** Attempts to avoid a reload if the entry is absent, or a load or reload is in-flight. */
212
    @SuppressWarnings("FutureReturnValueIgnored")
213
    private @Nullable CompletableFuture<V> tryOptimisticRefresh(K key, Object keyReference) {
214
      // If a refresh is in-flight, then return it directly. If completed and not yet removed, then
215
      // remove to trigger a new reload.
216
      @SuppressWarnings("unchecked")
217
      var lastRefresh = (CompletableFuture<V>) asyncCache.cache().refreshes().get(keyReference);
1✔
218
      if (lastRefresh != null) {
1✔
219
        if (Async.isReady(lastRefresh) || asyncCache.cache().isPendingEviction(key)) {
1✔
220
          asyncCache.cache().refreshes().remove(keyReference, lastRefresh);
1✔
221
        } else {
222
          return lastRefresh;
1✔
223
        }
224
      }
225

226
      // If the entry is absent then perform a new load, else if in-flight then return it
227
      var oldValueFuture = asyncCache.cache().getIfPresentQuietly(key);
1✔
228
      if ((oldValueFuture == null)
1✔
229
          || (oldValueFuture.isDone() && oldValueFuture.isCompletedExceptionally())) {
1✔
230
        if (oldValueFuture != null) {
1✔
231
          asyncCache.cache().remove(key, oldValueFuture);
1✔
232
        }
233
        var future = asyncCache.get(key, asyncCache.mappingFunction, /* recordStats */ false);
1✔
234
        @SuppressWarnings("unchecked")
235
        var prior = (CompletableFuture<V>) asyncCache.cache()
1✔
236
            .refreshes().putIfAbsent(keyReference, future);
1✔
237
        var result = (prior == null) ? future : prior;
1✔
238
        result.whenComplete((r, e) -> asyncCache.cache().refreshes().remove(keyReference, result));
1✔
239
        return result;
1✔
240
      } else if (!oldValueFuture.isDone()) {
1✔
241
        // no-op if load is pending
242
        return oldValueFuture;
1✔
243
      }
244

245
      // Fallback to the slow path, possibly retrying
246
      return null;
1✔
247
    }
248

249
    /** Begins a refresh if the entry has materialized and no reload is in-flight. */
250
    @SuppressWarnings("FutureReturnValueIgnored")
251
    private @Nullable CompletableFuture<V> tryComputeRefresh(K key, Object keyReference) {
252
      long[] startTime = new long[1];
1✔
253
      boolean[] refreshed = new boolean[1];
1✔
254
      @SuppressWarnings({"rawtypes", "unchecked"})
255
      CompletableFuture<V>[] oldValueFuture = new CompletableFuture[1];
1✔
256
      var future = asyncCache.cache().refreshes().computeIfAbsent(keyReference, k -> {
1✔
257
        oldValueFuture[0] = asyncCache.cache().getIfPresentQuietly(key);
1✔
258
        V oldValue = Async.getIfReady(oldValueFuture[0]);
1✔
259
        if (oldValue == null) {
1✔
260
          return null;
1✔
261
        }
262

263
        refreshed[0] = true;
1✔
264
        startTime[0] = asyncCache.cache().statsTicker().read();
1✔
265
        try {
266
          var reloadFuture = asyncCache.cacheLoader.asyncReload(
1✔
267
              key, oldValue, asyncCache.cache().executor());
1✔
268
          return requireNonNull(reloadFuture, "Null future");
1✔
269
        } catch (RuntimeException e) {
1✔
270
          throw e;
1✔
271
        } catch (InterruptedException e) {
1✔
272
          Thread.currentThread().interrupt();
1✔
273
          throw new CompletionException(e);
1✔
274
        } catch (Exception e) {
1✔
275
          throw new CompletionException(e);
1✔
276
        }
277
      });
278

279
      if (future == null) {
1✔
280
        // Retry the optimistic path
281
        return null;
1✔
282
      }
283

284
      @SuppressWarnings("unchecked")
285
      var castedFuture = (CompletableFuture<V>) future;
1✔
286
      if (refreshed[0]) {
1✔
287
        castedFuture.whenComplete((newValue, error) -> {
1✔
288
          long loadTime = asyncCache.cache().statsTicker().read() - startTime[0];
1✔
289
          if (error != null) {
1✔
290
            if (!(error instanceof CancellationException) && !(error instanceof TimeoutException)) {
1✔
291
              logger.log(Level.WARNING, "Exception thrown during refresh", error);
1✔
292
            }
293
            asyncCache.cache().refreshes().remove(keyReference, castedFuture);
1✔
294
            asyncCache.cache().statsCounter().recordLoadFailure(loadTime);
1✔
295
            return;
1✔
296
          }
297

298
          boolean[] discard = new boolean[1];
1✔
299
          var value = asyncCache.cache().compute(key, (ignored, currentValue) -> {
1✔
300
            var successful = asyncCache.cache().refreshes().remove(keyReference, castedFuture);
1✔
301
            if (successful && (currentValue == oldValueFuture[0])) {
1✔
302
              if (currentValue == null) {
1✔
303
                // If the entry is absent then discard the refresh and maybe notifying the listener
304
                discard[0] = (newValue != null);
×
305
                return null;
×
306
              } else if ((currentValue == newValue) || (currentValue == castedFuture)) {
1✔
307
                // If the reloaded value is the same instance then no-op
308
                return currentValue;
1✔
309
              } else if (newValue == Async.getIfReady((CompletableFuture<?>) currentValue)) {
1✔
310
                // If the completed futures hold the same value instance then no-op
311
                return currentValue;
1✔
312
              }
313
              return (newValue == null) ? null : castedFuture;
1✔
314
            }
315
            // Otherwise, a write invalidated the refresh so discard it and notify the listener
316
            discard[0] = true;
1✔
317
            return currentValue;
1✔
318
          }, asyncCache.cache().expiry(), /* recordLoad */ false, /* recordLoadFailure */ true);
1✔
319

320
          if (discard[0] && (newValue != null)) {
1✔
321
            var cause = (value == null) ? RemovalCause.EXPLICIT : RemovalCause.REPLACED;
1✔
322
            asyncCache.cache().notifyRemoval(key, castedFuture, cause);
1✔
323
          }
324
          if (newValue == null) {
1✔
325
            asyncCache.cache().statsCounter().recordLoadFailure(loadTime);
1✔
326
          } else {
327
            asyncCache.cache().statsCounter().recordLoadSuccess(loadTime);
1✔
328
          }
329
        });
1✔
330
      }
331
      return castedFuture;
1✔
332
    }
333
  }
334
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc