• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ben-manes / caffeine / #5467

18 May 2026 10:56AM UTC coverage: 99.976% (-0.02%) from 100.0%
#5467

push

github

ben-manes
Use real ΔL/Δw in the gradient climbers; add Correlation climber

The four gradient optimizers (Stochastic SGD, Adam, AmsGrad, Nadam) were
each computing the "gradient" as the change in miss rate between samples,
without dividing by the change in window size. That gave a signal with
the right magnitude but missing direction in w-space: the sign depended
only on whether the miss rate rose or fell, never on whether the last
window move caused that change. As a result, momentum-based variants
could only oscillate, not actually pursue a gradient.

Track the realised window size between sample resets in AbstractClimber
and expose a finite-difference `missRateGradient(hitRate) = ΔL/Δw`
helper. Use it from all four optimizers, flip the sign on the returned
step so we descend (not ascend) the loss, and bootstrap with one
positive probe so a Δw exists before the first gradient call.

Also adds a new `CorrelationClimber` (HillClimberType.CORRELATION,
default-enabled alongside `simple` and `indicator`) inspired by Cacheus'
learning-rate hill climber: uses `sign(Δw · ΔHR)` as a direction signal
and `scale · |Δw|` as step magnitude, so it self-tunes — sustained
correlation accelerates it, |Δw| shrinks as it converges, and a 5-window
degradation counter resets to the initial probe to escape local pits.

Behaviour shift on the bundled LIRS traces: clear wins on scan-heavy
workloads (scan, zigzag, sprite where the broken sign was pointing the
optimizer the wrong way) and on the cold-cache loop@500 case. Small
regressions on mid-trace sizes are expected — `percent-pivot = 0.005`
in reference.conf was implicitly tuned for the old `ΔL`-magnitude
signal, and per-step displacement is now smaller. A retune of that
hyperparameter is a separate follow-up — a sweep over {0.005, 0.05, 0.5,
1.0} showed 0.005 remains the safest default (larger values cliff
catastrophically on some traces, e.g., loop@1000 amsgrad collapses from
94.31% to 19.62% at pivot=0.05).

4004 of 4016 branches covered (99.7%)

8206 of 8208 relevant lines covered (99.98%)

1.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.62
/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java
1
/*
2
 * Copyright 2015 Ben Manes. All Rights Reserved.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package com.github.benmanes.caffeine.cache;
17

18
import static com.github.benmanes.caffeine.cache.Caffeine.calculateHashMapCapacity;
19
import static com.github.benmanes.caffeine.cache.Caffeine.hasMethodOverride;
20
import static com.github.benmanes.caffeine.cache.LocalAsyncCache.composeResult;
21
import static java.util.Objects.requireNonNull;
22

23
import java.lang.System.Logger;
24
import java.lang.System.Logger.Level;
25
import java.util.LinkedHashMap;
26
import java.util.Map;
27
import java.util.Set;
28
import java.util.concurrent.CancellationException;
29
import java.util.concurrent.CompletableFuture;
30
import java.util.concurrent.CompletionException;
31
import java.util.concurrent.Executor;
32
import java.util.concurrent.TimeoutException;
33
import java.util.function.BiFunction;
34
import java.util.function.Function;
35

36
import org.jspecify.annotations.Nullable;
37

38
import com.google.errorprone.annotations.Var;
39

40
/**
41
 * This class provides a skeletal implementation of the {@link AsyncLoadingCache} interface to
42
 * minimize the effort required to implement a {@link LocalCache}.
43
 *
44
 * @author ben.manes@gmail.com (Ben Manes)
45
 */
46
abstract class LocalAsyncLoadingCache<K, V>
47
    implements LocalAsyncCache<K, V>, AsyncLoadingCache<K, V> {
48
  static final Logger logger = System.getLogger(LocalAsyncLoadingCache.class.getName());
1✔
49

50
  final @Nullable BiFunction<? super Set<? extends K>, ? super Executor,
51
      ? extends CompletableFuture<? extends Map<? extends K, ? extends V>>> bulkMappingFunction;
52
  final BiFunction<? super K, ? super Executor,
53
      ? extends CompletableFuture<? extends V>> mappingFunction;
54
  final AsyncCacheLoader<K, V> cacheLoader;
55

56
  @Nullable LoadingCacheView<K, V> cacheView;
57

58
  @SuppressWarnings("unchecked")
59
  LocalAsyncLoadingCache(AsyncCacheLoader<? super K, V> cacheLoader) {
1✔
60
    this.bulkMappingFunction = newBulkMappingFunction(cacheLoader);
1✔
61
    this.cacheLoader = (AsyncCacheLoader<K, V>) cacheLoader;
1✔
62
    this.mappingFunction = newMappingFunction(cacheLoader);
1✔
63
  }
1✔
64

65
  /** Returns a mapping function that adapts to {@link AsyncCacheLoader#asyncLoad}. */
66
  BiFunction<
67
      ? super K,
68
      ? super Executor,
69
      ? extends CompletableFuture<? extends V>> newMappingFunction(
70
          AsyncCacheLoader<? super K, V> cacheLoader) {
71
    return (key, executor) -> {
1✔
72
      try {
73
        return cacheLoader.asyncLoad(key, executor);
1✔
74
      } catch (RuntimeException e) {
1✔
75
        throw e;
1✔
76
      } catch (InterruptedException e) {
1✔
77
        Thread.currentThread().interrupt();
1✔
78
        throw new CompletionException(e);
1✔
79
      } catch (Exception e) {
1✔
80
        throw new CompletionException(e);
1✔
81
      }
82
    };
83
  }
84

85
  /**
86
   * Returns a mapping function that adapts to {@link AsyncCacheLoader#asyncLoadAll}, if
87
   * implemented.
88
   */
89
  @Nullable BiFunction<Set<? extends K>, Executor, CompletableFuture<Map<K, V>>>
90
      newBulkMappingFunction(AsyncCacheLoader<? super K, V> cacheLoader) {
91
    if (!canBulkLoad(cacheLoader)) {
1✔
92
      return null;
1✔
93
    }
94
    return (keysToLoad, executor) -> {
1✔
95
      try {
96
        @SuppressWarnings("unchecked")
97
        var loaded = (CompletableFuture<Map<K, V>>) cacheLoader.asyncLoadAll(keysToLoad, executor);
1✔
98
        return loaded;
1✔
99
      } catch (RuntimeException e) {
1✔
100
        throw e;
1✔
101
      } catch (InterruptedException e) {
1✔
102
        Thread.currentThread().interrupt();
1✔
103
        throw new CompletionException(e);
1✔
104
      } catch (Exception e) {
1✔
105
        throw new CompletionException(e);
1✔
106
      }
107
    };
108
  }
109

110
  /** Returns whether the supplied cache loader has bulk load functionality. */
111
  boolean canBulkLoad(AsyncCacheLoader<?, ?> loader) {
112
    @Var Class<?> defaultLoaderClass = AsyncCacheLoader.class;
1✔
113
    if (loader instanceof CacheLoader<?, ?>) {
1✔
114
      defaultLoaderClass = CacheLoader.class;
1✔
115
      if (hasMethodOverride(defaultLoaderClass, loader, "loadAll", Set.class)) {
1✔
116
        return true;
1✔
117
      }
118
    }
119
    return hasMethodOverride(defaultLoaderClass,
1✔
120
        loader, "asyncLoadAll", Set.class, Executor.class);
121
  }
122

123
  @Override
124
  public CompletableFuture<V> get(K key) {
125
    return get(key, mappingFunction);
1✔
126
  }
127

128
  @Override
129
  public CompletableFuture<Map<K, V>> getAll(Iterable<? extends K> keys) {
130
    if (bulkMappingFunction != null) {
1✔
131
      return getAll(keys, bulkMappingFunction);
1✔
132
    }
133

134
    Function<K, CompletableFuture<V>> mappingFunction = this::get;
1✔
135
    var result = new LinkedHashMap<K, CompletableFuture<@Nullable V>>(
1✔
136
        calculateHashMapCapacity(keys));
1✔
137
    for (K key : keys) {
1✔
138
      var future = result.computeIfAbsent(key, mappingFunction);
1✔
139
      requireNonNull(future);
1✔
140
    }
1✔
141
    return composeResult(result);
1✔
142
  }
143

144
  @Override
145
  public LoadingCache<K, V> synchronous() {
146
    return (cacheView == null) ? (cacheView = new LoadingCacheView<>(this)) : cacheView;
1✔
147
  }
148

149
  /* --------------- Synchronous views --------------- */
150

151
  static final class LoadingCacheView<K, V>
152
      extends AbstractCacheView<K, V> implements LoadingCache<K, V> {
153
    private static final long serialVersionUID = 1L;
154

155
    @SuppressWarnings("serial")
156
    final LocalAsyncLoadingCache<K, V> asyncCache;
157

158
    LoadingCacheView(LocalAsyncLoadingCache<K, V> asyncCache) {
1✔
159
      this.asyncCache = requireNonNull(asyncCache);
1✔
160
    }
1✔
161

162
    @Override
163
    LocalAsyncLoadingCache<K, V> asyncCache() {
164
      return asyncCache;
1✔
165
    }
166

167
    @Override
168
    public V get(K key) {
169
      return resolve(asyncCache.get(key));
1✔
170
    }
171

172
    @Override
173
    public Map<K, V> getAll(Iterable<? extends K> keys) {
174
      return resolve(asyncCache.getAll(keys));
1✔
175
    }
176

177
    @Override
178
    public CompletableFuture<V> refresh(K key) {
179
      requireNonNull(key);
1✔
180

181
      Object keyReference = asyncCache.cache().referenceKey(key);
1✔
182
      for (;;) {
183
        @Var var future = tryOptimisticRefresh(key, keyReference);
1✔
184
        if (future == null) {
1✔
185
          future = tryComputeRefresh(key, keyReference);
1✔
186
        }
187
        if (future != null) {
1✔
188
          return future;
1✔
189
        }
190
      }
1✔
191
    }
192

193
    @Override
194
    public CompletableFuture<Map<K, V>> refreshAll(Iterable<? extends K> keys) {
195
      var result = new LinkedHashMap<K, CompletableFuture<@Nullable V>>(
1✔
196
          calculateHashMapCapacity(keys));
1✔
197
      for (K key : keys) {
1✔
198
        result.computeIfAbsent(key, this::refresh);
1✔
199
      }
1✔
200
      return composeResult(result);
1✔
201
    }
202

203
    /** Attempts to avoid a reload if the entry is absent, or a load or reload is in-flight. */
204
    @SuppressWarnings("FutureReturnValueIgnored")
205
    private @Nullable CompletableFuture<V> tryOptimisticRefresh(K key, Object keyReference) {
206
      // If a refresh is in-flight, then return it directly. If completed and not yet removed, then
207
      // remove to trigger a new reload.
208
      @SuppressWarnings("unchecked")
209
      var lastRefresh = (CompletableFuture<V>) asyncCache.cache().refreshes().get(keyReference);
1✔
210
      if (lastRefresh != null) {
1✔
211
        if (Async.isReady(lastRefresh) || asyncCache.cache().isPendingEviction(key)) {
1✔
212
          asyncCache.cache().refreshes().remove(keyReference, lastRefresh);
1✔
213
        } else {
214
          return lastRefresh;
1✔
215
        }
216
      }
217

218
      // If the entry is absent then perform a new load, else if in-flight then return it
219
      var oldValueFuture = asyncCache.cache().getIfPresentQuietly(key);
1✔
220
      if ((oldValueFuture == null)
1✔
221
          || (oldValueFuture.isDone() && oldValueFuture.isCompletedExceptionally())) {
1✔
222
        if (oldValueFuture != null) {
1✔
223
          asyncCache.cache().remove(key, oldValueFuture);
1✔
224
        }
225
        var future = asyncCache.get(key, asyncCache.mappingFunction, /* recordStats= */ false);
1✔
226
        @SuppressWarnings("unchecked")
227
        var prior = (CompletableFuture<V>) asyncCache.cache()
1✔
228
            .refreshes().putIfAbsent(keyReference, future);
1✔
229
        var result = (prior == null) ? future : prior;
1✔
230
        result.whenComplete((r, e) -> asyncCache.cache().refreshes().remove(keyReference, result));
1✔
231
        return result;
1✔
232
      } else if (!oldValueFuture.isDone()) {
1✔
233
        // no-op if load is pending
234
        return oldValueFuture;
1✔
235
      }
236

237
      // Fallback to the slow path, possibly retrying
238
      return null;
1✔
239
    }
240

241
    /** Begins a refresh if the entry has materialized and no reload is in-flight. */
242
    @SuppressWarnings("FutureReturnValueIgnored")
243
    private @Nullable CompletableFuture<V> tryComputeRefresh(K key, Object keyReference) {
244
      var startTime = new long[1];
1✔
245
      var refreshed = new boolean[1];
1✔
246
      @SuppressWarnings({"rawtypes", "unchecked"})
247
      @Nullable CompletableFuture<V>[] oldValueFuture = new CompletableFuture[1];
1✔
248
      var future = asyncCache.cache().refreshes().computeIfAbsent(keyReference, k -> {
1✔
249
        oldValueFuture[0] = asyncCache.cache().getIfPresentQuietly(key);
1✔
250
        V oldValue = Async.getIfReady(oldValueFuture[0]);
1✔
251
        if (oldValue == null) {
1✔
252
          return null;
1✔
253
        }
254

255
        refreshed[0] = true;
1✔
256
        startTime[0] = asyncCache.cache().statsTicker().read();
1✔
257
        try {
258
          var reloadFuture = asyncCache.cacheLoader.asyncReload(
1✔
259
              key, oldValue, asyncCache.cache().executor());
1✔
260
          return requireNonNull(reloadFuture, "Null future");
1✔
261
        } catch (RuntimeException e) {
1✔
262
          throw e;
1✔
263
        } catch (InterruptedException e) {
1✔
264
          Thread.currentThread().interrupt();
1✔
265
          throw new CompletionException(e);
1✔
266
        } catch (Exception e) {
1✔
267
          throw new CompletionException(e);
1✔
268
        }
269
      });
270

271
      if (future == null) {
1✔
272
        // Retry the optimistic path
273
        return null;
1✔
274
      }
275

276
      @SuppressWarnings("unchecked")
277
      var castedFuture = (CompletableFuture<V>) future;
1✔
278
      if (refreshed[0]) {
1✔
279
        castedFuture.whenComplete((newValue, error) -> {
1✔
280
          long loadTime = asyncCache.cache().statsTicker().read() - startTime[0];
1✔
281
          if (error != null) {
1✔
282
            if (!(error instanceof CancellationException) && !(error instanceof TimeoutException)) {
1✔
283
              logger.log(Level.WARNING, "Exception thrown during refresh", error);
1✔
284
            }
285
            asyncCache.cache().refreshes().remove(keyReference, castedFuture);
1✔
286
            asyncCache.cache().statsCounter().recordLoadFailure(loadTime);
1✔
287
            return;
1✔
288
          }
289

290
          try {
291
            var discard = new boolean[1];
1✔
292
            var hints = new LocalCache.RemapHints();
1✔
293
            var value = asyncCache.cache().compute(key, (ignored, currentValue) -> {
1✔
294
              if (newValue == Async.getIfReady((CompletableFuture<?>) currentValue)) {
1✔
295
                // If the completed futures hold the same value instance then no-op
296
                hints.preserveTimestamps = true;
1✔
297
                return currentValue;
1✔
298
              }
299

300
              var successful = asyncCache.cache().refreshes().remove(keyReference, castedFuture);
1✔
301
              if (successful && (currentValue == oldValueFuture[0])) {
1✔
302
                if (currentValue == castedFuture) {
1!
303
                  // If the reloaded value is the same instance then no-op
304
                  hints.preserveTimestamps = true;
×
305
                  return currentValue;
×
306
                }
307
                return (newValue == null) ? null : castedFuture;
1✔
308
              }
309
              // Otherwise, a write invalidated the refresh so discard it
310
              hints.preserveTimestamps = true;
1✔
311
              discard[0] = true;
1✔
312
              return currentValue;
1✔
313
            }, asyncCache.cache().expiry(), /* recordLoad= */ false,
1✔
314
                /* recordLoadFailure= */ true, hints);
315

316
            if (discard[0] && (newValue != null)) {
1!
317
              var cause = (value == null) ? RemovalCause.EXPLICIT : RemovalCause.REPLACED;
1✔
318
              asyncCache.cache().notifyRemoval(key, castedFuture, cause);
1✔
319
            }
320
            if (newValue == null) {
1✔
321
              asyncCache.cache().statsCounter().recordLoadFailure(loadTime);
1✔
322
            } else {
323
              asyncCache.cache().statsCounter().recordLoadSuccess(loadTime);
1✔
324
            }
325
          } catch (Throwable t) {
1✔
326
            logger.log(Level.WARNING, "Exception thrown during asynchronous load", t);
1✔
327
            asyncCache.cache().statsCounter().recordLoadFailure(loadTime);
1✔
328
            asyncCache.cache().remove(key, castedFuture);
1✔
329
          }
1✔
330
        });
1✔
331
      }
332
      return castedFuture;
1✔
333
    }
334
  }
335
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc