• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ben-manes / caffeine / #5156

03 Dec 2025 03:21AM UTC coverage: 0.0% (-100.0%) from 100.0%
#5156

push

github

ben-manes
add loading type to parameterized test dimensions to reduce task size

0 of 3834 branches covered (0.0%)

0 of 7848 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java
1
/*
2
 * Copyright 2015 Ben Manes. All Rights Reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package com.github.benmanes.caffeine.cache;
17

18
import static com.github.benmanes.caffeine.cache.Caffeine.calculateHashMapCapacity;
19
import static com.github.benmanes.caffeine.cache.Caffeine.hasMethodOverride;
20
import static com.github.benmanes.caffeine.cache.LocalAsyncCache.composeResult;
21
import static java.util.Objects.requireNonNull;
22

23
import java.lang.System.Logger;
24
import java.lang.System.Logger.Level;
25
import java.util.LinkedHashMap;
26
import java.util.Map;
27
import java.util.Set;
28
import java.util.concurrent.CancellationException;
29
import java.util.concurrent.CompletableFuture;
30
import java.util.concurrent.CompletionException;
31
import java.util.concurrent.Executor;
32
import java.util.concurrent.TimeoutException;
33
import java.util.function.BiFunction;
34
import java.util.function.Function;
35

36
import org.jspecify.annotations.Nullable;
37

38
import com.google.errorprone.annotations.Var;
39

40
/**
41
 * This class provides a skeletal implementation of the {@link AsyncLoadingCache} interface to
42
 * minimize the effort required to implement a {@link LocalCache}.
43
 *
44
 * @author ben.manes@gmail.com (Ben Manes)
45
 */
46
abstract class LocalAsyncLoadingCache<K, V>
47
    implements LocalAsyncCache<K, V>, AsyncLoadingCache<K, V> {
48
  static final Logger logger = System.getLogger(LocalAsyncLoadingCache.class.getName());
×
49

50
  final @Nullable BiFunction<? super Set<? extends K>, ? super Executor,
51
      ? extends CompletableFuture<? extends Map<? extends K, ? extends V>>> bulkMappingFunction;
52
  final BiFunction<? super K, ? super Executor,
53
      ? extends CompletableFuture<? extends V>> mappingFunction;
54
  final AsyncCacheLoader<K, V> cacheLoader;
55

56
  @Nullable LoadingCacheView<K, V> cacheView;
57

58
  @SuppressWarnings("unchecked")
59
  LocalAsyncLoadingCache(AsyncCacheLoader<? super K, V> cacheLoader) {
×
60
    this.bulkMappingFunction = newBulkMappingFunction(cacheLoader);
×
61
    this.cacheLoader = (AsyncCacheLoader<K, V>) cacheLoader;
×
62
    this.mappingFunction = newMappingFunction(cacheLoader);
×
63
  }
×
64

65
  /** Returns a mapping function that adapts to {@link AsyncCacheLoader#asyncLoad}. */
66
  BiFunction<
67
      ? super K,
68
      ? super Executor,
69
      ? extends CompletableFuture<? extends V>> newMappingFunction(
70
          AsyncCacheLoader<? super K, V> cacheLoader) {
71
    return (key, executor) -> {
×
72
      try {
73
        return cacheLoader.asyncLoad(key, executor);
×
74
      } catch (RuntimeException e) {
×
75
        throw e;
×
76
      } catch (InterruptedException e) {
×
77
        Thread.currentThread().interrupt();
×
78
        throw new CompletionException(e);
×
79
      } catch (Exception e) {
×
80
        throw new CompletionException(e);
×
81
      }
82
    };
83
  }
84

85
  /**
86
   * Returns a mapping function that adapts to {@link AsyncCacheLoader#asyncLoadAll}, if
87
   * implemented.
88
   */
89
  @Nullable
90
  BiFunction<Set<? extends K>, Executor, CompletableFuture<Map<K, V>>> newBulkMappingFunction(
91
      AsyncCacheLoader<? super K, V> cacheLoader) {
92
    if (!canBulkLoad(cacheLoader)) {
×
93
      return null;
×
94
    }
95
    return (keysToLoad, executor) -> {
×
96
      try {
97
        @SuppressWarnings("unchecked")
98
        var loaded = (CompletableFuture<Map<K, V>>) cacheLoader.asyncLoadAll(keysToLoad, executor);
×
99
        return loaded;
×
100
      } catch (RuntimeException e) {
×
101
        throw e;
×
102
      } catch (InterruptedException e) {
×
103
        Thread.currentThread().interrupt();
×
104
        throw new CompletionException(e);
×
105
      } catch (Exception e) {
×
106
        throw new CompletionException(e);
×
107
      }
108
    };
109
  }
110

111
  /** Returns whether the supplied cache loader has bulk load functionality. */
112
  boolean canBulkLoad(AsyncCacheLoader<?, ?> loader) {
113
    @Var Class<?> defaultLoaderClass = AsyncCacheLoader.class;
×
114
    if (loader instanceof CacheLoader<?, ?>) {
×
115
      defaultLoaderClass = CacheLoader.class;
×
116
      if (hasMethodOverride(defaultLoaderClass, loader, "loadAll", Set.class)) {
×
117
        return true;
×
118
      }
119
    }
120
    return hasMethodOverride(defaultLoaderClass,
×
121
        loader, "asyncLoadAll", Set.class, Executor.class);
122
  }
123

124
  @Override
125
  public CompletableFuture<V> get(K key) {
126
    return get(key, mappingFunction);
×
127
  }
128

129
  @Override
130
  public CompletableFuture<Map<K, V>> getAll(Iterable<? extends K> keys) {
131
    if (bulkMappingFunction != null) {
×
132
      return getAll(keys, bulkMappingFunction);
×
133
    }
134

135
    Function<K, CompletableFuture<V>> mappingFunction = this::get;
×
136
    var result = new LinkedHashMap<K, CompletableFuture<V>>(calculateHashMapCapacity(keys));
×
137
    for (K key : keys) {
×
138
      var future = result.computeIfAbsent(key, mappingFunction);
×
139
      requireNonNull(future);
×
140
    }
×
141
    return composeResult(result);
×
142
  }
143

144
  @Override
145
  public LoadingCache<K, V> synchronous() {
146
    return (cacheView == null) ? (cacheView = new LoadingCacheView<>(this)) : cacheView;
×
147
  }
148

149
  /* --------------- Synchronous views --------------- */
150

151
  static final class LoadingCacheView<K, V>
152
      extends AbstractCacheView<K, V> implements LoadingCache<K, V> {
153
    private static final long serialVersionUID = 1L;
154

155
    @SuppressWarnings("serial")
156
    final LocalAsyncLoadingCache<K, V> asyncCache;
157

158
    /**
     * Creates a synchronous view over the given asynchronous cache.
     *
     * @param asyncCache the backing cache
     * @throws NullPointerException if {@code asyncCache} is null
     */
    LoadingCacheView(LocalAsyncLoadingCache<K, V> asyncCache) {
      requireNonNull(asyncCache);
      this.asyncCache = asyncCache;
    }
×
161

162
    @Override
163
    LocalAsyncLoadingCache<K, V> asyncCache() {
164
      return asyncCache;
×
165
    }
166

167
    @Override
168
    public V get(K key) {
169
      return resolve(asyncCache.get(key));
×
170
    }
171

172
    @Override
173
    public Map<K, V> getAll(Iterable<? extends K> keys) {
174
      return resolve(asyncCache.getAll(keys));
×
175
    }
176

177
    @Override
178
    public CompletableFuture<V> refresh(K key) {
179
      requireNonNull(key);
×
180

181
      Object keyReference = asyncCache.cache().referenceKey(key);
×
182
      for (;;) {
183
        @Var var future = tryOptimisticRefresh(key, keyReference);
×
184
        if (future == null) {
×
185
          future = tryComputeRefresh(key, keyReference);
×
186
        }
187
        if (future != null) {
×
188
          return future;
×
189
        }
190
      }
×
191
    }
192

193
    @Override
194
    public CompletableFuture<Map<K, V>> refreshAll(Iterable<? extends K> keys) {
195
      var result = new LinkedHashMap<K, CompletableFuture<V>>(calculateHashMapCapacity(keys));
×
196
      for (K key : keys) {
×
197
        result.computeIfAbsent(key, this::refresh);
×
198
      }
×
199
      return composeResult(result);
×
200
    }
201

202
    /** Attempts to avoid a reload if the entry is absent, or a load or reload is in-flight. */
203
    @SuppressWarnings("FutureReturnValueIgnored")
204
    private @Nullable CompletableFuture<V> tryOptimisticRefresh(K key, Object keyReference) {
205
      // If a refresh is in-flight, then return it directly. If completed and not yet removed, then
206
      // remove to trigger a new reload.
207
      @SuppressWarnings("unchecked")
208
      var lastRefresh = (CompletableFuture<V>) asyncCache.cache().refreshes().get(keyReference);
×
209
      if (lastRefresh != null) {
×
210
        if (Async.isReady(lastRefresh) || asyncCache.cache().isPendingEviction(key)) {
×
211
          asyncCache.cache().refreshes().remove(keyReference, lastRefresh);
×
212
        } else {
213
          return lastRefresh;
×
214
        }
215
      }
216

217
      // If the entry is absent then perform a new load, else if in-flight then return it
218
      var oldValueFuture = asyncCache.cache().getIfPresentQuietly(key);
×
219
      if ((oldValueFuture == null)
×
220
          || (oldValueFuture.isDone() && oldValueFuture.isCompletedExceptionally())) {
×
221
        if (oldValueFuture != null) {
×
222
          asyncCache.cache().remove(key, oldValueFuture);
×
223
        }
224
        var future = asyncCache.get(key, asyncCache.mappingFunction, /* recordStats= */ false);
×
225
        @SuppressWarnings("unchecked")
226
        var prior = (CompletableFuture<V>) asyncCache.cache()
×
227
            .refreshes().putIfAbsent(keyReference, future);
×
228
        var result = (prior == null) ? future : prior;
×
229
        result.whenComplete((r, e) -> asyncCache.cache().refreshes().remove(keyReference, result));
×
230
        return result;
×
231
      } else if (!oldValueFuture.isDone()) {
×
232
        // no-op if load is pending
233
        return oldValueFuture;
×
234
      }
235

236
      // Fallback to the slow path, possibly retrying
237
      return null;
×
238
    }
239

240
    /** Begins a refresh if the entry has materialized and no reload is in-flight. */
241
    @SuppressWarnings("FutureReturnValueIgnored")
242
    private @Nullable CompletableFuture<V> tryComputeRefresh(K key, Object keyReference) {
243
      long[] startTime = new long[1];
×
244
      boolean[] refreshed = new boolean[1];
×
245
      @SuppressWarnings({"rawtypes", "unchecked"})
246
      @Nullable CompletableFuture<V>[] oldValueFuture = new CompletableFuture[1];
×
247
      var future = asyncCache.cache().refreshes().computeIfAbsent(keyReference, k -> {
×
248
        oldValueFuture[0] = asyncCache.cache().getIfPresentQuietly(key);
×
249
        V oldValue = Async.getIfReady(oldValueFuture[0]);
×
250
        if (oldValue == null) {
×
251
          return null;
×
252
        }
253

254
        refreshed[0] = true;
×
255
        startTime[0] = asyncCache.cache().statsTicker().read();
×
256
        try {
257
          var reloadFuture = asyncCache.cacheLoader.asyncReload(
×
258
              key, oldValue, asyncCache.cache().executor());
×
259
          return requireNonNull(reloadFuture, "Null future");
×
260
        } catch (RuntimeException e) {
×
261
          throw e;
×
262
        } catch (InterruptedException e) {
×
263
          Thread.currentThread().interrupt();
×
264
          throw new CompletionException(e);
×
265
        } catch (Exception e) {
×
266
          throw new CompletionException(e);
×
267
        }
268
      });
269

270
      if (future == null) {
×
271
        // Retry the optimistic path
272
        return null;
×
273
      }
274

275
      @SuppressWarnings("unchecked")
276
      var castedFuture = (CompletableFuture<V>) future;
×
277
      if (refreshed[0]) {
×
278
        castedFuture.whenComplete((newValue, error) -> {
×
279
          long loadTime = asyncCache.cache().statsTicker().read() - startTime[0];
×
280
          if (error != null) {
×
281
            if (!(error instanceof CancellationException) && !(error instanceof TimeoutException)) {
×
282
              logger.log(Level.WARNING, "Exception thrown during refresh", error);
×
283
            }
284
            asyncCache.cache().refreshes().remove(keyReference, castedFuture);
×
285
            asyncCache.cache().statsCounter().recordLoadFailure(loadTime);
×
286
            return;
×
287
          }
288

289
          try {
290
            boolean[] discard = new boolean[1];
×
291
            var value = asyncCache.cache().compute(key, (ignored, currentValue) -> {
×
292
              var successful = asyncCache.cache().refreshes().remove(keyReference, castedFuture);
×
293
              if (successful && (currentValue == oldValueFuture[0])) {
×
294
                if (currentValue == castedFuture) {
×
295
                  // If the reloaded value is the same instance then no-op
296
                  return currentValue;
×
297
                } else if (newValue == Async.getIfReady((CompletableFuture<?>) currentValue)) {
×
298
                  // If the completed futures hold the same value instance then no-op
299
                  return currentValue;
×
300
                }
301
                return (newValue == null) ? null : castedFuture;
×
302
              }
303
              // Otherwise, a write invalidated the refresh so discard it and notify the listener
304
              discard[0] = true;
×
305
              return currentValue;
×
306
            }, asyncCache.cache().expiry(), /* recordLoad= */ false, /* recordLoadFailure= */ true);
×
307

308
            if (discard[0] && (newValue != null)) {
×
309
              var cause = (value == null) ? RemovalCause.EXPLICIT : RemovalCause.REPLACED;
×
310
              asyncCache.cache().notifyRemoval(key, castedFuture, cause);
×
311
            }
312
            if (newValue == null) {
×
313
              asyncCache.cache().statsCounter().recordLoadFailure(loadTime);
×
314
            } else {
315
              asyncCache.cache().statsCounter().recordLoadSuccess(loadTime);
×
316
            }
317
          } catch (Throwable t) {
×
318
            logger.log(Level.WARNING, "Exception thrown during asynchronous load", t);
×
319
            asyncCache.cache().statsCounter().recordLoadFailure(loadTime);
×
320
            asyncCache.cache().remove(key, castedFuture);
×
321
          }
×
322
        });
×
323
      }
324
      return castedFuture;
×
325
    }
326
  }
327
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc