• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mizosoft / methanol / #592

10 Sep 2025 10:51AM UTC coverage: 88.99% (-0.06%) from 89.053%
#592

push

github

mizosoft
Upgrade Kotlin version

2325 of 2796 branches covered (83.15%)

Branch coverage included in aggregate %.

7633 of 8394 relevant lines covered (90.93%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.93
/methanol/src/main/java/com/github/mizosoft/methanol/internal/cache/DiskStore.java
1
/*
2
 * Copyright (c) 2025 Moataz Hussein
3
 *
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
5
 * of this software and associated documentation files (the "Software"), to deal
6
 * in the Software without restriction, including without limitation the rights
7
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
 * copies of the Software, and to permit persons to whom the Software is
9
 * furnished to do so, subject to the following conditions:
10
 *
11
 * The above copyright notice and this permission notice shall be included in all
12
 * copies or substantial portions of the Software.
13
 *
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20
 * SOFTWARE.
21
 */
22

23
package com.github.mizosoft.methanol.internal.cache;
24

25
import static com.github.mizosoft.methanol.internal.Utils.requireNonNegativeDuration;
26
import static com.github.mizosoft.methanol.internal.Validate.castNonNull;
27
import static com.github.mizosoft.methanol.internal.Validate.requireArgument;
28
import static com.github.mizosoft.methanol.internal.Validate.requireState;
29
import static java.nio.charset.StandardCharsets.UTF_8;
30
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
31
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
32
import static java.nio.file.StandardOpenOption.CREATE;
33
import static java.nio.file.StandardOpenOption.READ;
34
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
35
import static java.nio.file.StandardOpenOption.WRITE;
36
import static java.util.Objects.requireNonNull;
37
import static java.util.Objects.requireNonNullElse;
38

39
import com.github.mizosoft.methanol.internal.DebugUtils;
40
import com.github.mizosoft.methanol.internal.Utils;
41
import com.github.mizosoft.methanol.internal.concurrent.Delayer;
42
import com.github.mizosoft.methanol.internal.concurrent.SerialExecutor;
43
import com.github.mizosoft.methanol.internal.function.ThrowingRunnable;
44
import com.github.mizosoft.methanol.internal.function.Unchecked;
45
import com.github.mizosoft.methanol.internal.util.Compare;
46
import com.google.errorprone.annotations.CanIgnoreReturnValue;
47
import com.google.errorprone.annotations.concurrent.GuardedBy;
48
import java.io.Closeable;
49
import java.io.EOFException;
50
import java.io.IOException;
51
import java.io.InterruptedIOException;
52
import java.io.UncheckedIOException;
53
import java.lang.System.Logger;
54
import java.lang.System.Logger.Level;
55
import java.lang.invoke.MethodHandles;
56
import java.lang.invoke.VarHandle;
57
import java.nio.ByteBuffer;
58
import java.nio.channels.FileChannel;
59
import java.nio.file.AccessDeniedException;
60
import java.nio.file.DirectoryIteratorException;
61
import java.nio.file.FileAlreadyExistsException;
62
import java.nio.file.Files;
63
import java.nio.file.NoSuchFileException;
64
import java.nio.file.Path;
65
import java.security.MessageDigest;
66
import java.security.NoSuchAlgorithmException;
67
import java.time.Clock;
68
import java.time.Duration;
69
import java.time.Instant;
70
import java.util.ArrayList;
71
import java.util.Collection;
72
import java.util.Collections;
73
import java.util.Comparator;
74
import java.util.HashMap;
75
import java.util.HashSet;
76
import java.util.Iterator;
77
import java.util.List;
78
import java.util.Map;
79
import java.util.NoSuchElementException;
80
import java.util.Optional;
81
import java.util.Set;
82
import java.util.TreeMap;
83
import java.util.concurrent.CompletableFuture;
84
import java.util.concurrent.ConcurrentHashMap;
85
import java.util.concurrent.Executor;
86
import java.util.concurrent.Phaser;
87
import java.util.concurrent.ThreadLocalRandom;
88
import java.util.concurrent.atomic.AtomicBoolean;
89
import java.util.concurrent.atomic.AtomicInteger;
90
import java.util.concurrent.atomic.AtomicLong;
91
import java.util.concurrent.locks.Lock;
92
import java.util.concurrent.locks.ReadWriteLock;
93
import java.util.concurrent.locks.ReentrantLock;
94
import java.util.concurrent.locks.ReentrantReadWriteLock;
95
import java.util.function.Supplier;
96
import java.util.zip.CRC32C;
97
import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf;
98
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
99
import org.checkerframework.checker.nullness.qual.Nullable;
100

101
/**
102
 * A persistent {@link Store} implementation that saves entries on disk under a specified directory.
103
 * A {@code DiskStore} instance assumes exclusive ownership of its directory; only a single {@code
104
 * DiskStore} from a single JVM process can safely operate on a given directory. This assumption is
105
 * cooperatively enforced among {@code DiskStore} instances such that attempting to initialize a
106
 * store with a directory that is in use by another store in the same or a different JVM process
107
 * will cause an {@code IOException} to be thrown.
108
 *
109
 * <p>The store keeps track of entries known to it across sessions by maintaining an on-disk
110
 * hashtable called the index. As changes are made to the store by adding, accessing or removing
111
 * entries, the index is transparently updated in a time-limited manner. By default, there's at most
112
 * one index update every 2 seconds. This rate can be changed by setting the system property: {@code
113
 * com.github.mizosoft.methanol.internal.cache.DiskStore.indexUpdateDelayMillis}. Setting a small
114
 * delay can result in too often index updates, which extracts a noticeable toll on IO and CPU,
115
 * especially if there's a relatively large number of entries (updating entails reconstructing then
116
 * rewriting the whole index). On the other hand, scarcely updating the index affords less
117
 * durability against crashes as entries that aren't indexed are dropped on initialization. Calling
118
 * the {@code flush} method forces an index update, regardless of the time limit.
119
 *
120
 * <p>To ensure entries are not lost across sessions, a store must be {@link #close() closed} after
121
 * it has been done with. The {@link #dispose()} method can be called to atomically close the store
122
 * and clear its directory if persistence isn't needed (e.g. using temp directories for storage). A
123
 * closed store usually throws an {@code IllegalStateException} when used.
124
 */
125
public final class DiskStore implements Store, TestableStore {
126
  /*
127
   * The store's layout on disk is as follows:
128
   *
129
   *   - An 'index' file.
130
   *   - A corresponding file for each entry with its name being the hex string of the first 80
131
   *     bits of the key's SHA-245, concatenated to the suffix '.ch3oh'.
132
   *   - A '.lock' indicating that the directory is currently being used.
133
   *
134
   * The index and entry files are formatted as follows (in slang BNF):
135
   *
136
   *   <index> = <index-header> <index-entry>*
137
   *   <index-header> = 8-bytes-index-magic
138
   *                    4-bytes-store-version
139
   *                    4-bytes-app-version
140
   *                    8-bytes-entry-count
141
   *   <index-entry> = 10-bytes-entry-hash
142
   *                   8-bytes-last-used-millis (maintained for LRU eviction)
143
   *                   8-bytes-entry-size
144
   *
145
   *   <entry> = <data> <entry-epilogue>
146
   *   <data> = byte*
147
   *   <entry-epilogue> = <key> <metadata> <entry-trailer>
148
   *   <key> = utf8-byte*
149
   *   <metadata> = byte*
150
   *   <entry-trailer> = 8-bytes-entry-magic
151
   *                     4-bytes-store-version
152
   *                     4-bytes-app-version
153
   *                     4-bytes-key-size
154
   *                     4-bytes-metadata-size
155
   *                     8-bytes-data-size
156
   *                     4-bytes-data-crc32c
157
   *                     4-bytes-epilogue-crc32c
158
   *
159
   * Having the key, metadata & their sizes at the end of the file makes it easier and quicker to
160
   * update an entry when only its metadata block changes (and possibly its key in case there's a
161
   * hash collision). In such case, an entry update only overwrites <entry-epilogue> next to an
162
   * existing <data>, truncating the file if necessary. Having an <entry-trailer> instead of an
163
   * <entry-header> allows validating the entry file and knowing its key & metadata sizes in a
164
   * single read.
165
   *
166
   * An effort is made to ensure store operations on disk are atomic. Index and entry writers first
167
   * do their work on a temp file. After they're done, the previous version of the file, if any, is
168
   * atomically replaced. Index writes ensure there's a channel::force before the atomic move. Viewers
169
   * opened for an entry see a constant snapshot of that entry's data even if the entry is removed or
170
   * edited one or more times.
171
   */
172

173
  private static final Logger logger = System.getLogger(DiskStore.class.getName());
1✔
174

175
  /** Indicates whether a task is currently being run by the index executor. Used for debugging. */
176
  private static final ThreadLocal<Boolean> isIndexExecutor = ThreadLocal.withInitial(() -> false);
1✔
177

178
  /**
179
   * The max number of entries an index file can contain. This caps on what to be read from the
180
   * index so that an {@code OutOfMemoryError} is not thrown when reading some corrupt index file.
181
   */
182
  private static final int MAX_ENTRY_COUNT = 1_000_000;
183

184
  static final long INDEX_MAGIC = 0x6d657468616e6f6cL;
185
  static final long ENTRY_MAGIC = 0x7b6368332d6f687dL;
186
  static final int STORE_VERSION = 2;
187
  static final int INDEX_HEADER_SIZE = 2 * Long.BYTES + 2 * Integer.BYTES;
188
  static final int INDEX_ENTRY_SIZE = Hash.BYTES + 2 * Long.BYTES;
189
  static final int ENTRY_TRAILER_SIZE = 2 * Long.BYTES + 6 * Integer.BYTES;
190
  static final long INT_MASK = 0xffffffffL;
191
  static final int SHORT_MASK = 0xffff;
192

193
  static final String LOCK_FILENAME = ".lock";
194
  static final String INDEX_FILENAME = "index";
195
  static final String TEMP_INDEX_FILENAME = "index.tmp";
196
  static final String ENTRY_FILE_SUFFIX = ".ch3oh";
197
  static final String TEMP_ENTRY_FILE_SUFFIX = ".ch3oh.tmp";
198
  static final String ISOLATED_FILE_PREFIX = "RIP_";
199

200
  private final long maxSize;
201
  private final int appVersion;
202
  private final Path directory;
203
  private final Hasher hasher;
204
  private final SerialExecutor indexExecutor;
205
  private final IndexOperator indexOperator;
206
  private final IndexWriteScheduler indexWriteScheduler;
207
  private final EvictionScheduler evictionScheduler;
208
  private final DirectoryLock directoryLock;
209
  private final ConcurrentHashMap<Hash, Entry> entries = new ConcurrentHashMap<>();
1✔
210
  private final AtomicLong size = new AtomicLong();
1✔
211

212
  /**
213
   * A monotonic clock used for ordering entries based on recency. The clock is not completely
214
   * monotonic, however, as the clock value can overflow. But a signed long gives us about 300 years
215
   * of monotonicity assuming the clock is incremented every 1 ns, which is not bad at all.
216
   */
217
  private final AtomicLong lruClock = new AtomicLong();
1✔
218

219
  private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
1✔
220

221
  @GuardedBy("closeLock")
222
  private boolean closed;
223

224
  /**
   * Creates a store from the given builder and initializes it from its directory (acquiring the
   * directory lock and recovering indexed entries).
   *
   * @param debugIndexOps if true, index components are replaced with debugging variants and the
   *     executor delegate tags threads via {@code isIndexExecutor}
   * @throws IOException if initialization fails (e.g. the directory is in use by another store)
   */
  private DiskStore(Builder builder, boolean debugIndexOps) throws IOException {
    maxSize = builder.maxSize();
    appVersion = builder.appVersion();
    directory = builder.directory();
    hasher = builder.hasher();
    indexExecutor =
        new SerialExecutor(
            debugIndexOps
                ? toDebuggingIndexExecutorDelegate(builder.executor())
                : builder.executor());
    indexOperator =
        debugIndexOps
            ? new DebugIndexOperator(directory, appVersion)
            : new IndexOperator(directory, appVersion);
    indexWriteScheduler =
        new IndexWriteScheduler(
            indexOperator,
            indexExecutor,
            this::indexEntriesSnapshot,
            builder.indexUpdateDelay(),
            builder.delayer(),
            builder.clock());
    evictionScheduler = new EvictionScheduler(this, builder.executor());

    // NOTE(review): initialization runs on the constructing thread; presumably it is tagged as the
    // index executor so debug checks accept index reads done here — confirm.
    if (debugIndexOps) {
      isIndexExecutor.set(true);
    }
    try {
      directoryLock = initialize();
    } finally {
      if (debugIndexOps) {
        isIndexExecutor.set(false);
      }
    }
  }
259

260
  /** Returns the clock used by the index write scheduler. */
  Clock clock() {
    return indexWriteScheduler.clock();
  }

  /** Returns the delayer used by the index write scheduler. */
  Delayer delayer() {
    return indexWriteScheduler.delayer();
  }
267

268
  /**
   * Creates the store directory if needed, acquires its lock, and populates the in-memory entry
   * map from entries recovered through the index operator. Schedules an eviction if the recovered
   * total size already exceeds {@code maxSize}.
   */
  private DirectoryLock initialize() throws IOException {
    var lock = DirectoryLock.acquire(Files.createDirectories(directory));

    long totalSize = 0L;
    long maxLastUsed = -1;
    for (var indexEntry : indexOperator.recoverEntries()) {
      entries.put(indexEntry.hash, new Entry(indexEntry));
      totalSize += indexEntry.size;
      maxLastUsed = Math.max(maxLastUsed, indexEntry.lastUsed);
    }
    size.set(totalSize);
    // Resume the LRU clock just past the most recent recovered timestamp.
    lruClock.set(maxLastUsed + 1);

    // Make sure we start within bounds.
    if (totalSize > maxSize) {
      evictionScheduler.schedule();
    }
    return lock;
  }
287

288
  /** Returns the directory this store operates on. */
  public Path directory() {
    return directory;
  }

  @Override
  public long maxSize() {
    return maxSize;
  }

  /** Opens a viewer for the entry matching {@code key}, if one exists and is viewable. */
  @Override
  public Optional<Viewer> view(String key) throws IOException {
    requireNonNull(key);
    closeLock.readLock().lock();
    try {
      requireNotClosed();
      var entry = entries.get(hasher.hash(key));
      // Entry::view may still return null (e.g. nothing committed yet for this entry).
      return Optional.ofNullable(entry != null ? entry.view(key) : null);
    } finally {
      closeLock.readLock().unlock();
    }
  }

  /** Asynchronous variant of {@link #view(String)}, run on the given executor. */
  @Override
  public CompletableFuture<Optional<Viewer>> view(String key, Executor executor) {
    return Unchecked.supplyAsync(() -> view(key), executor);
  }
314

315
  /**
   * Opens an editor for the entry matching {@code key}, creating the entry if absent. Returns an
   * empty optional if an edit can't be started (e.g. another edit is in progress).
   */
  @Override
  public Optional<Editor> edit(String key) throws IOException {
    requireNonNull(key);
    closeLock.readLock().lock();
    try {
      requireNotClosed();
      return Optional.ofNullable(
          entries.computeIfAbsent(hasher.hash(key), Entry::new).edit(key, Entry.ANY_VERSION));
    } finally {
      closeLock.readLock().unlock();
    }
  }

  /** Asynchronous variant of {@link #edit(String)}, run on the given executor. */
  @Override
  public CompletableFuture<Optional<Editor>> edit(String key, Executor executor) {
    return Unchecked.supplyAsync(() -> edit(key), executor);
  }

  /** Returns a weakly-consistent iterator over viewers of this store's entries. */
  @Override
  public Iterator<Viewer> iterator() {
    return new ConcurrentViewerIterator();
  }
337

338
  /**
   * Removes the entry matching {@code key}, returning {@code true} if removed by this call.
   *
   * <p>Removal proceeds when the entry's key is unknown (not yet read from disk), matches the
   * given key, or matches the key of an in-progress edit; otherwise the stored hash belongs to a
   * different key and nothing is removed.
   */
  @Override
  public boolean remove(String key) throws IOException {
    requireNonNull(key);
    closeLock.readLock().lock();
    try {
      requireNotClosed();
      var entry = entries.get(hasher.hash(key));
      if (entry != null) {
        // versionHolder receives the entry version observed alongside the key, so the removal
        // below only applies to that observed version.
        var versionHolder = new int[1];
        var keyIfKnown = entry.keyIfKnown(versionHolder);
        if (keyIfKnown == null || key.equals(keyIfKnown) || key.equals(entry.currentEditorKey())) {
          return removeEntry(entry, versionHolder[0]);
        }
      }
      return false;
    } finally {
      closeLock.readLock().unlock();
    }
  }
357

358
  /** Removes all entries currently in the store. */
  @Override
  public void clear() throws IOException {
    closeLock.readLock().lock();
    try {
      requireNotClosed();
      for (var entry : entries.values()) {
        removeEntry(entry);
      }
    } finally {
      closeLock.readLock().unlock();
    }
  }

  @Override
  public long size() {
    return size.get();
  }

  /** Closes the store and deletes its on-disk content (retaining only the lock file). */
  @Override
  public void dispose() throws IOException {
    doClose(true);
    size.set(0);
  }

  @Override
  public void close() throws IOException {
    doClose(false);
  }

387
  private void doClose(boolean disposing) throws IOException {
388
    closeLock.writeLock().lock();
1✔
389
    try {
390
      if (closed) {
1✔
391
        return;
1✔
392
      }
393
      closed = true;
1✔
394
    } finally {
395
      closeLock.writeLock().unlock();
1✔
396
    }
397

398
    // Make sure our final index write captures each entry's final state.
399
    entries.values().forEach(Entry::freeze);
1✔
400

401
    try (directoryLock) {
1✔
402
      if (disposing) {
1✔
403
        // Shutdown the scheduler to avoid overlapping an index write with store directory deletion.
404
        indexWriteScheduler.shutdown();
1✔
405
        deleteStoreContent(directory);
1✔
406
      } else {
407
        evictExcessiveEntries();
1✔
408
        indexWriteScheduler.forceSchedule();
1✔
409
        indexWriteScheduler.shutdown();
1✔
410
      }
411
    }
412
    indexExecutor.shutdown();
1✔
413
    evictionScheduler.shutdown();
1✔
414
    entries.clear();
1✔
415
  }
1✔
416

417
  /** Forces an index write, regardless of the configured index update delay. */
  @Override
  public void flush() throws IOException {
    indexWriteScheduler.forceSchedule();
  }

  @Override
  public String toString() {
    // Read the closed flag under the read lock for a consistent snapshot.
    boolean closed;
    closeLock.readLock().lock();
    try {
      closed = this.closed;
    } finally {
      closeLock.readLock().unlock();
    }
    return Utils.toStringIdentityPrefix(this)
        + "[directory="
        + directory
        + ", fileSystem="
        + directory.getFileSystem()
        + ", appVersion="
        + appVersion
        + ", maxSize="
        + maxSize
        + ", size="
        + size
        + ", "
        + (closed ? "CLOSED" : "OPEN")
        + "]";
  }
446

447
  private Set<IndexEntry> indexEntriesSnapshot() {
448
    var snapshot = new HashSet<IndexEntry>();
1✔
449
    for (var entry : entries.values()) {
1✔
450
      var indexEntry = entry.toIndexEntry();
1✔
451
      if (indexEntry != null) {
1✔
452
        snapshot.add(indexEntry);
1✔
453
      }
454
    }
1✔
455
    return Collections.unmodifiableSet(snapshot);
1✔
456
  }
457

458
  /** Removes the given entry regardless of its version. */
  @CanIgnoreReturnValue
  private boolean removeEntry(Entry entry) throws IOException {
    return removeEntry(entry, Entry.ANY_VERSION);
  }

  /**
   * Atomically evicts the given entry and decrements its size, returning {@code true} if the entry
   * was evicted by this call.
   */
  @CanIgnoreReturnValue
  private boolean removeEntry(Entry entry, int targetVersion) throws IOException {
    long evictedSize = evict(entry, targetVersion);
    if (evictedSize >= 0) {
      size.addAndGet(-evictedSize);
      return true;
    }
    return false;
  }

  /**
   * Atomically evicts the given entry if it matches the given version, returning its last committed
   * size if evicted by this call or -1 otherwise.
   */
  private long evict(Entry entry, int targetVersion) throws IOException {
    long evictedSize = entry.evict(targetVersion);
    if (evictedSize >= 0) {
      // Only unmap if the map still holds this exact entry instance.
      entries.remove(entry.hash, entry);
      // Record the removal on the next index write.
      indexWriteScheduler.trySchedule();
    }
    return evictedSize;
  }
489

490
  /** Attempts to call {@link #evictExcessiveEntries()} if not closed. */
  private boolean evictExcessiveEntriesIfOpen() throws IOException {
    closeLock.readLock().lock();
    try {
      if (closed) {
        return false;
      }
      evictExcessiveEntries();
      return true;
    } finally {
      closeLock.readLock().unlock();
    }
  }

  /** Keeps evicting entries in LRU order till the size bound is satisfied. */
  private void evictExcessiveEntries() throws IOException {
    Iterator<Entry> lruIterator = null;
    for (long currentSize = size.get(); currentSize > maxSize; ) {
      if (lruIterator == null) {
        // Compute the LRU snapshot lazily, only once eviction is actually needed.
        lruIterator = entriesSnapshotInLruOrder().iterator();
      }
      if (!lruIterator.hasNext()) {
        break;
      }

      long evictedSize = evict(lruIterator.next(), Entry.ANY_VERSION);
      if (evictedSize >= 0) {
        currentSize = size.addAndGet(-evictedSize);
      } else {
        // Get fresh size in case of eviction races.
        currentSize = size.get();
      }
    }
  }
524

525
  /** Returns a snapshot of current entries ordered by {@code IndexEntry.LRU_ORDER}. */
  private Collection<Entry> entriesSnapshotInLruOrder() {
    var lruEntries = new TreeMap<IndexEntry, Entry>(IndexEntry.LRU_ORDER);
    for (var entry : entries.values()) {
      var indexEntry = entry.toIndexEntry();
      if (indexEntry != null) {
        lruEntries.put(indexEntry, entry);
      }
    }
    return Collections.unmodifiableCollection(lruEntries.values());
  }
535

536
  /** Throws {@code IllegalStateException} if the store is closed. Caller must hold closeLock. */
  private void requireNotClosed() {
    assert holdsCloseLock();
    requireState(!closed, "closed");
  }

  // NOTE(review): this checks that *some* thread holds the lock (getReadLockCount counts all
  // threads), which is an approximation of "the current thread holds it" — fine for an assert.
  private boolean holdsCloseLock() {
    var lock = (ReentrantReadWriteLock) closeLock;
    return lock.isWriteLocked() || lock.getReadLockCount() > 0;
  }

  /** Returns the number of index writes performed. Only usable with a DebugIndexOperator. */
  int indexWriteCount() {
    requireState(indexOperator instanceof DebugIndexOperator, "not debugging!");
    return ((DebugIndexOperator) indexOperator).writeCount();
  }

  /** Returns the current LRU clock value. */
  long lruTime() {
    return lruClock.get();
  }
554

555
  /**
   * Returns the path of the entry file for {@code key} if a fresh {@code Entry} for its hash can
   * be viewed (i.e. the entry exists on disk), otherwise an empty list. Test-only hook.
   */
  @Override
  public List<String> entriesOnUnderlyingStorageForTesting(String key) {
    var hash = hasher.hash(key);
    var path = directory.resolve(hash.toHexString() + ENTRY_FILE_SUFFIX);
    // try-with-resources tolerates a null viewer; it's simply not closed.
    try (var viewer = new Entry(hash).view(key)) {
      return viewer != null ? List.of(path.toString()) : List.of();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
565

566
  private static Executor toDebuggingIndexExecutorDelegate(Executor delegate) {
567
    return runnable ->
1✔
568
        delegate.execute(
1✔
569
            () -> {
570
              isIndexExecutor.set(true);
1✔
571
              try {
572
                runnable.run();
1✔
573
              } finally {
574
                isIndexExecutor.set(false);
1✔
575
              }
576
            });
1✔
577
  }
578

579
  private static void checkValue(long expected, long found, String msg)
580
      throws StoreCorruptionException {
581
    if (expected != found) {
1✔
582
      throw new StoreCorruptionException(
1✔
583
          String.format("%s; expected: %#x, found: %#x", msg, expected, found));
1✔
584
    }
585
  }
1✔
586

587
  private static void checkValue(boolean valueIsValid, String msg, long value)
588
      throws StoreCorruptionException {
589
    if (!valueIsValid) {
1!
590
      throw new StoreCorruptionException(String.format("%s: %d", msg, value));
×
591
    }
592
  }
1✔
593

594
  private static int getNonNegativeInt(ByteBuffer buffer) throws StoreCorruptionException {
595
    int value = buffer.getInt();
1✔
596
    checkValue(value >= 0, "Expected a value >= 0", value);
1!
597
    return value;
1✔
598
  }
599

600
  private static long getNonNegativeLong(ByteBuffer buffer) throws StoreCorruptionException {
601
    long value = buffer.getLong();
1✔
602
    checkValue(value >= 0, "Expected a value >= 0", value);
1!
603
    return value;
1✔
604
  }
605

606
  private static long getPositiveLong(ByteBuffer buffer) throws StoreCorruptionException {
607
    long value = buffer.getLong();
1✔
608
    checkValue(value > 0, "Expected a positive value", value);
1!
609
    return value;
1✔
610
  }
611

612
  private static @Nullable Hash entryFileToHash(String filename) {
613
    assert filename.endsWith(ENTRY_FILE_SUFFIX) || filename.endsWith(TEMP_ENTRY_FILE_SUFFIX);
1!
614
    int suffixLength =
615
        filename.endsWith(ENTRY_FILE_SUFFIX)
1✔
616
            ? ENTRY_FILE_SUFFIX.length()
1✔
617
            : TEMP_ENTRY_FILE_SUFFIX.length();
1✔
618
    return Hash.tryParse(filename.substring(0, filename.length() - suffixLength));
1✔
619
  }
620

621
  private static void replace(Path source, Path target) throws IOException {
622
    Files.move(source, target, ATOMIC_MOVE, REPLACE_EXISTING);
1✔
623
  }
1✔
624

625
  private static void deleteStoreContent(Path directory) throws IOException {
626
    // Retain the lock file as we're still using the directory.
627
    var lockFile = directory.resolve(LOCK_FILENAME);
1✔
628
    try (var stream = Files.newDirectoryStream(directory, file -> !file.equals(lockFile))) {
1✔
629
      for (var file : stream) {
1✔
630
        safeDeleteIfExists(file);
1✔
631
      }
1✔
632
    } catch (DirectoryIteratorException e) {
×
633
      throw e.getCause();
×
634
    }
1✔
635
  }
1✔
636

637
  /**
   * Deletes the given file in isolation from its original name. This is done by randomly renaming
   * it beforehand.
   *
   * <p>Typically, Windows denies access to names of files deleted while having open handles (these
   * are deletable when opened with FILE_SHARE_DELETE, which is NIO's case). The reason seems to be
   * that 'deletion' in such case merely tags the file for physical deletion when all open handles
   * are closed. However, it appears that handles in Windows are associated with the names of files
   * they're opened for (<a
   * href="https://devblogs.microsoft.com/oldnewthing/20040607-00/?p=38993">check this blog</a>).
   *
   * <p>This causes problems when an entry is deleted while being viewed. We're prevented from using
   * that entry's file name in case it's recreated (i.e. by committing an edit) while at least one
   * viewer is still open. The solution is to randomly rename these files before deletion, so the OS
   * associates any open handles with that random name instead. The original name is reusable
   * thereafter.
   */
  private static void isolatedDeleteIfExists(Path file) throws IOException {
    try {
      Files.deleteIfExists(isolate(file));
    } catch (NoSuchFileException ignored) {
      // This can be thrown by isolate(Path), meaning the file is already gone!
    }
  }
661

662
  /**
   * Atomically moves {@code file} to a randomly named sibling (prefixed with {@code RIP_}),
   * retrying with a new random name on a clash, and returns the new path.
   */
  private static Path isolate(Path file) throws IOException {
    while (true) {
      var randomFilename =
          ISOLATED_FILE_PREFIX + Long.toHexString(ThreadLocalRandom.current().nextLong());
      try {
        return Files.move(file, file.resolveSibling(randomFilename), ATOMIC_MOVE);
      } catch (FileAlreadyExistsException | AccessDeniedException filenameAlreadyInUse) {
        // We can then try again with a new random name. Note that an AccessDeniedException is
        // thrown on a name clash with another 'isolated' file that still has open handles.
      }
    }
  }
674

675
  /**
   * Deletes the given file with {@link DiskStore#isolatedDeleteIfExists(Path)} if it's an entry
   * file (and thus may have open handles), otherwise deletes it with {@link
   * Files#deleteIfExists(Path)}.
   */
  private static void safeDeleteIfExists(Path file) throws IOException {
    var filenameComponent = file.getFileName();
    var filename = filenameComponent != null ? filenameComponent.toString() : "";
    if (filename.endsWith(ENTRY_FILE_SUFFIX)) {
      isolatedDeleteIfExists(file);
    } else if (filename.startsWith(ISOLATED_FILE_PREFIX)) {
      try {
        Files.deleteIfExists(file);
      } catch (AccessDeniedException ignored) {
        // An isolated file can be awaiting deletion if it has open handles. In this case, an
        // AccessDeniedException is always thrown on Windows, so there's nothing we can do.
      }
    } else {
      Files.deleteIfExists(file);
    }
  }
696

697
  /** Closes {@code closeable}, logging (instead of propagating) any {@code IOException}. */
  private static void closeQuietly(Closeable closeable) {
    try {
      closeable.close();
    } catch (IOException e) {
      logger.log(Level.WARNING, "Exception thrown when closing: " + closeable, e);
    }
  }

  /** Deletes {@code path} if it exists, logging (instead of propagating) any {@code IOException}. */
  private static void deleteIfExistsQuietly(Path path) {
    try {
      Files.deleteIfExists(path);
    } catch (IOException e) {
      logger.log(Level.WARNING, "Exception thrown when deleting: " + path, e);
    }
  }
712

713
  /**
   * Returns {@code true} only when both keys are known (non-null) and differ; an unknown key never
   * counts as a mismatch.
   */
  private static boolean keyMismatches(
      @Nullable String keyIfKnown, @Nullable String expectedKeyIfKnown) {
    return keyIfKnown != null
        && expectedKeyIfKnown != null
        && !keyIfKnown.equals(expectedKeyIfKnown);
  }

  /** Returns a new {@code DiskStore} builder. */
  public static Builder newBuilder() {
    return new Builder();
  }
723

724
  private static final class FileChannelCloseable implements Closeable {
725
    private final FileChannel channel;
726
    private boolean close = true;
1✔
727

728
    FileChannelCloseable(FileChannel channel) {
1✔
729
      this.channel = channel;
1✔
730
    }
1✔
731

732
    FileChannel channel() {
733
      return channel;
1✔
734
    }
735

736
    void keepOpen() {
737
      close = false;
1✔
738
    }
1✔
739

740
    @Override
741
    public void close() throws IOException {
742
      if (close) {
1✔
743
        channel.close();
1✔
744
      }
745
    }
1✔
746
  }
747

748
  /**
   * A weakly-consistent iterator of {@code Viewer}s over the store's entries. Entries that can't
   * be viewed (or whose view fails) are skipped; store closure ends iteration gracefully.
   */
  private final class ConcurrentViewerIterator implements Iterator<Viewer> {
    private final Iterator<Entry> entryIterator = entries.values().iterator();

    // The viewer to be returned by the upcoming next() call, if already found.
    private @Nullable Viewer nextViewer;
    // The viewer last returned by next(); target of remove().
    private @Nullable Viewer currentViewer;

    ConcurrentViewerIterator() {}

    @Override
    @EnsuresNonNullIf(expression = "nextViewer", result = true)
    public boolean hasNext() {
      return nextViewer != null || findNext();
    }

    @Override
    public Viewer next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      var viewer = castNonNull(nextViewer);
      nextViewer = null;
      currentViewer = viewer;
      return viewer;
    }

    /** Removes the entry behind the last returned viewer, logging (not propagating) failures. */
    @Override
    public void remove() {
      var viewer = currentViewer;
      requireState(viewer != null, "next() must be called before remove()");
      currentViewer = null;
      try {
        castNonNull(viewer).removeEntry();
      } catch (IOException e) {
        logger.log(Level.WARNING, "Exception thrown when removing entry", e);
      }
    }

    /** Advances until a viewable entry is found, storing its viewer in {@code nextViewer}. */
    @EnsuresNonNullIf(expression = "nextViewer", result = true)
    private boolean findNext() {
      while (entryIterator.hasNext()) {
        var entry = entryIterator.next();
        try {
          var viewer = view(entry);
          if (viewer != null) {
            nextViewer = viewer;
            return true;
          }
        } catch (IOException e) {
          // Try next entry.
          logger.log(Level.WARNING, "Exception thrown when iterating over entries", e);
        } catch (IllegalStateException e) {
          // Handle closure gracefully by ending iteration.
          return false;
        }
      }
      return false;
    }

    /** Views the given entry under the close lock; passes a null key (no key to match against). */
    private @Nullable Viewer view(Entry entry) throws IOException {
      closeLock.readLock().lock();
      try {
        requireState(!closed, "Closed");
        return entry.view(null);
      } finally {
        closeLock.readLock().unlock();
      }
    }
  }
816

817
  private static final class Sha256MessageDigestFactory {
818
    private static final @Nullable MessageDigest TEMPLATE = lookupTemplateIfCloneable();
1✔
819

820
    private Sha256MessageDigestFactory() {}
821

822
    static MessageDigest create() {
823
      if (TEMPLATE == null) {
1!
824
        return lookup();
×
825
      }
826
      try {
827
        return (MessageDigest) TEMPLATE.clone();
1✔
828
      } catch (CloneNotSupportedException e) {
×
829
        throw new AssertionError(e);
×
830
      }
831
    }
832

833
    private static @Nullable MessageDigest lookupTemplateIfCloneable() {
834
      try {
835
        return (MessageDigest) lookup().clone();
1✔
836
      } catch (CloneNotSupportedException ignored) {
×
837
        return null;
×
838
      }
839
    }
840

841
    private static MessageDigest lookup() {
842
      try {
843
        return MessageDigest.getInstance("SHA-256");
1✔
844
      } catch (NoSuchAlgorithmException e) {
×
845
        throw new UnsupportedOperationException("SHA-256 not available!", e);
×
846
      }
847
    }
848
  }
849

850
  /** A function that computes an 80-bit hash from a string key. */
851
  @FunctionalInterface
852
  public interface Hasher {
853
    /** A Hasher returning the first 80 bits of the SHA-256 of the key's UTF-8 encoded bytes. */
854
    Hasher TRUNCATED_SHA_256 = Hasher::truncatedSha256Hash;
1✔
855

856
    Hash hash(String key);
857

858
    private static Hash truncatedSha256Hash(String key) {
859
      return new Hash(
1✔
860
          ByteBuffer.wrap(Sha256MessageDigestFactory.create().digest(key.getBytes(UTF_8)))
1✔
861
              .limit(Hash.BYTES));
1✔
862
    }
863
  }
864

865
  /**
   * Reads and writes the on-disk index file, and reconciles the index with the files actually
   * present in the store directory when recovering entries.
   */
  private static class IndexOperator {
    private final Path directory;
    private final Path indexFile;
    private final Path tempIndexFile;
    private final int appVersion;

    IndexOperator(Path directory, int appVersion) {
      this.directory = directory;
      this.indexFile = directory.resolve(INDEX_FILENAME);
      this.tempIndexFile = directory.resolve(TEMP_INDEX_FILENAME);
      this.appVersion = appVersion;
    }

    /**
     * Reconciles the index with a directory scan: retains index entries whose clean entry file
     * exists on disk, deletes leftover dirty (in-edit) files, and deletes on-disk entry files the
     * index doesn't reference. Returns the retained entries as an unmodifiable set.
     */
    Set<IndexEntry> recoverEntries() throws IOException {
      var diskEntries = scanDirectoryForEntries();
      var indexEntries = readOrCreateIndex();
      var retainedIndexEntries = new HashSet<IndexEntry>(indexEntries.size());
      var filesToDelete = new HashSet<Path>();
      for (var entry : indexEntries) {
        var entryFiles = diskEntries.get(entry.hash);
        if (entryFiles != null) {
          if (entryFiles.cleanFile != null) {
            retainedIndexEntries.add(entry);
          }
          if (entryFiles.dirtyFile != null) {
            // Delete trails of unsuccessful edits.
            filesToDelete.add(entryFiles.dirtyFile);
          }
        }
      }

      // Delete entries found on disk but not referenced by the index.
      // TODO consider trying to recover these entries.
      if (retainedIndexEntries.size() != diskEntries.size()) {
        var untrackedEntries = new HashMap<>(diskEntries);
        retainedIndexEntries.forEach(entries -> untrackedEntries.remove(entries.hash));
        for (var entryFiles : untrackedEntries.values()) {
          if (entryFiles.cleanFile != null) {
            filesToDelete.add(entryFiles.cleanFile);
          }
          if (entryFiles.dirtyFile != null) {
            filesToDelete.add(entryFiles.dirtyFile);
          }
        }
      }

      for (var file : filesToDelete) {
        safeDeleteIfExists(file);
      }
      return Collections.unmodifiableSet(retainedIndexEntries);
    }

    /**
     * Reads the index, treating a missing index as empty and dropping the whole store content if
     * the index is corrupt or truncated.
     */
    private Set<IndexEntry> readOrCreateIndex() throws IOException {
      try {
        return readIndex();
      } catch (NoSuchFileException e) {
        return Set.of();
      } catch (StoreCorruptionException | EOFException e) {
        // TODO consider trying to rebuild the index from a directory scan instead.
        logger.log(Level.WARNING, "Dropping store content due to an unreadable index", e);
        deleteStoreContent(directory);
        return Set.of();
      }
    }

    /**
     * Reads the index file: a header (magic, store version, app version, entry count) followed by
     * a table of fixed-size entries. Returns an unmodifiable set of the read entries.
     */
    Set<IndexEntry> readIndex() throws IOException {
      try (var channel = FileChannel.open(indexFile, READ)) {
        var header = FileIO.read(channel, INDEX_HEADER_SIZE);
        checkValue(INDEX_MAGIC, header.getLong(), "Not in index format");
        checkValue(STORE_VERSION, header.getInt(), "Unexpected store version");
        checkValue(appVersion, header.getInt(), "Unexpected app version");

        long entryCount = header.getLong();
        checkValue(
            entryCount >= 0 && entryCount <= MAX_ENTRY_COUNT, "Invalid entry count", entryCount);
        if (entryCount == 0) {
          return Set.of();
        }

        int intEntryCount = (int) entryCount;
        int entryTableSize = intEntryCount * INDEX_ENTRY_SIZE;
        var entryTable = FileIO.read(channel, entryTableSize);
        var entries = new HashSet<IndexEntry>(intEntryCount);
        for (int i = 0; i < intEntryCount; i++) {
          entries.add(new IndexEntry(entryTable));
        }
        return Collections.unmodifiableSet(entries);
      }
    }

    /**
     * Writes the given entries to the index file. The index is first written & forced to a temp
     * file, which then replaces the current index file, so a crash mid-write can't corrupt an
     * existing index.
     */
    void writeIndex(Set<IndexEntry> entries) throws IOException {
      requireArgument(entries.size() <= MAX_ENTRY_COUNT, "Too many entries");
      try (var channel = FileChannel.open(tempIndexFile, CREATE, WRITE, TRUNCATE_EXISTING)) {
        var index =
            ByteBuffer.allocate(INDEX_HEADER_SIZE + INDEX_ENTRY_SIZE * entries.size())
                .putLong(INDEX_MAGIC)
                .putInt(STORE_VERSION)
                .putInt(appVersion)
                .putLong(entries.size());
        entries.forEach(entry -> entry.writeTo(index));
        FileIO.write(channel, index.flip());
        channel.force(false);
      }
      replace(tempIndexFile, indexFile);
    }

    /**
     * Scans the store directory, mapping each entry hash to its clean and/or dirty files. Isolated
     * files left over from previous sessions are deleted; unrecognized files are logged.
     */
    private Map<Hash, EntryFiles> scanDirectoryForEntries() throws IOException {
      var diskEntries = new HashMap<Hash, EntryFiles>();
      try (var stream = Files.newDirectoryStream(directory)) {
        for (var file : stream) {
          var filenameComponent = file.getFileName();
          var filename = filenameComponent != null ? filenameComponent.toString() : "";
          if (filename.equals(INDEX_FILENAME)
              || filename.equals(TEMP_INDEX_FILENAME)
              || filename.equals(LOCK_FILENAME)) {
            // Skip non-entry files.
            continue;
          }

          Hash hash;
          if ((filename.endsWith(ENTRY_FILE_SUFFIX) || filename.endsWith(TEMP_ENTRY_FILE_SUFFIX))
              && (hash = entryFileToHash(filename)) != null) {
            var files = diskEntries.computeIfAbsent(hash, __ -> new EntryFiles());
            if (filename.endsWith(ENTRY_FILE_SUFFIX)) {
              files.cleanFile = file;
            } else {
              files.dirtyFile = file;
            }
          } else if (filename.startsWith(ISOLATED_FILE_PREFIX)) {
            // Clean trails of isolatedDeleteIfExists in case it failed in a previous session.
            safeDeleteIfExists(file);
          } else {
            logger.log(
                Level.WARNING,
                "Unrecognized file or directory found during initialization <"
                    + file
                    + ">. "
                    + System.lineSeparator()
                    + "It is generally not a good idea to let the store directory be used by other entities.");
          }
        }
      }
      return diskEntries;
    }

    /** Entry related files found by a directory scan. */
    private static final class EntryFiles {
      @MonotonicNonNull Path cleanFile;
      @MonotonicNonNull Path dirtyFile;

      EntryFiles() {}
    }
  }
1018

1019
  /**
   * An {@link IndexOperator} that logs errors when its serialization invariants are violated:
   * index operations must run on the index executor, and must never overlap. Also counts index
   * writes for tests/diagnostics.
   */
  private static final class DebugIndexOperator extends IndexOperator {
    private static final VarHandle RUNNING_OPERATION;

    static {
      try {
        RUNNING_OPERATION =
            MethodHandles.lookup()
                .findVarHandle(DebugIndexOperator.class, "runningOperation", String.class);
      } catch (NoSuchFieldException | IllegalAccessException e) {
        throw new ExceptionInInitializerError(e);
      }
    }

    private final AtomicInteger writeCount = new AtomicInteger(0);

    @SuppressWarnings({"FieldCanBeLocal", "UnusedVariable"}) // VarHandle indirection.
    private @Nullable String runningOperation;

    DebugIndexOperator(Path directory, int appVersion) {
      super(directory, appVersion);
    }

    @Override
    Set<IndexEntry> readIndex() throws IOException {
      enter("readIndex");
      try {
        return super.readIndex();
      } finally {
        exit();
      }
    }

    @Override
    void writeIndex(Set<IndexEntry> entries) throws IOException {
      enter("writeIndex");
      try {
        super.writeIndex(entries);
        writeCount.incrementAndGet();
      } finally {
        exit();
      }
    }

    /**
     * Marks the given operation as running, logging an error if we're not on the index executor or
     * if another operation is concurrently in flight (detected by the CAS on runningOperation).
     */
    private void enter(String operation) {
      if (!isIndexExecutor.get()) {
        logger.log(
            Level.ERROR,
            () -> "IndexOperator::" + operation + " isn't called by the index executor");
      }

      var currentOperation = RUNNING_OPERATION.compareAndExchange(this, null, operation);
      if (currentOperation != null) {
        logger.log(
            Level.ERROR,
            () ->
                "IndexOperator::"
                    + operation
                    + " is called while IndexOperator::"
                    + currentOperation
                    + " is running");
      }
    }

    private void exit() {
      runningOperation = null;
    }

    /** Returns the number of successful index writes so far. */
    int writeCount() {
      return writeCount.get();
    }
  }
1090

1091
  /**
   * A time-limited scheduler for index writes that arranges no more than 1 write per the specified
   * time period.
   */
  private static final class IndexWriteScheduler {
    private static final VarHandle SCHEDULED_WRITE_TASK;

    static {
      try {
        SCHEDULED_WRITE_TASK =
            MethodHandles.lookup()
                .findVarHandle(IndexWriteScheduler.class, "scheduledWriteTask", WriteTask.class);
      } catch (NoSuchFieldException | IllegalAccessException e) {
        throw new ExceptionInInitializerError(e);
      }
    }

    /** Terminal marker that is set when no more writes are to be scheduled. */
    private static final WriteTask TOMBSTONE =
        new WriteTask() {
          @Override
          Instant fireTime() {
            return Instant.MIN;
          }

          @Override
          void cancel() {}
        };

    private final IndexOperator indexOperator;
    private final Executor indexExecutor;
    private final Supplier<Set<IndexEntry>> indexEntriesSnapshotSupplier;
    private final Duration period;
    private final Delayer delayer;
    private final Clock clock;

    // The most recently scheduled write task, updated only via SCHEDULED_WRITE_TASK CASes.
    private @MonotonicNonNull WriteTask scheduledWriteTask;

    /**
     * A barrier for shutdowns to await the currently running task. Scheduled WriteTasks have the
     * following transitions:
     *
     * <pre>{@code
     * T1 -> T2 -> .... -> Tn
     * }</pre>
     *
     * Where Tn is the currently scheduled and hence the only referenced task, and the time between
     * two consecutive Ts is generally the specified period, or less if there are immediate flushes
     * (note that Ts don't overlap since the executor is serialized). Ensuring no Ts are running
     * after shutdown entails awaiting the currently running task (if any) to finish then preventing
     * ones following it from starting. If the update delay is small enough, or if the executor
     * and/or the system-wide scheduler are busy, the currently running task might be lagging behind
     * Tn by multiple Ts, so it's not ideal to somehow keep a reference to it in order to await it
     * when needed. This Phaser solves this issue by having the currently running T to register
     * itself then arriveAndDeregister when finished. During shutdown, the scheduler de-registers
     * from, then attempts to await, the phaser, where it is only awaited if there is still one
     * registered party (a running T). When registerers reach 0, the phaser is terminated,
     * preventing yet to arrive tasks from registering, so they can choose not to run.
     */
    private final Phaser runningTaskAwaiter = new Phaser(1); // Register self.

    IndexWriteScheduler(
        IndexOperator indexOperator,
        Executor indexExecutor,
        Supplier<Set<IndexEntry>> indexEntriesSnapshotSupplier,
        Duration period,
        Delayer delayer,
        Clock clock) {
      this.indexOperator = indexOperator;
      this.indexExecutor = indexExecutor;
      this.indexEntriesSnapshotSupplier = indexEntriesSnapshotSupplier;
      this.period = period;
      this.delayer = delayer;
      this.clock = clock;
    }

    Clock clock() {
      return clock;
    }

    Delayer delayer() {
      return delayer;
    }

    /** Schedules an index write if one isn't already pending, rate-limited to once per period. */
    @SuppressWarnings("FutureReturnValueIgnored")
    void trySchedule() {
      // Decide whether to schedule and when as follows:
      //   - If TOMBSTONE is set, don't schedule anything.
      //   - If scheduledWriteTask is null, then this is the first call, so schedule immediately.
      //   - If scheduledWriteTask is set to run in the future, then it'll see the changes made so
      //     far and there's no need to schedule.
      //   - If less than INDEX_UPDATE_DELAY time has passed since the last write, then schedule the
      //     writer to run when the period evaluates from the last write.
      //   - Otherwise, a timeslot is available, so schedule immediately.
      //
      // This is retried in case of contention.

      var now = clock.instant();
      while (true) {
        var currentTask = scheduledWriteTask;
        var nextFireTime = currentTask != null ? currentTask.fireTime() : null;
        Duration delay;
        if (nextFireTime == null) {
          delay = Duration.ZERO;
        } else if (currentTask == TOMBSTONE || nextFireTime.isAfter(now)) {
          return; // No writes are needed.
        } else {
          var idleness = Duration.between(nextFireTime, now);
          delay = Compare.max(period.minus(idleness), Duration.ZERO);
        }

        var newTask = new RunnableWriteTask(now.plus(delay));
        if (SCHEDULED_WRITE_TASK.compareAndSet(this, currentTask, newTask)) {
          delayer.delay(newTask::runUnchecked, delay, indexExecutor);
          return;
        }
      }
    }

    /** Forcibly submits an index write to the index executor, ignoring the time rate. */
    void forceSchedule() throws IOException {
      Utils.getIo(forceScheduleAsync());
    }

    private CompletableFuture<Void> forceScheduleAsync() {
      var now = clock.instant();
      while (true) {
        var currentTask = scheduledWriteTask;
        requireState(currentTask != TOMBSTONE, "Shutdown");

        var newTask = new RunnableWriteTask(now);
        if (SCHEDULED_WRITE_TASK.compareAndSet(this, currentTask, newTask)) {
          if (currentTask != null) {
            // The forced write supersedes any pending scheduled write.
            currentTask.cancel();
          }
          return Unchecked.runAsync(newTask, indexExecutor);
        }
      }
    }

    /**
     * Prevents further writes from being scheduled and awaits the currently running write (if
     * any) to finish.
     */
    void shutdown() throws InterruptedIOException {
      scheduledWriteTask = TOMBSTONE;
      try {
        runningTaskAwaiter.awaitAdvanceInterruptibly(runningTaskAwaiter.arriveAndDeregister());
        assert runningTaskAwaiter.isTerminated();
      } catch (InterruptedException e) {
        throw Utils.toInterruptedIOException(e);
      }
    }

    private abstract static class WriteTask {
      abstract Instant fireTime();

      abstract void cancel();
    }

    private final class RunnableWriteTask extends WriteTask implements ThrowingRunnable {
      private final Instant fireTime;
      private volatile boolean cancelled;

      RunnableWriteTask(Instant fireTime) {
        this.fireTime = fireTime;
      }

      @Override
      Instant fireTime() {
        return fireTime;
      }

      @Override
      void cancel() {
        cancelled = true;
      }

      @Override
      public void run() throws IOException {
        // Phaser.register() returns a negative value if the phaser is terminated, which happens
        // after shutdown() - in that case the write is skipped.
        if (!cancelled && runningTaskAwaiter.register() >= 0) {
          try {
            indexOperator.writeIndex(indexEntriesSnapshotSupplier.get());
          } finally {
            runningTaskAwaiter.arriveAndDeregister();
          }
        }
      }

      void runUnchecked() {
        // TODO consider disabling the store if failure happens too often.
        try {
          run();
        } catch (IOException e) {
          logger.log(Level.ERROR, "Exception thrown when writing the index", e);
        }
      }
    }
  }
1285

1286
  /** Schedules eviction tasks on demand while ensuring they're run sequentially. */
  private static final class EvictionScheduler {
    // Bit protocol for `sync`:
    //   RUN        - an eviction task has been submitted and hasn't exited yet.
    //   KEEP_ALIVE - schedule() was called while a task was running; the running task loops again
    //                instead of a new task being submitted.
    //   SHUTDOWN   - no more evictions are scheduled or run.
    private static final int RUN = 1;
    private static final int KEEP_ALIVE = 2;
    private static final int SHUTDOWN = 4;

    private static final VarHandle SYNC;

    static {
      try {
        var lookup = MethodHandles.lookup();
        SYNC = lookup.findVarHandle(EvictionScheduler.class, "sync", int.class);
      } catch (NoSuchFieldException | IllegalAccessException e) {
        throw new ExceptionInInitializerError(e);
      }
    }

    private final DiskStore store;
    private final Executor executor;

    @SuppressWarnings("unused") // VarHandle indirection.
    private volatile int sync;

    EvictionScheduler(DiskStore store, Executor executor) {
      this.store = store;
      this.executor = executor;
    }

    /** Submits an eviction task, or extends the currently running one with a keep-alive. */
    void schedule() {
      for (int s; ((s = sync) & SHUTDOWN) == 0; ) {
        int bit = (s & RUN) == 0 ? RUN : KEEP_ALIVE; // Run or keep-alive.
        if (SYNC.compareAndSet(this, s, (s | bit))) {
          if (bit == RUN) {
            executor.execute(this::runEviction);
          }
          break;
        }
      }
    }

    /** Runs evictions, looping as long as keep-alive bits are set by concurrent schedule()s. */
    private void runEviction() {
      for (int s; ((s = sync) & SHUTDOWN) == 0; ) {
        try {
          if (!store.evictExcessiveEntriesIfOpen()) {
            // Ignore eviction, the store ensures it's closed within bounds.
            break;
          }
        } catch (IOException e) {
          logger.log(Level.ERROR, "Exception thrown when evicting entries in background", e);
        }

        // Exit or consume keep-alive bit.
        int bit = (s & KEEP_ALIVE) != 0 ? KEEP_ALIVE : RUN;
        if (SYNC.compareAndSet(this, s, s & ~bit) && bit == RUN) {
          break;
        }
      }
    }

    void shutdown() {
      SYNC.getAndBitwiseOr(this, SHUTDOWN);
    }
  }
1349

1350
  /**
1351
   * A lock on the store directory that ensures it's operated upon by a single DiskStore instance in
1352
   * a single JVM process. This only works in a cooperative manner; it doesn't prevent other
1353
   * entities from using the directory.
1354
   */
1355
  private static final class DirectoryLock implements AutoCloseable {
1356
    private final Path lockFile;
1357
    private final FileChannel channel;
1358

1359
    private DirectoryLock(Path lockFile, FileChannel channel) {
1✔
1360
      this.lockFile = lockFile;
1✔
1361
      this.channel = channel;
1✔
1362
    }
1✔
1363

1364
    @Override
1365
    public void close() {
1366
      deleteIfExistsQuietly(lockFile);
1✔
1367
      closeQuietly(channel); // Closing the channel releases the lock.
1✔
1368
    }
1✔
1369

1370
    static DirectoryLock acquire(Path directory) throws IOException {
1371
      var lockFile = directory.resolve(LOCK_FILENAME);
1✔
1372
      try (var closeable =
1✔
1373
          new FileChannelCloseable(FileChannel.open(lockFile, READ, WRITE, CREATE))) {
1✔
1374
        var channel = closeable.channel();
1✔
1375
        var fileLock = channel.tryLock();
1✔
1376
        if (fileLock == null) {
1!
1377
          throw new IOException("Store directory <" + directory + "> already in use");
×
1378
        }
1379
        var lock = new DirectoryLock(lockFile, channel);
1✔
1380
        closeable.keepOpen();
1✔
1381
        return lock;
1✔
1382
      } catch (IOException e) {
×
1383
        deleteIfExistsQuietly(lockFile);
×
1384
        throw e;
×
1385
      }
1386
    }
1387
  }
1388

1389
  /** An immutable 80-bit hash code. */
1390
  public static final class Hash {
1✔
1391
    static final int BYTES = 10;
1392
    static final int HEX_STRING_LENGTH = 2 * BYTES;
1393

1394
    // Upper 64 bits + lower 16 bits in big-endian order.
1395
    private final long upper64Bits;
1396
    private final short lower16Bits;
1397

1398
    private @MonotonicNonNull String lazyHex;
1399

1400
    public Hash(ByteBuffer buffer) {
1401
      this(buffer.getLong(), buffer.getShort());
1✔
1402
    }
1✔
1403

1404
    Hash(long upper64Bits, short lower16Bits) {
1✔
1405
      this.upper64Bits = upper64Bits;
1✔
1406
      this.lower16Bits = lower16Bits;
1✔
1407
    }
1✔
1408

1409
    void writeTo(ByteBuffer buffer) {
1410
      buffer.putLong(upper64Bits);
1✔
1411
      buffer.putShort(lower16Bits);
1✔
1412
    }
1✔
1413

1414
    String toHexString() {
1415
      var hex = lazyHex;
1✔
1416
      if (hex == null) {
1✔
1417
        hex =
1✔
1418
            toPaddedHexString(upper64Bits, Long.BYTES)
1✔
1419
                + toPaddedHexString(lower16Bits & SHORT_MASK, Short.BYTES);
1✔
1420
        lazyHex = hex;
1✔
1421
      }
1422
      return hex;
1✔
1423
    }
1424

1425
    @Override
1426
    public int hashCode() {
1427
      return Long.hashCode(upper64Bits) ^ Short.hashCode(lower16Bits);
1✔
1428
    }
1429

1430
    @Override
1431
    public boolean equals(@Nullable Object obj) {
1432
      if (obj == this) {
1!
1433
        return true;
×
1434
      }
1435
      if (!(obj instanceof Hash)) {
1!
1436
        return false;
×
1437
      }
1438
      var other = (Hash) obj;
1✔
1439
      return upper64Bits == other.upper64Bits && lower16Bits == other.lower16Bits;
1!
1440
    }
1441

1442
    @Override
1443
    public String toString() {
1444
      return toHexString();
×
1445
    }
1446

1447
    static @Nullable Hash tryParse(String hex) {
1448
      if (hex.length() != HEX_STRING_LENGTH) {
1!
1449
        return null;
×
1450
      }
1451
      try {
1452
        // There's no Short.parseShort that accepts a CharSequence region, so use downcast result of
1453
        // Integer.parseInt. This will certainly fit in a short since exactly 32 hex characters are
1454
        // parsed, and we don't care about the sign.
1455
        return new Hash(
1✔
1456
            Long.parseUnsignedLong(hex, 0, Long.BYTES << 1, 16),
1✔
1457
            (short) Integer.parseInt(hex, Long.BYTES << 1, hex.length(), 16));
1✔
1458
      } catch (NumberFormatException ignored) {
×
1459
        return null;
×
1460
      }
1461
    }
1462

1463
    private static String toPaddedHexString(long value, int size) {
1464
      var hex = Long.toHexString(value);
1✔
1465
      int padding = (size << 1) - hex.length();
1✔
1466
      assert padding >= 0;
1!
1467
      if (padding > 0) {
1✔
1468
        hex = "0".repeat(padding) + hex;
1✔
1469
      }
1470
      return hex;
1✔
1471
    }
1472
  }
1473

1474
  private static final class IndexEntry {
1475
    /**
1476
     * A comparator that defines LRU eviction order. It is assumed that there can be no ties based
1477
     * on latest usage time.
1478
     */
1479
    static final Comparator<IndexEntry> LRU_ORDER =
1✔
1480
        Comparator.comparingLong(entry -> entry.lastUsed);
1✔
1481

1482
    final Hash hash;
1483
    final long lastUsed;
1484
    final long size;
1485

1486
    IndexEntry(Hash hash, long lastUsed, long size) {
1✔
1487
      this.hash = hash;
1✔
1488
      this.lastUsed = lastUsed;
1✔
1489
      this.size = size;
1✔
1490
    }
1✔
1491

1492
    IndexEntry(ByteBuffer buffer) throws StoreCorruptionException {
1✔
1493
      hash = new Hash(buffer);
1✔
1494
      lastUsed = buffer.getLong();
1✔
1495
      size = getPositiveLong(buffer);
1✔
1496
    }
1✔
1497

1498
    void writeTo(ByteBuffer buffer) {
1499
      hash.writeTo(buffer);
1✔
1500
      buffer.putLong(lastUsed);
1✔
1501
      buffer.putLong(size);
1✔
1502
    }
1✔
1503

1504
    @Override
1505
    public int hashCode() {
1506
      return hash.hashCode();
1✔
1507
    }
1508

1509
    @Override
1510
    public boolean equals(@Nullable Object obj) {
1511
      if (obj == this) {
×
1512
        return true;
×
1513
      }
1514
      if (!(obj instanceof IndexEntry)) {
×
1515
        return false;
×
1516
      }
1517
      return hash.equals(((IndexEntry) obj).hash);
×
1518
    }
1519
  }
1520

1521
  /** An entry's key & metadata along with the size & CRC32C checksum of its data stream. */
  private static final class EntryDescriptor {
    final String key;
    final ByteBuffer metadata; // Read-only view; see constructor.
    final long dataSize;
    final long dataCrc32c;

    EntryDescriptor(String key, ByteBuffer metadata, long dataSize, long dataCrc32c) {
      this.key = key;
      this.metadata = metadata.asReadOnlyBuffer();
      this.dataSize = dataSize;
      this.dataCrc32c = dataCrc32c;
    }

    /**
     * Encodes this descriptor into an entry file's epilogue: the UTF-8 key, the metadata bytes,
     * then a fixed-size trailer (magic, store version, app version, key size, metadata size, data
     * size, data CRC32C), followed by a CRC32C computed over everything written so far. The
     * returned buffer is flipped & ready for writing.
     */
    ByteBuffer encodeToEpilogue(int appVersion) {
      var encodedKey = UTF_8.encode(key);
      int keySize = encodedKey.remaining();
      int metadataSize = metadata.remaining();
      var epilogue =
          ByteBuffer.allocate(keySize + metadataSize + ENTRY_TRAILER_SIZE)
              .put(encodedKey)
              .put(metadata.duplicate())
              .putLong(ENTRY_MAGIC)
              .putInt(STORE_VERSION)
              .putInt(appVersion)
              .putInt(keySize)
              .putInt(metadataSize)
              .putLong(dataSize)
              .putInt((int) dataCrc32c);
      // Checksum everything written so far (flip caps the limit at the current position, leaving
      // room for the final CRC int, which is then appended after restoring the limit).
      var crc32c = new CRC32C();
      crc32c.update(epilogue.flip());
      return epilogue.limit(epilogue.capacity()).putInt((int) crc32c.getValue()).flip();
    }
  }
1554

1555
  private final class Entry {
1556
    static final int ANY_VERSION = -1;
1557

1558
    final Hash hash;
1559

1560
    private final ReentrantLock lock = new ReentrantLock();
1✔
1561

1562
    @GuardedBy("lock")
1563
    private long lastUsed;
1564

1565
    @GuardedBy("lock")
1566
    private long size;
1567

1568
    @GuardedBy("lock")
1569
    private int viewerCount;
1570

1571
    @GuardedBy("lock")
1572
    private @Nullable DiskEditor currentEditor;
1573

1574
    @GuardedBy("lock")
1575
    private int version;
1576

1577
    @GuardedBy("lock")
1578
    private boolean readable;
1579

1580
    @GuardedBy("lock")
1581
    private boolean writable;
1582

1583
    private @MonotonicNonNull Path lazyEntryFile;
1584
    private @MonotonicNonNull Path lazyTempEntryFile;
1585

1586
    /** This entry's descriptor as known from the last read or write. */
1587
    @GuardedBy("lock")
1588
    private @MonotonicNonNull EntryDescriptor cachedDescriptor;
1589

1590
    /** Creates a fresh entry that is writable but not yet readable (nothing is on disk yet). */
    Entry(Hash hash) {
      this.hash = hash;
      this.lastUsed = -1; // Never used yet.
      this.readable = false;
      this.writable = true;
    }
1✔
1596

1597
    /** Creates an entry recovered from the index, which is backed by an entry file & readable. */
    Entry(IndexEntry indexEntry) {
      this.hash = indexEntry.hash;
      this.lastUsed = indexEntry.lastUsed;
      this.size = indexEntry.size;
      this.readable = true;
      this.writable = true;
    }
1✔
1604

1605
    @Nullable IndexEntry toIndexEntry() {
1606
      lock.lock();
1✔
1607
      try {
1608
        return readable ? new IndexEntry(hash, lastUsed, size) : null;
1✔
1609
      } finally {
1610
        lock.unlock();
1✔
1611
      }
1612
    }
1613

1614
    @Nullable Viewer view(@Nullable String expectedKey) throws IOException {
1615
      var viewer = openViewerForKey(expectedKey);
1✔
1616
      if (viewer != null) {
1✔
1617
        indexWriteScheduler.trySchedule();
1✔
1618
      }
1619
      return viewer;
1✔
1620
    }
1621

1622
    private @Nullable Viewer openViewerForKey(@Nullable String expectedKey) throws IOException {
1623
      lock.lock();
1✔
1624
      try {
1625
        if (!readable) {
1✔
1626
          return null;
1✔
1627
        }
1628

1629
        try (var closeable = new FileChannelCloseable(FileChannel.open(entryFile(), READ))) {
1✔
1630
          var channel = closeable.channel();
1✔
1631
          var descriptor = readDescriptorForKey(channel, expectedKey);
1✔
1632
          if (descriptor == null) {
1✔
1633
            return null;
1✔
1634
          }
1635
          var viewer = createViewer(channel, version, descriptor);
1✔
1636
          closeable.keepOpen();
1✔
1637
          return viewer;
1✔
1638
        } catch (NoSuchFileException missingEntryFile) {
1✔
1639
          // Our file disappeared! We'll handle this gracefully by making the store lose track of
1640
          // us. This is done after releasing the lock to not incur a potential index write while
1641
          // holding it in case the executor is synchronous.
1642
          logger.log(Level.WARNING, "Dropping entry with missing file", missingEntryFile);
1✔
1643
        }
1644
      } finally {
1645
        lock.unlock();
1✔
1646
      }
1647

1648
      try {
1649
        removeEntry(this);
1✔
1650
      } catch (IOException e) {
×
1651
        logger.log(Level.WARNING, "Exception while deleting already non-existent entry");
×
1652
      }
1✔
1653
      return null;
1✔
1654
    }
1655

1656
    @GuardedBy("lock") // Lock must be held due to potential write to cachedDescriptor.
1657
    private @Nullable EntryDescriptor readDescriptorForKey(
1658
        FileChannel channel, @Nullable String expectedKey) throws IOException {
1659
      var descriptor = cachedDescriptor;
1✔
1660
      if (descriptor == null) {
1✔
1661
        descriptor = readDescriptor(channel);
1✔
1662
      }
1663
      if (keyMismatches(descriptor.key, expectedKey)) {
1✔
1664
        return null;
1✔
1665
      }
1666
      cachedDescriptor = descriptor;
1✔
1667
      return descriptor;
1✔
1668
    }
1669

1670
    @GuardedBy("lock")
1671
    private EntryDescriptor readDescriptor(FileChannel channel) throws IOException {
1672
      // TODO a smarter thing to do is to read a larger buffer from the end and optimistically
1673
      //      expect key and metadata to be there. Or store the sizes of metadata, data & key in
1674
      //      the index.
1675
      long fileSize = channel.size();
1✔
1676
      var trailer = FileIO.read(channel, ENTRY_TRAILER_SIZE, fileSize - ENTRY_TRAILER_SIZE);
1✔
1677
      long magic = trailer.getLong();
1✔
1678
      int storeVersion = trailer.getInt();
1✔
1679
      int appVersion = trailer.getInt();
1✔
1680
      int keySize = getNonNegativeInt(trailer);
1✔
1681
      int metadataSize = getNonNegativeInt(trailer);
1✔
1682
      long dataSize = getNonNegativeLong(trailer);
1✔
1683
      long dataCrc32c = trailer.getInt() & INT_MASK;
1✔
1684
      long epilogueCrc32c = trailer.getInt() & INT_MASK;
1✔
1685
      checkValue(ENTRY_MAGIC, magic, "Not in entry file format");
1✔
1686
      checkValue(STORE_VERSION, storeVersion, "Unexpected store version");
1✔
1687
      checkValue(DiskStore.this.appVersion, appVersion, "Unexpected app version");
1✔
1688
      var keyAndMetadata = FileIO.read(channel, keySize + metadataSize, dataSize);
1✔
1689
      var key = UTF_8.decode(keyAndMetadata.limit(keySize)).toString();
1✔
1690
      var metadata =
1✔
1691
          ByteBuffer.allocate(keyAndMetadata.limit(keySize + metadataSize).remaining())
1✔
1692
              .put(keyAndMetadata)
1✔
1693
              .flip()
1✔
1694
              .asReadOnlyBuffer();
1✔
1695
      var crc32c = new CRC32C();
1✔
1696
      crc32c.update(keyAndMetadata.rewind());
1✔
1697
      crc32c.update(trailer.rewind().limit(trailer.limit() - Integer.BYTES));
1✔
1698
      checkValue(crc32c.getValue(), epilogueCrc32c, "Unexpected epilogue checksum");
1✔
1699
      return new EntryDescriptor(key, metadata, dataSize, dataCrc32c);
1✔
1700
    }
1701

1702
    @GuardedBy("lock")
1703
    private Viewer createViewer(FileChannel channel, int version, EntryDescriptor descriptor) {
1704
      var viewer = new DiskViewer(this, version, descriptor, channel);
1✔
1705
      viewerCount++;
1✔
1706
      lastUsed = lruClock.getAndIncrement();
1✔
1707
      return viewer;
1✔
1708
    }
1709

1710
    @Nullable Editor edit(String key, int targetVersion) throws IOException {
1711
      lock.lock();
1✔
1712
      try {
1713
        if (!writable
1✔
1714
            || currentEditor != null
1715
            || (targetVersion != ANY_VERSION && targetVersion != version)) {
1716
          return null;
1✔
1717
        }
1718
        var editor = new DiskEditor(this, key, FileChannel.open(tempEntryFile(), WRITE, CREATE));
1✔
1719
        currentEditor = editor;
1✔
1720
        return editor;
1✔
1721
      } finally {
1722
        lock.unlock();
1✔
1723
      }
1724
    }
1725

1726
    void commit(
1727
        DiskEditor editor,
1728
        String key,
1729
        ByteBuffer metadata,
1730
        FileChannel editorChannel,
1731
        long dataSize,
1732
        long dataCrc32c)
1733
        throws IOException {
1734
      long newSize;
1735
      long sizeDifference;
1736
      lock.lock();
1✔
1737
      try {
1738
        requireState(currentEditor == editor, "Edit discarded");
1!
1739
        currentEditor = null;
1✔
1740

1741
        requireState(writable, "Committing a non-discarded edit to a non-writable entry");
1✔
1742

1743
        EntryDescriptor committedDescriptor;
1744
        boolean editInPlace = dataSize < 0 && readable;
1✔
1745
        try (editorChannel;
1✔
1746
            var existingEntryChannel =
1747
                editInPlace ? FileChannel.open(entryFile(), READ, WRITE) : null) {
1✔
1748
          var targetChannel = editorChannel;
1✔
1749
          EntryDescriptor existingEntryDescriptor = null;
1✔
1750
          if (existingEntryChannel != null
1✔
1751
              && (existingEntryDescriptor = readDescriptorForKey(existingEntryChannel, key))
1!
1752
                  != null) {
1753
            targetChannel = existingEntryChannel;
1✔
1754
            committedDescriptor =
1✔
1755
                new EntryDescriptor(
1756
                    key,
1757
                    metadata,
1758
                    existingEntryDescriptor.dataSize,
1759
                    existingEntryDescriptor.dataCrc32c);
1760

1761
            // Close editor's file channel before deleting. See isolatedDeleteIfExists(Path).
1762
            closeQuietly(editorChannel);
1✔
1763

1764
            // Make the entry file temporarily unreadable before modifying it. This also has to
1765
            // reflect on store's size.
1766
            replace(entryFile(), tempEntryFile());
1✔
1767
            readable = false;
1✔
1768
            DiskStore.this.size.addAndGet(-size);
1✔
1769
            size = 0;
1✔
1770
          } else {
1771
            committedDescriptor =
1✔
1772
                new EntryDescriptor(key, metadata, Math.max(dataSize, 0), dataCrc32c);
1✔
1773
          }
1774

1775
          int written =
1✔
1776
              FileIO.write(
1✔
1777
                  targetChannel,
1778
                  committedDescriptor.encodeToEpilogue(appVersion),
1✔
1779
                  committedDescriptor.dataSize);
1780
          if (existingEntryDescriptor != null) {
1✔
1781
            // Truncate to correct size in case the previous entry had a larger epilogue.
1782
            targetChannel.truncate(committedDescriptor.dataSize + written);
1✔
1783
          }
1784
          targetChannel.force(false);
1✔
1785
        } catch (IOException e) {
×
1786
          discardCurrentEdit(editor);
×
1787
          throw e;
×
1788
        }
1✔
1789

1790
        if (viewerCount > 0) {
1✔
1791
          isolatedDeleteIfExists(entryFile());
1✔
1792
        }
1793
        replace(tempEntryFile(), entryFile());
1✔
1794
        version++;
1✔
1795
        newSize = committedDescriptor.metadata.remaining() + committedDescriptor.dataSize;
1✔
1796
        sizeDifference = newSize - size;
1✔
1797
        size = newSize;
1✔
1798
        readable = true;
1✔
1799
        lastUsed = lruClock.getAndIncrement();
1✔
1800
        cachedDescriptor = committedDescriptor;
1✔
1801
      } finally {
1802
        lock.unlock();
1✔
1803
      }
1804

1805
      long newStoreSize = DiskStore.this.size.addAndGet(sizeDifference);
1✔
1806

1807
      // Don't bother with the entry if it'll cause everything to be evicted.
1808
      if (newSize > maxSize) {
1✔
1809
        removeEntry(this);
1✔
1810
        return;
1✔
1811
      }
1812

1813
      if (newStoreSize > maxSize) {
1✔
1814
        evictionScheduler.schedule();
1✔
1815
      }
1816
      indexWriteScheduler.trySchedule();
1✔
1817
    }
1✔
1818

1819
    /**
1820
     * Evicts this entry if it matches the given version and returns its last committed size if it
1821
     * did get evicted, otherwise returns -1.
1822
     */
1823
    long evict(int targetVersion) throws IOException {
1824
      lock.lock();
1✔
1825
      try {
1826
        if (!writable || (targetVersion != ANY_VERSION && targetVersion != version)) {
1✔
1827
          return -1;
1✔
1828
        }
1829

1830
        if (viewerCount > 0) {
1✔
1831
          isolatedDeleteIfExists(entryFile());
1✔
1832
        } else {
1833
          Files.deleteIfExists(entryFile());
1✔
1834
        }
1835
        discardCurrentEdit();
1✔
1836
        readable = false;
1✔
1837
        writable = false;
1✔
1838
        return size;
1✔
1839
      } finally {
1840
        lock.unlock();
1✔
1841
      }
1842
    }
1843

1844
    void freeze() {
1845
      lock.lock();
1✔
1846
      try {
1847
        writable = false;
1✔
1848
        discardCurrentEdit();
1✔
1849
      } finally {
1850
        lock.unlock();
1✔
1851
      }
1852
    }
1✔
1853

1854
    @GuardedBy("lock")
1855
    private void discardCurrentEdit() {
1856
      var editor = currentEditor;
1✔
1857
      if (editor != null) {
1✔
1858
        currentEditor = null;
1✔
1859
        discardCurrentEdit(editor);
1✔
1860
      }
1861
    }
1✔
1862

1863
    @GuardedBy("lock")
1864
    private void discardCurrentEdit(DiskEditor editor) {
1865
      if (!readable) {
1✔
1866
        // Remove the entry as it could never be readable. It's safe to directly remove it from the
1867
        // map since it's not visible to the outside world at this point (no views/edits) and
1868
        // doesn't contribute to store size.
1869
        entries.remove(hash, this);
1✔
1870
      }
1871

1872
      editor.setClosed();
1✔
1873
      closeQuietly(editor.channel);
1✔
1874
      deleteIfExistsQuietly(tempEntryFile());
1✔
1875
    }
1✔
1876

1877
    void discardIfCurrentEdit(DiskEditor editor) {
1878
      lock.lock();
1✔
1879
      try {
1880
        if (editor == currentEditor) {
1!
1881
          currentEditor = null;
1✔
1882
          discardCurrentEdit(editor);
1✔
1883
        }
1884
      } finally {
1885
        lock.unlock();
1✔
1886
      }
1887
    }
1✔
1888

1889
    void decrementViewerCount() {
1890
      lock.lock();
1✔
1891
      try {
1892
        viewerCount--;
1✔
1893
      } finally {
1894
        lock.unlock();
1✔
1895
      }
1896
    }
1✔
1897

1898
    @Nullable String keyIfKnown(int[] versionHolder) {
1899
      lock.lock();
1✔
1900
      try {
1901
        var descriptor = cachedDescriptor;
1✔
1902
        if (descriptor != null) {
1✔
1903
          versionHolder[0] = version;
1✔
1904
          return descriptor.key;
1✔
1905
        }
1906
        return null;
1✔
1907
      } finally {
1908
        lock.unlock();
1✔
1909
      }
1910
    }
1911

1912
    @Nullable String currentEditorKey() {
1913
      lock.lock();
1✔
1914
      try {
1915
        var editor = currentEditor;
1✔
1916
        return editor != null ? editor.key() : null;
1!
1917
      } finally {
1918
        lock.unlock();
1✔
1919
      }
1920
    }
1921

1922
    Path entryFile() {
1923
      var entryFile = lazyEntryFile;
1✔
1924
      if (entryFile == null) {
1✔
1925
        entryFile = directory.resolve(hash.toHexString() + ENTRY_FILE_SUFFIX);
1✔
1926
        lazyEntryFile = entryFile;
1✔
1927
      }
1928
      return entryFile;
1✔
1929
    }
1930

1931
    Path tempEntryFile() {
1932
      var entryFile = lazyTempEntryFile;
1✔
1933
      if (entryFile == null) {
1✔
1934
        entryFile = directory.resolve(hash.toHexString() + TEMP_ENTRY_FILE_SUFFIX);
1✔
1935
        lazyTempEntryFile = entryFile;
1✔
1936
      }
1937
      return entryFile;
1✔
1938
    }
1939
  }
1940

1941
  /**
   * A {@code Viewer} over a committed entry's file. Holds its own read channel, which is closed
   * when the viewer is closed. Readers validate the data stream's CRC32C once it is fully
   * consumed.
   */
  private final class DiskViewer implements Viewer {
    private final Entry entry;

    /**
     * Entry's version at the time of opening this viewer. This is used to not edit or remove an
     * entry that's been updated after this viewer had been created.
     */
    private final int entryVersion;

    /** Descriptor snapshot taken when the viewer was opened. */
    private final EntryDescriptor descriptor;
    private final FileChannel channel;
    private final AtomicBoolean closed = new AtomicBoolean();
    // Only the first reader may rely on the channel's native file position (see
    // ScatteringDiskEntryReader); later readers use positional reads.
    private final AtomicBoolean createdFirstReader = new AtomicBoolean();

    DiskViewer(Entry entry, int entryVersion, EntryDescriptor descriptor, FileChannel channel) {
      this.entry = entry;
      this.entryVersion = entryVersion;
      this.descriptor = descriptor;
      this.channel = channel;
    }

    @Override
    public String key() {
      return descriptor.key;
    }

    @Override
    public ByteBuffer metadata() {
      // Duplicate so callers can't disturb the shared buffer's position/limit.
      return descriptor.metadata.duplicate();
    }

    @Override
    public EntryReader newReader() {
      // The first reader gets the scattering (native-position) variant; subsequent readers use
      // positional reads so they don't interfere with it.
      return createdFirstReader.compareAndSet(false, true)
          ? new ScatteringDiskEntryReader()
          : new DiskEntryReader();
    }

    @Override
    public Optional<Editor> edit() throws IOException {
      // Passing entryVersion means the edit is refused if the entry changed since this view.
      return Optional.ofNullable(entry.edit(key(), entryVersion));
    }

    @Override
    public CompletableFuture<Optional<Editor>> edit(Executor executor) {
      return Unchecked.supplyAsync(this::edit, executor);
    }

    @Override
    public long dataSize() {
      return descriptor.dataSize;
    }

    @Override
    public long entrySize() {
      return descriptor.metadata.remaining() + descriptor.dataSize;
    }

    @Override
    public boolean removeEntry() throws IOException {
      // Only removes the entry if it still matches the version seen by this viewer.
      return DiskStore.this.removeEntry(entry, entryVersion);
    }

    @Override
    public void close() {
      closeQuietly(channel);
      // CAS guards the decrement so repeated close() calls count the viewer down only once.
      if (closed.compareAndSet(false, true)) {
        entry.decrementViewerCount();
      }
    }

    /**
     * An {@code EntryReader} using positional reads, tracking its own read position and a running
     * CRC32C of the bytes consumed so far.
     */
    private class DiskEntryReader implements EntryReader {
      final Lock lock = new ReentrantLock();
      final CRC32C crc32C = new CRC32C();
      // Read position within the entry's data stream (not the whole file).
      long position;

      DiskEntryReader() {}

      @Override
      public int read(ByteBuffer dst) throws IOException {
        requireNonNull(dst);
        lock.lock();
        try {
          // Make sure we don't exceed data stream bounds.
          long available = descriptor.dataSize - position;
          if (available <= 0) {
            return -1;
          }

          int maxReadable = (int) Math.min(available, dst.remaining());
          // Bound the destination so the epilogue (after the data stream) is never read into it.
          var boundedDst = dst.duplicate().limit(dst.position() + maxReadable);
          int read = readBytes(boundedDst);
          position += read;
          crc32C.update(boundedDst.rewind());
          checkCrc32cIfEndOfStream();
          // Reflect the duplicate's progress on the caller's buffer.
          dst.position(dst.position() + read);
          return read;
        } finally {
          lock.unlock();
        }
      }

      @Override
      public CompletableFuture<Integer> read(ByteBuffer dst, Executor executor) {
        return Unchecked.supplyAsync(() -> read(dst), executor);
      }

      @Override
      public long read(List<ByteBuffer> dsts) throws IOException {
        // Default bulk read: fill each buffer in turn via single-buffer reads. Returns -1 only
        // if end of stream is hit before anything is read.
        long totalRead = 0;
        outerLoop:
        for (var dst : dsts) {
          int read;
          while (dst.hasRemaining()) {
            read = read(dst);
            if (read >= 0) {
              totalRead += read;
            } else if (totalRead > 0) {
              break outerLoop;
            } else {
              return -1;
            }
          }
        }
        return totalRead;
      }

      @Override
      public CompletableFuture<Long> read(List<ByteBuffer> dsts, Executor executor) {
        return Unchecked.supplyAsync(() -> read(dsts), executor);
      }

      // Overridable read primitive; this variant reads at an explicit position.
      int readBytes(ByteBuffer dst) throws IOException {
        return FileIO.read(channel, dst, position);
      }

      /** Validates the data checksum once the whole data stream has been consumed. */
      void checkCrc32cIfEndOfStream() throws StoreCorruptionException {
        if (position == descriptor.dataSize) {
          checkValue(crc32C.getValue(), descriptor.dataCrc32c, "Unexpected data checksum");
        }
      }
    }

    /**
     * A reader that uses scattering API for bulk reads. This reader relies on the file's native
     * position (scattering API doesn't take a position argument). As such, it must only be created
     * once. Note that this restriction makes scattering reads inefficient, or not as efficient, for
     * readers created after the first reader.
     */
    private final class ScatteringDiskEntryReader extends DiskEntryReader {
      ScatteringDiskEntryReader() {}

      @Override
      int readBytes(ByteBuffer dst) throws IOException {
        return FileIO.read(channel, dst); // Use native file position.
      }

      @Override
      public long read(List<ByteBuffer> dsts) throws IOException {
        requireNonNull(dsts);
        lock.lock();
        try {
          // Make sure we don't exceed data stream bounds.
          long available = descriptor.dataSize - position;
          if (available <= 0) {
            return -1;
          }

          // Bound each destination so the combined read never passes the data stream's end; stop
          // adding buffers once the remaining capacity covers what's available.
          var boundedDsts = new ArrayList<ByteBuffer>(dsts.size());
          long maxReadableSoFar = 0;
          for (var dst : dsts) {
            int dstMaxReadable = (int) Math.min(dst.remaining(), available - maxReadableSoFar);
            boundedDsts.add(dst.duplicate().limit(dst.position() + dstMaxReadable));
            maxReadableSoFar = Math.addExact(maxReadableSoFar, dstMaxReadable);
            if (maxReadableSoFar >= available) {
              break;
            }
          }

          long read = FileIO.read(channel, boundedDsts.toArray(ByteBuffer[]::new));
          position += read;
          for (var boundedDst : boundedDsts) {
            crc32C.update(boundedDst.rewind());
          }
          checkCrc32cIfEndOfStream();
          // Propagate each duplicate's progress back to the caller's buffers.
          for (int i = 0; i < boundedDsts.size(); i++) {
            dsts.get(i).position(boundedDsts.get(i).position());
          }
          return read;
        } finally {
          lock.unlock();
        }
      }
    }
  }
2136

2137
  private static final class DiskEditor implements Editor {
2138
    private final Entry entry;
2139
    private final String key;
2140
    private final FileChannel channel;
2141
    private final DiskEntryWriter writer;
2142
    private final AtomicBoolean closed = new AtomicBoolean();
1✔
2143

2144
    DiskEditor(Entry entry, String key, FileChannel channel) {
1✔
2145
      this.entry = entry;
1✔
2146
      this.key = key;
1✔
2147
      this.channel = channel;
1✔
2148
      this.writer = new DiskEntryWriter();
1✔
2149
    }
1✔
2150

2151
    @Override
2152
    public String key() {
2153
      return key;
×
2154
    }
2155

2156
    @Override
2157
    public EntryWriter writer() {
2158
      return writer;
1✔
2159
    }
2160

2161
    @Override
2162
    public void commit(ByteBuffer metadata) throws IOException {
2163
      requireNonNull(metadata);
1✔
2164
      requireState(closed.compareAndSet(false, true), "closed");
1✔
2165
      internalCommit(metadata);
1✔
2166
    }
1✔
2167

2168
    @Override
2169
    public CompletableFuture<Void> commit(ByteBuffer metadata, Executor executor) {
2170
      requireNonNull(metadata);
1✔
2171
      requireState(closed.compareAndSet(false, true), "closed");
1✔
2172
      return Unchecked.runAsync(() -> internalCommit(metadata), executor);
1✔
2173
    }
2174

2175
    private void internalCommit(ByteBuffer metadata) throws IOException {
2176
      long[] crc32cHolder = new long[1];
1✔
2177
      long dataSize = writer.dataSizeIfWritten(crc32cHolder);
1✔
2178
      entry.commit(this, key, metadata, channel, dataSize, crc32cHolder[0]);
1✔
2179
    }
1✔
2180

2181
    @Override
2182
    public void close() {
2183
      if (closed.compareAndSet(false, true)) {
1✔
2184
        entry.discardIfCurrentEdit(this);
1✔
2185
      }
2186
    }
1✔
2187

2188
    public void setClosed() {
2189
      closed.set(true);
1✔
2190
    }
1✔
2191

2192
    private final class DiskEntryWriter implements EntryWriter {
2193
      private final Lock lock = new ReentrantLock();
1✔
2194
      private final CRC32C crc32C = new CRC32C();
1✔
2195
      private long position;
2196
      private boolean isWritten;
2197

2198
      DiskEntryWriter() {}
1✔
2199

2200
      @Override
2201
      public int write(ByteBuffer src) throws IOException {
2202
        requireNonNull(src);
1✔
2203
        requireState(!closed.get(), "closed");
1✔
2204
        lock.lock();
1✔
2205
        try {
2206
          int srcPosition = src.position();
1✔
2207
          int written = FileIO.write(channel, src);
1✔
2208
          crc32C.update(src.position(srcPosition));
1✔
2209
          position += written;
1✔
2210
          isWritten = true;
1✔
2211
          return written;
1✔
2212
        } finally {
2213
          lock.unlock();
1✔
2214
        }
2215
      }
2216

2217
      @Override
2218
      public CompletableFuture<Integer> write(ByteBuffer src, Executor executor) {
2219
        requireNonNull(src);
×
2220
        return Unchecked.supplyAsync(() -> write(src), executor);
×
2221
      }
2222

2223
      @Override
2224
      public long write(List<ByteBuffer> srcs) throws IOException {
2225
        requireNonNull(srcs);
1✔
2226
        requireState(!closed.get(), "closed");
1✔
2227
        lock.lock();
1✔
2228
        try {
2229
          var srcsArray = srcs.toArray(ByteBuffer[]::new);
1✔
2230
          int[] srcPositions = new int[srcsArray.length];
1✔
2231
          for (int i = 0; i < srcsArray.length; i++) {
1✔
2232
            srcPositions[i] = srcsArray[i].position();
1✔
2233
          }
2234
          long written = FileIO.write(channel, srcsArray);
1✔
2235
          for (int i = 0; i < srcsArray.length; i++) {
1✔
2236
            crc32C.update(srcsArray[i].position(srcPositions[i]));
1✔
2237
          }
2238
          position += written;
1✔
2239
          isWritten = true;
1✔
2240
          return written;
1✔
2241
        } finally {
2242
          lock.unlock();
1✔
2243
        }
2244
      }
2245

2246
      @Override
2247
      public CompletableFuture<Long> write(List<ByteBuffer> srcs, Executor executor) {
2248
        requireNonNull(srcs);
1✔
2249
        return Unchecked.supplyAsync(() -> write(srcs), executor);
1✔
2250
      }
2251

2252
      long dataSizeIfWritten(long[] crc32cHolder) {
2253
        lock.lock();
1✔
2254
        try {
2255
          if (isWritten) {
1✔
2256
            crc32cHolder[0] = crc32C.getValue();
1✔
2257
            return position;
1✔
2258
          } else {
2259
            return -1;
1✔
2260
          }
2261
        } finally {
2262
          lock.unlock();
1✔
2263
        }
2264
      }
2265
    }
2266
  }
2267

2268
  public static final class Builder {
2269
    private static final long DEFAULT_INDEX_UPDATE_DELAY_MILLIS = 2000;
2270
    private static final Duration DEFAULT_INDEX_UPDATE_DELAY;
2271

2272
    static {
2273
      long millis =
1✔
2274
          Long.getLong(
1✔
2275
              "com.github.mizosoft.methanol.internal.cache.DiskStore.indexUpdateDelayMillis",
2276
              DEFAULT_INDEX_UPDATE_DELAY_MILLIS);
2277
      if (millis < 0) {
1!
2278
        millis = DEFAULT_INDEX_UPDATE_DELAY_MILLIS;
×
2279
      }
2280
      DEFAULT_INDEX_UPDATE_DELAY = Duration.ofMillis(millis);
1✔
2281
    }
1✔
2282

2283
    private static final int UNSET_NUMBER = -1;
2284

2285
    private long maxSize = UNSET_NUMBER;
1✔
2286
    private @MonotonicNonNull Path directory;
2287
    private @MonotonicNonNull Executor executor;
2288
    private int appVersion = UNSET_NUMBER;
1✔
2289
    private @MonotonicNonNull Hasher hasher;
2290
    private @MonotonicNonNull Clock clock;
2291
    private @MonotonicNonNull Delayer delayer;
2292
    private @MonotonicNonNull Duration indexUpdateDelay;
2293
    private boolean debugIndexOps;
2294

2295
    Builder() {}
1✔
2296

2297
    @CanIgnoreReturnValue
2298
    public Builder directory(Path directory) {
2299
      this.directory = requireNonNull(directory);
1✔
2300
      return this;
1✔
2301
    }
2302

2303
    @CanIgnoreReturnValue
2304
    public Builder maxSize(long maxSize) {
2305
      requireArgument(maxSize > 0, "Expected a positive max size");
1!
2306
      this.maxSize = maxSize;
1✔
2307
      return this;
1✔
2308
    }
2309

2310
    @CanIgnoreReturnValue
2311
    public Builder executor(Executor executor) {
2312
      this.executor = requireNonNull(executor);
1✔
2313
      return this;
1✔
2314
    }
2315

2316
    @CanIgnoreReturnValue
2317
    public Builder appVersion(int appVersion) {
2318
      this.appVersion = appVersion;
1✔
2319
      return this;
1✔
2320
    }
2321

2322
    @CanIgnoreReturnValue
2323
    public Builder hasher(Hasher hasher) {
2324
      this.hasher = requireNonNull(hasher);
1✔
2325
      return this;
1✔
2326
    }
2327

2328
    @CanIgnoreReturnValue
2329
    public Builder clock(Clock clock) {
2330
      this.clock = requireNonNull(clock);
1✔
2331
      return this;
1✔
2332
    }
2333

2334
    @CanIgnoreReturnValue
2335
    public Builder delayer(Delayer delayer) {
2336
      this.delayer = requireNonNull(delayer);
1✔
2337
      return this;
1✔
2338
    }
2339

2340
    @CanIgnoreReturnValue
2341
    public Builder indexUpdateDelay(Duration duration) {
2342
      this.indexUpdateDelay = requireNonNegativeDuration(duration);
1✔
2343
      return this;
1✔
2344
    }
2345

2346
    /**
2347
     * If set, the store complains when the index is accessed or modified either concurrently or not
2348
     * within the index executor.
2349
     */
2350
    @CanIgnoreReturnValue
2351
    public Builder debugIndexOps(boolean on) {
2352
      this.debugIndexOps = on;
1✔
2353
      return this;
1✔
2354
    }
2355

2356
    public DiskStore build() throws IOException {
2357
      return new DiskStore(this, debugIndexOps || DebugUtils.isAssertionsEnabled());
1!
2358
    }
2359

2360
    long maxSize() {
2361
      long maxSize = this.maxSize;
1✔
2362
      requireState(maxSize != UNSET_NUMBER, "Expected maxSize to bet set");
1!
2363
      return maxSize;
1✔
2364
    }
2365

2366
    int appVersion() {
2367
      int appVersion = this.appVersion;
1✔
2368
      requireState(appVersion != UNSET_NUMBER, "Expected appVersion to be set");
1!
2369
      return appVersion;
1✔
2370
    }
2371

2372
    Path directory() {
2373
      return ensureSet(directory, "directory");
1✔
2374
    }
2375

2376
    Executor executor() {
2377
      return ensureSet(executor, "executor");
1✔
2378
    }
2379

2380
    Hasher hasher() {
2381
      return requireNonNullElse(hasher, Hasher.TRUNCATED_SHA_256);
1✔
2382
    }
2383

2384
    Clock clock() {
2385
      return requireNonNullElse(clock, Utils.systemMillisUtc());
1✔
2386
    }
2387

2388
    Duration indexUpdateDelay() {
2389
      return requireNonNullElse(indexUpdateDelay, DEFAULT_INDEX_UPDATE_DELAY);
1✔
2390
    }
2391

2392
    Delayer delayer() {
2393
      return requireNonNullElse(delayer, Delayer.defaultDelayer());
1✔
2394
    }
2395

2396
    @CanIgnoreReturnValue
2397
    private <T> T ensureSet(T property, String name) {
2398
      requireState(property != null, "Expected %s to bet set", name);
1!
2399
      return property;
1✔
2400
    }
2401
  }
2402
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc