• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ben-manes / caffeine / #5173

29 Dec 2025 05:27AM UTC coverage: 0.0% (-100.0%) from 100.0%
#5173

push

github

ben-manes
speed up development ci build

0 of 3838 branches covered (0.0%)

0 of 7869 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java
1
/*
2
 * Copyright 2015 Ben Manes. All Rights Reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package com.github.benmanes.caffeine.cache;
17

18
import static com.github.benmanes.caffeine.cache.Caffeine.calculateHashMapCapacity;
19
import static com.github.benmanes.caffeine.cache.Caffeine.hasMethodOverride;
20
import static com.github.benmanes.caffeine.cache.LocalAsyncCache.composeResult;
21
import static java.util.Objects.requireNonNull;
22

23
import java.lang.System.Logger;
24
import java.lang.System.Logger.Level;
25
import java.util.LinkedHashMap;
26
import java.util.Map;
27
import java.util.Set;
28
import java.util.concurrent.CancellationException;
29
import java.util.concurrent.CompletableFuture;
30
import java.util.concurrent.CompletionException;
31
import java.util.concurrent.Executor;
32
import java.util.concurrent.TimeoutException;
33
import java.util.function.BiFunction;
34
import java.util.function.Function;
35

36
import org.jspecify.annotations.Nullable;
37

38
import com.google.errorprone.annotations.Var;
39

40
/**
41
 * This class provides a skeletal implementation of the {@link AsyncLoadingCache} interface to
42
 * minimize the effort required to implement a {@link LocalCache}.
43
 *
44
 * @author ben.manes@gmail.com (Ben Manes)
45
 */
46
abstract class LocalAsyncLoadingCache<K, V>
47
    implements LocalAsyncCache<K, V>, AsyncLoadingCache<K, V> {
48
  static final Logger logger = System.getLogger(LocalAsyncLoadingCache.class.getName());
×
49

50
  final @Nullable BiFunction<? super Set<? extends K>, ? super Executor,
51
      ? extends CompletableFuture<? extends Map<? extends K, ? extends V>>> bulkMappingFunction;
52
  final BiFunction<? super K, ? super Executor,
53
      ? extends CompletableFuture<? extends V>> mappingFunction;
54
  final AsyncCacheLoader<K, V> cacheLoader;
55

56
  @Nullable LoadingCacheView<K, V> cacheView;
57

58
  @SuppressWarnings("unchecked")
59
  LocalAsyncLoadingCache(AsyncCacheLoader<? super K, V> cacheLoader) {
×
60
    this.bulkMappingFunction = newBulkMappingFunction(cacheLoader);
×
61
    this.cacheLoader = (AsyncCacheLoader<K, V>) cacheLoader;
×
62
    this.mappingFunction = newMappingFunction(cacheLoader);
×
63
  }
×
64

65
  /** Returns a mapping function that adapts to {@link AsyncCacheLoader#asyncLoad}. */
66
  BiFunction<
67
      ? super K,
68
      ? super Executor,
69
      ? extends CompletableFuture<? extends V>> newMappingFunction(
70
          AsyncCacheLoader<? super K, V> cacheLoader) {
71
    return (key, executor) -> {
×
72
      try {
73
        return cacheLoader.asyncLoad(key, executor);
×
74
      } catch (RuntimeException e) {
×
75
        throw e;
×
76
      } catch (InterruptedException e) {
×
77
        Thread.currentThread().interrupt();
×
78
        throw new CompletionException(e);
×
79
      } catch (Exception e) {
×
80
        throw new CompletionException(e);
×
81
      }
82
    };
83
  }
84

85
  /**
86
   * Returns a mapping function that adapts to {@link AsyncCacheLoader#asyncLoadAll}, if
87
   * implemented.
88
   */
89
  @Nullable
90
  BiFunction<Set<? extends K>, Executor, CompletableFuture<Map<K, V>>> newBulkMappingFunction(
91
      AsyncCacheLoader<? super K, V> cacheLoader) {
92
    if (!canBulkLoad(cacheLoader)) {
×
93
      return null;
×
94
    }
95
    return (keysToLoad, executor) -> {
×
96
      try {
97
        @SuppressWarnings("unchecked")
98
        var loaded = (CompletableFuture<Map<K, V>>) cacheLoader.asyncLoadAll(keysToLoad, executor);
×
99
        return loaded;
×
100
      } catch (RuntimeException e) {
×
101
        throw e;
×
102
      } catch (InterruptedException e) {
×
103
        Thread.currentThread().interrupt();
×
104
        throw new CompletionException(e);
×
105
      } catch (Exception e) {
×
106
        throw new CompletionException(e);
×
107
      }
108
    };
109
  }
110

111
  /** Returns whether the supplied cache loader has bulk load functionality. */
112
  boolean canBulkLoad(AsyncCacheLoader<?, ?> loader) {
113
    @Var Class<?> defaultLoaderClass = AsyncCacheLoader.class;
×
114
    if (loader instanceof CacheLoader<?, ?>) {
×
115
      defaultLoaderClass = CacheLoader.class;
×
116
      if (hasMethodOverride(defaultLoaderClass, loader, "loadAll", Set.class)) {
×
117
        return true;
×
118
      }
119
    }
120
    return hasMethodOverride(defaultLoaderClass,
×
121
        loader, "asyncLoadAll", Set.class, Executor.class);
122
  }
123

124
  @Override
125
  public CompletableFuture<V> get(K key) {
126
    return get(key, mappingFunction);
×
127
  }
128

129
  @Override
130
  public CompletableFuture<Map<K, V>> getAll(Iterable<? extends K> keys) {
131
    if (bulkMappingFunction != null) {
×
132
      return getAll(keys, bulkMappingFunction);
×
133
    }
134

135
    Function<K, CompletableFuture<V>> mappingFunction = this::get;
×
136
    var result = new LinkedHashMap<K, CompletableFuture<@Nullable V>>(
×
137
        calculateHashMapCapacity(keys));
×
138
    for (K key : keys) {
×
139
      var future = result.computeIfAbsent(key, mappingFunction);
×
140
      requireNonNull(future);
×
141
    }
×
142
    return composeResult(result);
×
143
  }
144

145
  @Override
146
  public LoadingCache<K, V> synchronous() {
147
    return (cacheView == null) ? (cacheView = new LoadingCacheView<>(this)) : cacheView;
×
148
  }
149

150
  /* --------------- Synchronous views --------------- */
151

152
  static final class LoadingCacheView<K, V>
153
      extends AbstractCacheView<K, V> implements LoadingCache<K, V> {
154
    private static final long serialVersionUID = 1L;

    /** The asynchronous cache that this view delegates to; checked non-null on construction. */
    @SuppressWarnings("serial")
    final LocalAsyncLoadingCache<K, V> asyncCache;

    /**
     * Creates a synchronous view over the given asynchronous cache.
     *
     * @param asyncCache the backing cache
     * @throws NullPointerException if {@code asyncCache} is null
     */
    LoadingCacheView(LocalAsyncLoadingCache<K, V> asyncCache) {
      requireNonNull(asyncCache);
      this.asyncCache = asyncCache;
    }
×
162

163
    @Override
164
    LocalAsyncLoadingCache<K, V> asyncCache() {
165
      return asyncCache;
×
166
    }
167

168
    @Override
169
    public V get(K key) {
170
      return resolve(asyncCache.get(key));
×
171
    }
172

173
    @Override
174
    public Map<K, V> getAll(Iterable<? extends K> keys) {
175
      return resolve(asyncCache.getAll(keys));
×
176
    }
177

178
    @Override
179
    public CompletableFuture<V> refresh(K key) {
180
      requireNonNull(key);
×
181

182
      Object keyReference = asyncCache.cache().referenceKey(key);
×
183
      for (;;) {
184
        @Var var future = tryOptimisticRefresh(key, keyReference);
×
185
        if (future == null) {
×
186
          future = tryComputeRefresh(key, keyReference);
×
187
        }
188
        if (future != null) {
×
189
          return future;
×
190
        }
191
      }
×
192
    }
193

194
    @Override
195
    public CompletableFuture<Map<K, V>> refreshAll(Iterable<? extends K> keys) {
196
      var result = new LinkedHashMap<K, CompletableFuture<@Nullable V>>(
×
197
          calculateHashMapCapacity(keys));
×
198
      for (K key : keys) {
×
199
        result.computeIfAbsent(key, this::refresh);
×
200
      }
×
201
      return composeResult(result);
×
202
    }
203

204
    /** Attempts to avoid a reload if the entry is absent, or a load or reload is in-flight. */
205
    @SuppressWarnings("FutureReturnValueIgnored")
206
    private @Nullable CompletableFuture<V> tryOptimisticRefresh(K key, Object keyReference) {
207
      // If a refresh is in-flight, then return it directly. If completed and not yet removed, then
208
      // remove to trigger a new reload.
209
      @SuppressWarnings("unchecked")
210
      var lastRefresh = (CompletableFuture<V>) asyncCache.cache().refreshes().get(keyReference);
×
211
      if (lastRefresh != null) {
×
212
        if (Async.isReady(lastRefresh) || asyncCache.cache().isPendingEviction(key)) {
×
213
          asyncCache.cache().refreshes().remove(keyReference, lastRefresh);
×
214
        } else {
215
          return lastRefresh;
×
216
        }
217
      }
218

219
      // If the entry is absent then perform a new load, else if in-flight then return it
220
      var oldValueFuture = asyncCache.cache().getIfPresentQuietly(key);
×
221
      if ((oldValueFuture == null)
×
222
          || (oldValueFuture.isDone() && oldValueFuture.isCompletedExceptionally())) {
×
223
        if (oldValueFuture != null) {
×
224
          asyncCache.cache().remove(key, oldValueFuture);
×
225
        }
226
        var future = asyncCache.get(key, asyncCache.mappingFunction, /* recordStats= */ false);
×
227
        @SuppressWarnings("unchecked")
228
        var prior = (CompletableFuture<V>) asyncCache.cache()
×
229
            .refreshes().putIfAbsent(keyReference, future);
×
230
        var result = (prior == null) ? future : prior;
×
231
        result.whenComplete((r, e) -> asyncCache.cache().refreshes().remove(keyReference, result));
×
232
        return result;
×
233
      } else if (!oldValueFuture.isDone()) {
×
234
        // no-op if load is pending
235
        return oldValueFuture;
×
236
      }
237

238
      // Fallback to the slow path, possibly retrying
239
      return null;
×
240
    }
241

242
    /** Begins a refresh if the entry has materialized and no reload is in-flight. */
243
    @SuppressWarnings("FutureReturnValueIgnored")
244
    private @Nullable CompletableFuture<V> tryComputeRefresh(K key, Object keyReference) {
245
      long[] startTime = new long[1];
×
246
      boolean[] refreshed = new boolean[1];
×
247
      @SuppressWarnings({"rawtypes", "unchecked"})
248
      @Nullable CompletableFuture<V>[] oldValueFuture = new CompletableFuture[1];
×
249
      var future = asyncCache.cache().refreshes().computeIfAbsent(keyReference, k -> {
×
250
        oldValueFuture[0] = asyncCache.cache().getIfPresentQuietly(key);
×
251
        V oldValue = Async.getIfReady(oldValueFuture[0]);
×
252
        if (oldValue == null) {
×
253
          return null;
×
254
        }
255

256
        refreshed[0] = true;
×
257
        startTime[0] = asyncCache.cache().statsTicker().read();
×
258
        try {
259
          var reloadFuture = asyncCache.cacheLoader.asyncReload(
×
260
              key, oldValue, asyncCache.cache().executor());
×
261
          return requireNonNull(reloadFuture, "Null future");
×
262
        } catch (RuntimeException e) {
×
263
          throw e;
×
264
        } catch (InterruptedException e) {
×
265
          Thread.currentThread().interrupt();
×
266
          throw new CompletionException(e);
×
267
        } catch (Exception e) {
×
268
          throw new CompletionException(e);
×
269
        }
270
      });
271

272
      if (future == null) {
×
273
        // Retry the optimistic path
274
        return null;
×
275
      }
276

277
      @SuppressWarnings("unchecked")
278
      var castedFuture = (CompletableFuture<V>) future;
×
279
      if (refreshed[0]) {
×
280
        castedFuture.whenComplete((newValue, error) -> {
×
281
          long loadTime = asyncCache.cache().statsTicker().read() - startTime[0];
×
282
          if (error != null) {
×
283
            if (!(error instanceof CancellationException) && !(error instanceof TimeoutException)) {
×
284
              logger.log(Level.WARNING, "Exception thrown during refresh", error);
×
285
            }
286
            asyncCache.cache().refreshes().remove(keyReference, castedFuture);
×
287
            asyncCache.cache().statsCounter().recordLoadFailure(loadTime);
×
288
            return;
×
289
          }
290

291
          try {
292
            boolean[] discard = new boolean[1];
×
293
            var value = asyncCache.cache().compute(key, (ignored, currentValue) -> {
×
294
              var successful = asyncCache.cache().refreshes().remove(keyReference, castedFuture);
×
295
              if (successful && (currentValue == oldValueFuture[0])) {
×
296
                if (currentValue == castedFuture) {
×
297
                  // If the reloaded value is the same instance then no-op
298
                  return currentValue;
×
299
                } else if (newValue == Async.getIfReady((CompletableFuture<?>) currentValue)) {
×
300
                  // If the completed futures hold the same value instance then no-op
301
                  return currentValue;
×
302
                }
303
                return (newValue == null) ? null : castedFuture;
×
304
              }
305
              // Otherwise, a write invalidated the refresh so discard it and notify the listener
306
              discard[0] = true;
×
307
              return currentValue;
×
308
            }, asyncCache.cache().expiry(), /* recordLoad= */ false, /* recordLoadFailure= */ true);
×
309

310
            if (discard[0] && (newValue != null)) {
×
311
              var cause = (value == null) ? RemovalCause.EXPLICIT : RemovalCause.REPLACED;
×
312
              asyncCache.cache().notifyRemoval(key, castedFuture, cause);
×
313
            }
314
            if (newValue == null) {
×
315
              asyncCache.cache().statsCounter().recordLoadFailure(loadTime);
×
316
            } else {
317
              asyncCache.cache().statsCounter().recordLoadSuccess(loadTime);
×
318
            }
319
          } catch (Throwable t) {
×
320
            logger.log(Level.WARNING, "Exception thrown during asynchronous load", t);
×
321
            asyncCache.cache().statsCounter().recordLoadFailure(loadTime);
×
322
            asyncCache.cache().remove(key, castedFuture);
×
323
          }
×
324
        });
×
325
      }
326
      return castedFuture;
×
327
    }
328
  }
329
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc