• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mizosoft / methanol / #558

04 Mar 2025 11:52PM UTC coverage: 89.982% (-0.06%) from 90.042%
#558

push

github

mizosoft
Link to 1.8.2 in root README

7581 of 8425 relevant lines covered (89.98%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.03
/methanol/src/main/java/com/github/mizosoft/methanol/internal/cache/DiskStore.java
1
/*
2
 * Copyright (c) 2024 Moataz Hussein
3
 *
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
5
 * of this software and associated documentation files (the "Software"), to deal
6
 * in the Software without restriction, including without limitation the rights
7
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
 * copies of the Software, and to permit persons to whom the Software is
9
 * furnished to do so, subject to the following conditions:
10
 *
11
 * The above copyright notice and this permission notice shall be included in all
12
 * copies or substantial portions of the Software.
13
 *
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20
 * SOFTWARE.
21
 *
22
 */
23

24
package com.github.mizosoft.methanol.internal.cache;
25

26
import static com.github.mizosoft.methanol.internal.Utils.requireNonNegativeDuration;
27
import static com.github.mizosoft.methanol.internal.Validate.castNonNull;
28
import static com.github.mizosoft.methanol.internal.Validate.requireArgument;
29
import static com.github.mizosoft.methanol.internal.Validate.requireState;
30
import static java.nio.charset.StandardCharsets.UTF_8;
31
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
32
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
33
import static java.nio.file.StandardOpenOption.CREATE;
34
import static java.nio.file.StandardOpenOption.READ;
35
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
36
import static java.nio.file.StandardOpenOption.WRITE;
37
import static java.util.Objects.requireNonNull;
38
import static java.util.Objects.requireNonNullElse;
39

40
import com.github.mizosoft.methanol.internal.DebugUtils;
41
import com.github.mizosoft.methanol.internal.Utils;
42
import com.github.mizosoft.methanol.internal.concurrent.Delayer;
43
import com.github.mizosoft.methanol.internal.concurrent.SerialExecutor;
44
import com.github.mizosoft.methanol.internal.function.ThrowingRunnable;
45
import com.github.mizosoft.methanol.internal.function.Unchecked;
46
import com.github.mizosoft.methanol.internal.util.Compare;
47
import com.google.errorprone.annotations.CanIgnoreReturnValue;
48
import com.google.errorprone.annotations.concurrent.GuardedBy;
49
import java.io.Closeable;
50
import java.io.EOFException;
51
import java.io.IOException;
52
import java.io.InterruptedIOException;
53
import java.io.UncheckedIOException;
54
import java.lang.System.Logger;
55
import java.lang.System.Logger.Level;
56
import java.lang.invoke.MethodHandles;
57
import java.lang.invoke.VarHandle;
58
import java.nio.ByteBuffer;
59
import java.nio.channels.FileChannel;
60
import java.nio.file.AccessDeniedException;
61
import java.nio.file.DirectoryIteratorException;
62
import java.nio.file.FileAlreadyExistsException;
63
import java.nio.file.Files;
64
import java.nio.file.NoSuchFileException;
65
import java.nio.file.Path;
66
import java.security.MessageDigest;
67
import java.security.NoSuchAlgorithmException;
68
import java.time.Clock;
69
import java.time.Duration;
70
import java.time.Instant;
71
import java.util.ArrayList;
72
import java.util.Collection;
73
import java.util.Collections;
74
import java.util.Comparator;
75
import java.util.HashMap;
76
import java.util.HashSet;
77
import java.util.Iterator;
78
import java.util.List;
79
import java.util.Map;
80
import java.util.NoSuchElementException;
81
import java.util.Optional;
82
import java.util.Set;
83
import java.util.TreeMap;
84
import java.util.concurrent.CompletableFuture;
85
import java.util.concurrent.ConcurrentHashMap;
86
import java.util.concurrent.Executor;
87
import java.util.concurrent.Phaser;
88
import java.util.concurrent.ThreadLocalRandom;
89
import java.util.concurrent.atomic.AtomicBoolean;
90
import java.util.concurrent.atomic.AtomicInteger;
91
import java.util.concurrent.atomic.AtomicLong;
92
import java.util.concurrent.locks.Lock;
93
import java.util.concurrent.locks.ReadWriteLock;
94
import java.util.concurrent.locks.ReentrantLock;
95
import java.util.concurrent.locks.ReentrantReadWriteLock;
96
import java.util.function.Supplier;
97
import java.util.zip.CRC32C;
98
import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf;
99
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
100
import org.checkerframework.checker.nullness.qual.Nullable;
101

102
/**
103
 * A persistent {@link Store} implementation that saves entries on disk under a specified directory.
104
 * A {@code DiskStore} instance assumes exclusive ownership of its directory; only a single {@code
105
 * DiskStore} from a single JVM process can safely operate on a given directory. This assumption is
106
 * cooperatively enforced among {@code DiskStore} instances such that attempting to initialize a
107
 * store with a directory that is in use by another store in the same or a different JVM process
108
 * will cause an {@code IOException} to be thrown.
109
 *
110
 * <p>The store keeps track of entries known to it across sessions by maintaining an on-disk
111
 * hashtable called the index. As changes are made to the store by adding, accessing or removing
112
 * entries, the index is transparently updated in a time-limited manner. By default, there's at most
113
 * one index update every 2 seconds. This rate can be changed by setting the system property: {@code
114
 * com.github.mizosoft.methanol.internal.cache.DiskStore.indexUpdateDelayMillis}. Setting a small
115
 * delay can result in too often index updates, which extracts a noticeable toll on IO and CPU,
116
 * especially if there's a relatively large number of entries (updating entails reconstructing then
117
 * rewriting the whole index). On the other hand, scarcely updating the index affords less
118
 * durability against crashes as entries that aren't indexed are dropped on initialization. Calling
119
 * the {@code flush} method forces an index update, regardless of the time limit.
120
 *
121
 * <p>To ensure entries are not lost across sessions, a store must be {@link #close() closed} after
122
 * it has been done with. The {@link #dispose()} method can be called to atomically close the store
123
 * and clear its directory if persistence isn't needed (e.g. using temp directories for storage). A
124
 * closed store usually throws an {@code IllegalStateException} when used.
125
 */
126
public final class DiskStore implements Store, TestableStore {
127
  /*
128
   * The store's layout on disk is as follows:
129
   *
130
   *   - An 'index' file.
131
   *   - A corresponding file for each entry with its name being the hex string of the first 80
132
 *     bits of the key's SHA-256, concatenated to the suffix '.ch3oh'.
133
   *   - A '.lock' indicating that the directory is currently being used.
134
   *
135
   * The index and entry files are formatted as follows (in slang BNF):
136
   *
137
   *   <index> = <index-header> <index-entry>*
138
   *   <index-header> = 8-bytes-index-magic
139
   *                    4-bytes-store-version
140
   *                    4-bytes-app-version
141
   *                    8-bytes-entry-count
142
   *   <index-entry> = 10-bytes-entry-hash
143
   *                   8-bytes-last-used-millis (maintained for LRU eviction)
144
   *                   8-bytes-entry-size
145
   *
146
   *   <entry> = <data> <entry-epilogue>
147
   *   <data> = byte*
148
   *   <entry-epilogue> = <key> <metadata> <entry-trailer>
149
   *   <key> = utf8-byte*
150
   *   <metadata> = byte*
151
   *   <entry-trailer> = 8-bytes-entry-magic
152
   *                     4-bytes-store-version
153
   *                     4-bytes-app-version
154
   *                     4-bytes-key-size
155
   *                     4-bytes-metadata-size
156
   *                     8-bytes-data-size
157
   *                     4-bytes-data-crc32c
158
   *                     4-bytes-epilogue-crc32c
159
   *
160
   * Having the key, metadata & their sizes at the end of the file makes it easier and quicker to
161
   * update an entry when only its metadata block changes (and possibly its key in case there's a
162
   * hash collision). In such case, an entry update only overwrites <entry-epilogue> next to an
163
   * existing <data>, truncating the file if necessary. Having an <entry-trailer> instead of an
164
   * <entry-header> allows validating the entry file and knowing its key & metadata sizes in a
165
   * single read.
166
   *
167
   * An effort is made to ensure store operations on disk are atomic. Index and entry writers first
168
   * do their work on a temp file. After they're done, the previous version of the file, if any, is
169
   * atomically replaced. Index writes ensure there's a channel::force before the atomic move. Viewers
170
   * opened for an entry see a constant snapshot of that entry's data even if the entry is removed or
171
   * edited one or more times.
172
   */
173

174
  private static final Logger logger = System.getLogger(DiskStore.class.getName());
1✔
175

176
  /** Indicates whether a task is currently being run by the index executor. Used for debugging. */
177
  private static final ThreadLocal<Boolean> isIndexExecutor = ThreadLocal.withInitial(() -> false);
1✔
178

179
  /**
180
   * The max number of entries an index file can contain. This caps on what to be read from the
181
   * index so that an {@code OutOfMemoryError} is not thrown when reading some corrupt index file.
182
   */
183
  private static final int MAX_ENTRY_COUNT = 1_000_000;
184

185
  static final long INDEX_MAGIC = 0x6d657468616e6f6cL;
186
  static final long ENTRY_MAGIC = 0x7b6368332d6f687dL;
187
  static final int STORE_VERSION = 2;
188
  static final int INDEX_HEADER_SIZE = 2 * Long.BYTES + 2 * Integer.BYTES;
189
  static final int INDEX_ENTRY_SIZE = Hash.BYTES + 2 * Long.BYTES;
190
  static final int ENTRY_TRAILER_SIZE = 2 * Long.BYTES + 6 * Integer.BYTES;
191
  static final long INT_MASK = 0xffffffffL;
192
  static final int SHORT_MASK = 0xffff;
193

194
  static final String LOCK_FILENAME = ".lock";
195
  static final String INDEX_FILENAME = "index";
196
  static final String TEMP_INDEX_FILENAME = "index.tmp";
197
  static final String ENTRY_FILE_SUFFIX = ".ch3oh";
198
  static final String TEMP_ENTRY_FILE_SUFFIX = ".ch3oh.tmp";
199
  static final String ISOLATED_FILE_PREFIX = "RIP_";
200

201
  private final long maxSize;
202
  private final int appVersion;
203
  private final Path directory;
204
  private final Hasher hasher;
205
  private final SerialExecutor indexExecutor;
206
  private final IndexOperator indexOperator;
207
  private final IndexWriteScheduler indexWriteScheduler;
208
  private final EvictionScheduler evictionScheduler;
209
  private final DirectoryLock directoryLock;
210
  private final ConcurrentHashMap<Hash, Entry> entries = new ConcurrentHashMap<>();
1✔
211
  private final AtomicLong size = new AtomicLong();
1✔
212

213
  /**
214
   * A monotonic clock used for ordering entries based on recency. The clock is not completely
215
   * monotonic, however, as the clock value can overflow. But a signed long gives us about 300 years
216
   * of monotonicity assuming the clock is incremented every 1 ns, which is not bad at all.
217
   */
218
  private final AtomicLong lruClock = new AtomicLong();
1✔
219

220
  private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
1✔
221

222
  @GuardedBy("closeLock")
223
  private boolean closed;
224

225
  /**
   * Creates a store configured by {@code builder} and recovers its state from disk.
   *
   * @param builder the source of configuration (size bound, directory, hasher, executor, clock)
   * @param debugIndexOps if set, wraps the index executor and operator with debugging checks
   * @throws IOException if initialization (directory lock acquisition or index recovery) fails
   */
  private DiskStore(Builder builder, boolean debugIndexOps) throws IOException {
    maxSize = builder.maxSize();
    appVersion = builder.appVersion();
    directory = builder.directory();
    hasher = builder.hasher();
    indexExecutor =
        new SerialExecutor(
            debugIndexOps
                ? toDebuggingIndexExecutorDelegate(builder.executor())
                : builder.executor());
    indexOperator =
        debugIndexOps
            ? new DebugIndexOperator(directory, appVersion)
            : new IndexOperator(directory, appVersion);
    indexWriteScheduler =
        new IndexWriteScheduler(
            indexOperator,
            indexExecutor,
            this::indexEntriesSnapshot,
            builder.indexUpdateDelay(),
            builder.delayer(),
            builder.clock());
    evictionScheduler = new EvictionScheduler(this, builder.executor());

    // initialize() touches the index from this thread, so temporarily mark it as the index
    // executor to keep the debug checks from tripping.
    if (debugIndexOps) {
      isIndexExecutor.set(true);
    }
    try {
      directoryLock = initialize();
    } finally {
      if (debugIndexOps) {
        isIndexExecutor.set(false);
      }
    }
  }
  /** Returns the clock used for index write scheduling. Package-private for tests. */
  Clock clock() {
    return indexWriteScheduler.clock();
  }
  /** Returns the delayer used for index write scheduling. Package-private for tests. */
  Delayer delayer() {
    return indexWriteScheduler.delayer();
  }
  /**
   * Acquires the directory lock and populates the in-memory entry map, total size and LRU clock
   * from the entries recovered through the index.
   *
   * @return the acquired lock, released on close
   * @throws IOException if the lock can't be acquired or recovery fails
   */
  private DirectoryLock initialize() throws IOException {
    var lock = DirectoryLock.acquire(Files.createDirectories(directory));

    long totalSize = 0L;
    long maxLastUsed = -1; // So an empty store starts its LRU clock at 0.
    for (var indexEntry : indexOperator.recoverEntries()) {
      entries.put(indexEntry.hash, new Entry(indexEntry));
      totalSize += indexEntry.size;
      maxLastUsed = Math.max(maxLastUsed, indexEntry.lastUsed);
    }
    size.set(totalSize);
    lruClock.set(maxLastUsed + 1);

    // Make sure we start within bounds.
    if (totalSize > maxSize) {
      evictionScheduler.schedule();
    }
    return lock;
  }
  /** Returns the directory this store operates on. */
  public Path directory() {
    return directory;
  }
  /** Returns the store's size bound in bytes. */
  @Override
  public long maxSize() {
    return maxSize;
  }
  /**
   * Opens a viewer for {@code key}'s entry if one exists and is readable.
   *
   * @throws IllegalStateException if the store is closed
   */
  @Override
  public Optional<Viewer> view(String key) throws IOException {
    requireNonNull(key);
    // Hold the read lock so closure can't proceed while a viewer is being opened.
    closeLock.readLock().lock();
    try {
      requireNotClosed();
      var entry = entries.get(hasher.hash(key));
      // Entry::view also checks the key, since distinct keys can collide on the same hash.
      return Optional.ofNullable(entry != null ? entry.view(key) : null);
    } finally {
      closeLock.readLock().unlock();
    }
  }
  /** Asynchronous variant of {@link #view(String)}, run on the given executor. */
  @Override
  public CompletableFuture<Optional<Viewer>> view(String key, Executor executor) {
    return Unchecked.supplyAsync(() -> view(key), executor);
  }
  /**
   * Opens an editor for {@code key}'s entry, creating the entry if absent. Returns an empty
   * optional if an edit can't currently be started (e.g. another edit is in progress).
   *
   * @throws IllegalStateException if the store is closed
   */
  @Override
  public Optional<Editor> edit(String key) throws IOException {
    requireNonNull(key);
    closeLock.readLock().lock();
    try {
      requireNotClosed();
      return Optional.ofNullable(
          entries.computeIfAbsent(hasher.hash(key), Entry::new).edit(key, Entry.ANY_VERSION));
    } finally {
      closeLock.readLock().unlock();
    }
  }
  /** Asynchronous variant of {@link #edit(String)}, run on the given executor. */
  @Override
  public CompletableFuture<Optional<Editor>> edit(String key, Executor executor) {
    return Unchecked.supplyAsync(() -> edit(key), executor);
  }
  /** Returns a weakly-consistent iterator over viewers of the store's entries. */
  @Override
  public Iterator<Viewer> iterator() {
    return new ConcurrentViewerIterator();
  }
  /**
   * Removes {@code key}'s entry if present, returning {@code true} if this call removed it.
   *
   * @throws IllegalStateException if the store is closed
   */
  @Override
  public boolean remove(String key) throws IOException {
    requireNonNull(key);
    closeLock.readLock().lock();
    try {
      requireNotClosed();
      var entry = entries.get(hasher.hash(key));
      if (entry != null) {
        var versionHolder = new int[1];
        var keyIfKnown = entry.keyIfKnown(versionHolder);
        // Only remove when the entry is (or may be) for this key; hashes can collide. A null
        // keyIfKnown means the key isn't loaded yet, so we remove at the observed version to
        // avoid clobbering a concurrent update for a different key.
        if (keyIfKnown == null || key.equals(keyIfKnown) || key.equals(entry.currentEditorKey())) {
          return removeEntry(entry, versionHolder[0]);
        }
      }
      return false;
    } finally {
      closeLock.readLock().unlock();
    }
  }
  /**
   * Removes all entries currently in the store.
   *
   * @throws IllegalStateException if the store is closed
   */
  @Override
  public void clear() throws IOException {
    closeLock.readLock().lock();
    try {
      requireNotClosed();
      // ConcurrentHashMap's view iterator tolerates concurrent removal.
      for (var entry : entries.values()) {
        removeEntry(entry);
      }
    } finally {
      closeLock.readLock().unlock();
    }
  }
  /** Returns the total size in bytes of committed entry data. */
  @Override
  public long size() {
    return size.get();
  }
  /** Closes the store and deletes its on-disk content (except the lock file). */
  @Override
  public void dispose() throws IOException {
    doClose(true);
    size.set(0);
  }
  /** Closes the store, persisting its final state through one last index write. */
  @Override
  public void close() throws IOException {
    doClose(false);
  }
  /**
   * Closes the store, optionally deleting its content. Only the first call does any work;
   * subsequent calls return immediately. The closed flag is flipped under the write lock, then
   * cleanup runs outside it so in-flight readers (holding the read lock) have drained.
   */
  private void doClose(boolean disposing) throws IOException {
    closeLock.writeLock().lock();
    try {
      if (closed) {
        return;
      }
      closed = true;
    } finally {
      closeLock.writeLock().unlock();
    }

    // Make sure our final index write captures each entry's final state.
    entries.values().forEach(Entry::freeze);

    // try-with-resources releases the directory lock even if cleanup throws.
    try (directoryLock) {
      if (disposing) {
        // Shutdown the scheduler to avoid overlapping an index write with store directory deletion.
        indexWriteScheduler.shutdown();
        deleteStoreContent(directory);
      } else {
        evictExcessiveEntries();
        indexWriteScheduler.forceSchedule(); // Persist the final index state.
        indexWriteScheduler.shutdown();
      }
    }
    indexExecutor.shutdown();
    evictionScheduler.shutdown();
    entries.clear();
  }
  /** Forces an index write, bypassing the time-limited update policy. */
  @Override
  public void flush() throws IOException {
    indexWriteScheduler.forceSchedule();
  }
  /** Returns a diagnostic description including directory, versions, sizes and open state. */
  @Override
  public String toString() {
    boolean closed;
    // Read the flag under the lock for a consistent snapshot.
    closeLock.readLock().lock();
    try {
      closed = this.closed;
    } finally {
      closeLock.readLock().unlock();
    }
    return Utils.toStringIdentityPrefix(this)
        + "[directory="
        + directory
        + ", fileSystem="
        + directory.getFileSystem()
        + ", appVersion="
        + appVersion
        + ", maxSize="
        + maxSize
        + ", size="
        + size
        + ", "
        + (closed ? "CLOSED" : "OPEN")
        + "]";
  }
  private Set<IndexEntry> indexEntriesSnapshot() {
449
    var snapshot = new HashSet<IndexEntry>();
1✔
450
    for (var entry : entries.values()) {
1✔
451
      var indexEntry = entry.toIndexEntry();
1✔
452
      if (indexEntry != null) {
1✔
453
        snapshot.add(indexEntry);
1✔
454
      }
455
    }
1✔
456
    return Collections.unmodifiableSet(snapshot);
1✔
457
  }
458

459
  /** Removes the given entry unconditionally (at any version). */
  @CanIgnoreReturnValue
  private boolean removeEntry(Entry entry) throws IOException {
    return removeEntry(entry, Entry.ANY_VERSION);
  }
  /**
   * Atomically evicts the given entry and decrements its size, returning {@code true} if the entry
   * was evicted by this call.
   */
  @CanIgnoreReturnValue
  private boolean removeEntry(Entry entry, int targetVersion) throws IOException {
    long evictedSize = evict(entry, targetVersion);
    if (evictedSize >= 0) {
      // A negative evictedSize means another racing caller already evicted the entry.
      size.addAndGet(-evictedSize);
      return true;
    }
    return false;
  }
  /**
   * Atomically evicts the given entry if it matches the given version, returning its last committed
   * size if evicted by this call or -1 otherwise.
   */
  private long evict(Entry entry, int targetVersion) throws IOException {
    long evictedSize = entry.evict(targetVersion);
    if (evictedSize >= 0) {
      // Remove the exact (hash, entry) mapping; a new entry may have replaced it concurrently.
      entries.remove(entry.hash, entry);
      indexWriteScheduler.trySchedule();
    }
    return evictedSize;
  }
  /** Attempts to call {@link #evictExcessiveEntries()} if not closed. */
  private boolean evictExcessiveEntriesIfOpen() throws IOException {
    closeLock.readLock().lock();
    try {
      if (closed) {
        return false; // Closure wins; skip eviction silently.
      }
      evictExcessiveEntries();
      return true;
    } finally {
      closeLock.readLock().unlock();
    }
  }
  /** Keeps evicting entries in LRU order till the size bound is satisfied. */
  private void evictExcessiveEntries() throws IOException {
    // The LRU snapshot is built lazily: most calls find the store already within bounds.
    Iterator<Entry> lruIterator = null;
    for (long currentSize = size.get(); currentSize > maxSize; ) {
      if (lruIterator == null) {
        lruIterator = entriesSnapshotInLruOrder().iterator();
      }
      if (!lruIterator.hasNext()) {
        break; // Snapshot exhausted; concurrent writes may still exceed the bound briefly.
      }

      long evictedSize = evict(lruIterator.next(), Entry.ANY_VERSION);
      if (evictedSize >= 0) {
        currentSize = size.addAndGet(-evictedSize);
      } else {
        // Get fresh size in case of eviction races.
        currentSize = size.get();
      }
    }
  }
  private Collection<Entry> entriesSnapshotInLruOrder() {
527
    var lruEntries = new TreeMap<IndexEntry, Entry>(IndexEntry.LRU_ORDER);
1✔
528
    for (var entry : entries.values()) {
1✔
529
      var indexEntry = entry.toIndexEntry();
1✔
530
      if (indexEntry != null) {
1✔
531
        lruEntries.put(indexEntry, entry);
1✔
532
      }
533
    }
1✔
534
    return Collections.unmodifiableCollection(lruEntries.values());
1✔
535
  }
536

537
  /**
   * Throws an {@code IllegalStateException} if the store is closed. Callers must hold the close
   * lock, as asserted.
   */
  private void requireNotClosed() {
    assert holdsCloseLock();
    requireState(!closed, "closed");
  }
  /**
   * Returns whether the close lock is held by some thread. Note: read-lock holders can't be
   * attributed to the current thread, so this is a whole-lock check used only in assertions.
   */
  private boolean holdsCloseLock() {
    var lock = (ReentrantReadWriteLock) closeLock;
    return lock.isWriteLocked() || lock.getReadLockCount() > 0;
  }
  /** Returns the number of index writes so far. Only available when debugging index operations. */
  int indexWriteCount() {
    requireState(indexOperator instanceof DebugIndexOperator, "not debugging!");
    return ((DebugIndexOperator) indexOperator).writeCount();
  }
  /** Returns the current value of the LRU clock. Package-private for tests. */
  long lruTime() {
    return lruClock.get();
  }
  /**
   * Returns the on-disk path of {@code key}'s entry file if it is viewable, or an empty list
   * otherwise. For testing only.
   */
  @Override
  public List<String> entriesOnUnderlyingStorageForTesting(String key) {
    var hash = hasher.hash(key);
    var path = directory.resolve(hash.toHexString() + ENTRY_FILE_SUFFIX);
    // A fresh Entry is used rather than the in-memory map, presumably to probe the file on disk
    // directly — NOTE(review): confirm Entry::view reads from disk for an unmapped entry.
    try (var viewer = new Entry(hash).view(key)) {
      return viewer != null ? List.of(path.toString()) : List.of();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
  private static Executor toDebuggingIndexExecutorDelegate(Executor delegate) {
568
    return runnable ->
1✔
569
        delegate.execute(
1✔
570
            () -> {
571
              isIndexExecutor.set(true);
1✔
572
              try {
573
                runnable.run();
1✔
574
              } finally {
575
                isIndexExecutor.set(false);
1✔
576
              }
577
            });
1✔
578
  }
579

580
  private static void checkValue(long expected, long found, String msg)
581
      throws StoreCorruptionException {
582
    if (expected != found) {
1✔
583
      throw new StoreCorruptionException(
1✔
584
          String.format("%s; expected: %#x, found: %#x", msg, expected, found));
1✔
585
    }
586
  }
1✔
587

588
  private static void checkValue(boolean valueIsValid, String msg, long value)
589
      throws StoreCorruptionException {
590
    if (!valueIsValid) {
1✔
591
      throw new StoreCorruptionException(String.format("%s: %d", msg, value));
×
592
    }
593
  }
1✔
594

595
  private static int getNonNegativeInt(ByteBuffer buffer) throws StoreCorruptionException {
596
    int value = buffer.getInt();
1✔
597
    checkValue(value >= 0, "Expected a value >= 0", value);
1✔
598
    return value;
1✔
599
  }
600

601
  private static long getNonNegativeLong(ByteBuffer buffer) throws StoreCorruptionException {
602
    long value = buffer.getLong();
1✔
603
    checkValue(value >= 0, "Expected a value >= 0", value);
1✔
604
    return value;
1✔
605
  }
606

607
  private static long getPositiveLong(ByteBuffer buffer) throws StoreCorruptionException {
608
    long value = buffer.getLong();
1✔
609
    checkValue(value > 0, "Expected a positive value", value);
1✔
610
    return value;
1✔
611
  }
612

613
  private static @Nullable Hash entryFileToHash(String filename) {
614
    assert filename.endsWith(ENTRY_FILE_SUFFIX) || filename.endsWith(TEMP_ENTRY_FILE_SUFFIX);
1✔
615
    int suffixLength =
616
        filename.endsWith(ENTRY_FILE_SUFFIX)
1✔
617
            ? ENTRY_FILE_SUFFIX.length()
1✔
618
            : TEMP_ENTRY_FILE_SUFFIX.length();
1✔
619
    return Hash.tryParse(filename.substring(0, filename.length() - suffixLength));
1✔
620
  }
621

622
  /** Atomically replaces {@code target} with {@code source}. */
  private static void replace(Path source, Path target) throws IOException {
    Files.move(source, target, ATOMIC_MOVE, REPLACE_EXISTING);
  }
  /** Deletes every file in the store directory except the lock file. */
  private static void deleteStoreContent(Path directory) throws IOException {
    // Retain the lock file as we're still using the directory.
    var lockFile = directory.resolve(LOCK_FILENAME);
    try (var stream = Files.newDirectoryStream(directory, file -> !file.equals(lockFile))) {
      for (var file : stream) {
        safeDeleteIfExists(file);
      }
    } catch (DirectoryIteratorException e) {
      // Unwrap to the IOException that actually caused iteration to fail.
      throw e.getCause();
    }
  }
  /**
   * Deletes the given file in isolation from its original name. This is done by randomly renaming
   * it beforehand.
   *
   * <p>Typically, Windows denies access to names of files deleted while having open handles (these
   * are deletable when opened with FILE_SHARE_DELETE, which is NIO's case). The reason seems to be
   * that 'deletion' in such case merely tags the file for physical deletion when all open handles
   * are closed. However, it appears that handles in Windows are associated with the names of files
   * they're opened for (<a
   * href="https://devblogs.microsoft.com/oldnewthing/20040607-00/?p=38993">check this blog</a>).
   *
   * <p>This causes problems when an entry is deleted while being viewed. We're prevented from using
   * that entry's file name in case it's recreated (i.e. by committing an edit) while at least one
   * viewer is still open. The solution is to randomly rename these files before deletion, so the OS
   * associates any open handles with that random name instead. The original name is reusable
   * thereafter.
   */
  private static void isolatedDeleteIfExists(Path file) throws IOException {
    try {
      Files.deleteIfExists(isolate(file));
    } catch (NoSuchFileException ignored) {
      // This can be thrown by isolate(Path), meaning the file is already gone!
    }
  }
  /**
   * Atomically renames {@code file} to a random, prefixed sibling name and returns the new path,
   * retrying with a new random name on clashes.
   *
   * @throws NoSuchFileException if the file no longer exists (handled by the caller)
   */
  private static Path isolate(Path file) throws IOException {
    while (true) {
      var randomFilename =
          ISOLATED_FILE_PREFIX + Long.toHexString(ThreadLocalRandom.current().nextLong());
      try {
        return Files.move(file, file.resolveSibling(randomFilename), ATOMIC_MOVE);
      } catch (FileAlreadyExistsException | AccessDeniedException filenameAlreadyInUse) {
        // We can then try again with a new random name. Note that an AccessDeniedException is
        // thrown on a name clash with another 'isolated' file that still has open handles.
      }
    }
  }
  /**
   * Deletes the given file with {@link DiskStore#isolatedDeleteIfExists(Path)} if it's an entry
   * file (and thus may have open handles), otherwise deletes it with {@link
   * Files#deleteIfExists(Path)}.
   */
  private static void safeDeleteIfExists(Path file) throws IOException {
    // getFileName() is nullable (e.g. for a root path), hence the empty-string fallback.
    var filenameComponent = file.getFileName();
    var filename = filenameComponent != null ? filenameComponent.toString() : "";
    if (filename.endsWith(ENTRY_FILE_SUFFIX)) {
      isolatedDeleteIfExists(file);
    } else if (filename.startsWith(ISOLATED_FILE_PREFIX)) {
      try {
        Files.deleteIfExists(file);
      } catch (AccessDeniedException ignored) {
        // An isolated file can be awaiting deletion if it has open handles. In this case, an
        // AccessDeniedException is always thrown on Windows, so there's nothing we can do.
      }
    } else {
      // Covers temp files and anything else that can't have lingering open handles.
      Files.deleteIfExists(file);
    }
  }
  /** Closes {@code closeable}, logging (not propagating) any {@code IOException}. */
  private static void closeQuietly(Closeable closeable) {
    try {
      closeable.close();
    } catch (IOException e) {
      logger.log(Level.WARNING, "Exception thrown when closing: " + closeable, e);
    }
  }
  /** Deletes {@code path} if it exists, logging (not propagating) any {@code IOException}. */
  private static void deleteIfExistsQuietly(Path path) {
    try {
      Files.deleteIfExists(path);
    } catch (IOException e) {
      logger.log(Level.WARNING, "Exception thrown when deleting: " + path, e);
    }
  }
  private static boolean keyMismatches(
715
      @Nullable String keyIfKnown, @Nullable String expectedKeyIfKnown) {
716
    return keyIfKnown != null
1✔
717
        && expectedKeyIfKnown != null
718
        && !keyIfKnown.equals(expectedKeyIfKnown);
1✔
719
  }
720

721
  /** Returns a new {@code DiskStore.Builder}. */
  public static Builder newBuilder() {
    return new Builder();
  }
  private static final class FileChannelCloseable implements Closeable {
726
    private final FileChannel channel;
727
    private boolean close = true;
1✔
728

729
    FileChannelCloseable(FileChannel channel) {
1✔
730
      this.channel = channel;
1✔
731
    }
1✔
732

733
    FileChannel channel() {
734
      return channel;
1✔
735
    }
736

737
    void keepOpen() {
738
      close = false;
1✔
739
    }
1✔
740

741
    @Override
742
    public void close() throws IOException {
743
      if (close) {
1✔
744
        channel.close();
1✔
745
      }
746
    }
1✔
747
  }
748

749
  /**
   * A weakly-consistent iterator over viewers of the store's entries. Entries that can't be
   * viewed (or fail with an {@code IOException}) are skipped; store closure ends iteration
   * gracefully instead of throwing.
   */
  private final class ConcurrentViewerIterator implements Iterator<Viewer> {
    private final Iterator<Entry> entryIterator = entries.values().iterator();

    // Viewer prefetched by hasNext() and handed out by the next next() call.
    private @Nullable Viewer nextViewer;
    // Viewer most recently returned by next(); target of remove().
    private @Nullable Viewer currentViewer;

    ConcurrentViewerIterator() {}

    @Override
    @EnsuresNonNullIf(expression = "nextViewer", result = true)
    public boolean hasNext() {
      return nextViewer != null || findNext();
    }

    @Override
    public Viewer next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      var viewer = castNonNull(nextViewer);
      nextViewer = null;
      currentViewer = viewer;
      return viewer;
    }

    @Override
    public void remove() {
      var viewer = currentViewer;
      requireState(viewer != null, "next() must be called before remove()");
      currentViewer = null; // Each returned viewer may be removed at most once.
      try {
        castNonNull(viewer).removeEntry();
      } catch (IOException e) {
        logger.log(Level.WARNING, "Exception thrown when removing entry", e);
      }
    }

    /** Advances to the next viewable entry, setting {@code nextViewer} if one is found. */
    @EnsuresNonNullIf(expression = "nextViewer", result = true)
    private boolean findNext() {
      while (entryIterator.hasNext()) {
        var entry = entryIterator.next();
        try {
          var viewer = view(entry);
          if (viewer != null) {
            nextViewer = viewer;
            return true;
          }
        } catch (IOException e) {
          // Try next entry.
          logger.log(Level.WARNING, "Exception thrown when iterating over entries", e);
        } catch (IllegalStateException e) {
          // Handle closure gracefully by ending iteration.
          return false;
        }
      }
      return false;
    }

    /** Views the given entry under the close lock; throws {@code IllegalStateException} if closed. */
    private @Nullable Viewer view(Entry entry) throws IOException {
      closeLock.readLock().lock();
      try {
        requireState(!closed, "Closed");
        // A null key means "any key": iteration doesn't know entry keys up front.
        return entry.view(null);
      } finally {
        closeLock.readLock().unlock();
      }
    }
  }
  private static final class Sha256MessageDigestFactory {
819
    private static final @Nullable MessageDigest TEMPLATE = lookupTemplateIfCloneable();
1✔
820

821
    private Sha256MessageDigestFactory() {}
822

823
    static MessageDigest create() {
824
      if (TEMPLATE == null) {
1✔
825
        return lookup();
×
826
      }
827
      try {
828
        return (MessageDigest) TEMPLATE.clone();
1✔
829
      } catch (CloneNotSupportedException e) {
×
830
        throw new AssertionError(e);
×
831
      }
832
    }
833

834
    private static @Nullable MessageDigest lookupTemplateIfCloneable() {
835
      try {
836
        return (MessageDigest) lookup().clone();
1✔
837
      } catch (CloneNotSupportedException ignored) {
×
838
        return null;
×
839
      }
840
    }
841

842
    private static MessageDigest lookup() {
843
      try {
844
        return MessageDigest.getInstance("SHA-256");
1✔
845
      } catch (NoSuchAlgorithmException e) {
×
846
        throw new UnsupportedOperationException("SHA-256 not available!", e);
×
847
      }
848
    }
849
  }
850

851
  /** A function that computes an 80-bit hash from a string key. */
852
  @FunctionalInterface
853
  public interface Hasher {
854
    /** A Hasher returning the first 80 bits of the SHA-256 of the key's UTF-8 encoded bytes. */
855
    Hasher TRUNCATED_SHA_256 = Hasher::truncatedSha256Hash;
1✔
856

857
    Hash hash(String key);
858

859
    private static Hash truncatedSha256Hash(String key) {
860
      return new Hash(
1✔
861
          ByteBuffer.wrap(Sha256MessageDigestFactory.create().digest(key.getBytes(UTF_8)))
1✔
862
              .limit(Hash.BYTES));
1✔
863
    }
864
  }
865

866
  /**
   * Reads and writes the on-disk index file, and reconciles the index with the files actually
   * present in the store directory during initialization.
   */
  private static class IndexOperator {
    private final Path directory;
    private final Path indexFile;
    private final Path tempIndexFile;

    // The user's app version, written to and validated against the index header.
    private final int appVersion;

    IndexOperator(Path directory, int appVersion) {
      this.directory = directory;
      this.indexFile = directory.resolve(INDEX_FILENAME);
      this.tempIndexFile = directory.resolve(TEMP_INDEX_FILENAME);
      this.appVersion = appVersion;
    }

    /**
     * Recovers the set of entries to start the store with: the intersection of entries referenced
     * by the index and entries whose (clean) files exist on disk. Leftovers of unfinished edits
     * and files unreferenced by the index are deleted.
     */
    Set<IndexEntry> recoverEntries() throws IOException {
      var diskEntries = scanDirectoryForEntries();
      var indexEntries = readOrCreateIndex();
      var retainedIndexEntries = new HashSet<IndexEntry>(indexEntries.size());
      var filesToDelete = new HashSet<Path>();
      for (var entry : indexEntries) {
        var entryFiles = diskEntries.get(entry.hash);
        if (entryFiles != null) {
          if (entryFiles.cleanFile != null) {
            retainedIndexEntries.add(entry);
          }
          if (entryFiles.dirtyFile != null) {
            // Delete trails of unsuccessful edits.
            filesToDelete.add(entryFiles.dirtyFile);
          }
        }
      }

      // Delete entries found on disk but not referenced by the index.
      // TODO consider trying to recover these entries.
      if (retainedIndexEntries.size() != diskEntries.size()) {
        var untrackedEntries = new HashMap<>(diskEntries);
        retainedIndexEntries.forEach(entries -> untrackedEntries.remove(entries.hash));
        for (var entryFiles : untrackedEntries.values()) {
          if (entryFiles.cleanFile != null) {
            filesToDelete.add(entryFiles.cleanFile);
          }
          if (entryFiles.dirtyFile != null) {
            filesToDelete.add(entryFiles.dirtyFile);
          }
        }
      }

      for (var file : filesToDelete) {
        safeDeleteIfExists(file);
      }
      return Collections.unmodifiableSet(retainedIndexEntries);
    }

    /**
     * Reads the index, treating a missing index as empty and dropping the whole store's content if
     * the index is corrupt or truncated.
     */
    private Set<IndexEntry> readOrCreateIndex() throws IOException {
      try {
        return readIndex();
      } catch (NoSuchFileException e) {
        return Set.of();
      } catch (StoreCorruptionException | EOFException e) {
        // TODO consider trying to rebuild the index from a directory scan instead.
        logger.log(Level.WARNING, "Dropping store content due to an unreadable index", e);
        deleteStoreContent(directory);
        return Set.of();
      }
    }

    /**
     * Reads and validates the index file: a fixed header (magic, store version, app version, entry
     * count) followed by a table of fixed-size entries.
     */
    Set<IndexEntry> readIndex() throws IOException {
      try (var channel = FileChannel.open(indexFile, READ)) {
        var header = FileIO.read(channel, INDEX_HEADER_SIZE);
        checkValue(INDEX_MAGIC, header.getLong(), "Not in index format");
        checkValue(STORE_VERSION, header.getInt(), "Unexpected store version");
        checkValue(appVersion, header.getInt(), "Unexpected app version");

        long entryCount = header.getLong();
        checkValue(
            entryCount >= 0 && entryCount <= MAX_ENTRY_COUNT, "Invalid entry count", entryCount);
        if (entryCount == 0) {
          return Set.of();
        }

        int intEntryCount = (int) entryCount;
        int entryTableSize = intEntryCount * INDEX_ENTRY_SIZE;
        var entryTable = FileIO.read(channel, entryTableSize);
        var entries = new HashSet<IndexEntry>(intEntryCount);
        for (int i = 0; i < intEntryCount; i++) {
          entries.add(new IndexEntry(entryTable));
        }
        return Collections.unmodifiableSet(entries);
      }
    }

    /**
     * Writes the given entries to the index file. The index is first written & forced to a temp
     * file, which then replaces the current index, so a crash mid-write can't corrupt an existing
     * index.
     */
    void writeIndex(Set<IndexEntry> entries) throws IOException {
      requireArgument(entries.size() <= MAX_ENTRY_COUNT, "Too many entries");
      try (var channel = FileChannel.open(tempIndexFile, CREATE, WRITE, TRUNCATE_EXISTING)) {
        var index =
            ByteBuffer.allocate(INDEX_HEADER_SIZE + INDEX_ENTRY_SIZE * entries.size())
                .putLong(INDEX_MAGIC)
                .putInt(STORE_VERSION)
                .putInt(appVersion)
                .putLong(entries.size());
        entries.forEach(entry -> entry.writeTo(index));
        FileIO.write(channel, index.flip());
        channel.force(false);
      }
      replace(tempIndexFile, indexFile);
    }

    /** Scans the store directory, grouping recognized entry files by their hash. */
    private Map<Hash, EntryFiles> scanDirectoryForEntries() throws IOException {
      var diskEntries = new HashMap<Hash, EntryFiles>();
      try (var stream = Files.newDirectoryStream(directory)) {
        for (var file : stream) {
          var filenameComponent = file.getFileName();
          var filename = filenameComponent != null ? filenameComponent.toString() : "";
          if (filename.equals(INDEX_FILENAME)
              || filename.equals(TEMP_INDEX_FILENAME)
              || filename.equals(LOCK_FILENAME)) {
            // Skip non-entry files.
            continue;
          }

          Hash hash;
          if ((filename.endsWith(ENTRY_FILE_SUFFIX) || filename.endsWith(TEMP_ENTRY_FILE_SUFFIX))
              && (hash = entryFileToHash(filename)) != null) {
            var files = diskEntries.computeIfAbsent(hash, __ -> new EntryFiles());
            if (filename.endsWith(ENTRY_FILE_SUFFIX)) {
              files.cleanFile = file;
            } else {
              files.dirtyFile = file;
            }
          } else if (filename.startsWith(ISOLATED_FILE_PREFIX)) {
            // Clean trails of isolatedDeleteIfExists in case it failed in a previous session.
            safeDeleteIfExists(file);
          } else {
            logger.log(
                Level.WARNING,
                "Unrecognized file or directory found during initialization <"
                    + file
                    + ">. "
                    + System.lineSeparator()
                    + "It is generally not a good idea to let the store directory be used by other entities.");
          }
        }
      }
      return diskEntries;
    }

    /** Entry related files found by a directory scan. */
    private static final class EntryFiles {
      // cleanFile: the committed entry file; dirtyFile: an in-progress (or abandoned) edit's file.
      @MonotonicNonNull Path cleanFile;
      @MonotonicNonNull Path dirtyFile;

      EntryFiles() {}
    }
  }
1019

1020
  /**
   * An {@link IndexOperator} that logs errors if index operations run on a thread other than the
   * index executor's or overlap with each other, and that counts successful index writes.
   */
  private static final class DebugIndexOperator extends IndexOperator {
    private static final VarHandle RUNNING_OPERATION;

    static {
      try {
        RUNNING_OPERATION =
            MethodHandles.lookup()
                .findVarHandle(DebugIndexOperator.class, "runningOperation", String.class);
      } catch (NoSuchFieldException | IllegalAccessException e) {
        throw new ExceptionInInitializerError(e);
      }
    }

    private final AtomicInteger writeCount = new AtomicInteger(0);

    // The name of the currently running operation, or null if none. Accessed through
    // RUNNING_OPERATION to atomically detect overlap.
    @SuppressWarnings({"FieldCanBeLocal", "UnusedVariable"}) // VarHandle indirection.
    private @Nullable String runningOperation;

    DebugIndexOperator(Path directory, int appVersion) {
      super(directory, appVersion);
    }

    @Override
    Set<IndexEntry> readIndex() throws IOException {
      enter("readIndex");
      try {
        return super.readIndex();
      } finally {
        exit();
      }
    }

    @Override
    void writeIndex(Set<IndexEntry> entries) throws IOException {
      enter("writeIndex");
      try {
        super.writeIndex(entries);
        writeCount.incrementAndGet(); // Only counted when the write completes without throwing.
      } finally {
        exit();
      }
    }

    /** Records that the named operation started, logging errors on misuse. */
    private void enter(String operation) {
      if (!isIndexExecutor.get()) {
        logger.log(
            Level.ERROR,
            () -> "IndexOperator::" + operation + " isn't called by the index executor");
      }

      // CAS null -> operation; a non-null witness means another operation is still running.
      var currentOperation = RUNNING_OPERATION.compareAndExchange(this, null, operation);
      if (currentOperation != null) {
        logger.log(
            Level.ERROR,
            () ->
                "IndexOperator::"
                    + operation
                    + " is called while IndexOperator::"
                    + currentOperation
                    + " is running");
      }
    }

    private void exit() {
      runningOperation = null;
    }

    /** Returns the number of successfully completed index writes. */
    int writeCount() {
      return writeCount.get();
    }
  }
1091

1092
  /**
   * A time-limited scheduler for index writes that arranges no more than 1 write per the specified
   * time period.
   */
  private static final class IndexWriteScheduler {
    private static final VarHandle SCHEDULED_WRITE_TASK;

    static {
      try {
        SCHEDULED_WRITE_TASK =
            MethodHandles.lookup()
                .findVarHandle(IndexWriteScheduler.class, "scheduledWriteTask", WriteTask.class);
      } catch (NoSuchFieldException | IllegalAccessException e) {
        throw new ExceptionInInitializerError(e);
      }
    }

    /** Terminal marker that is set when no more writes are to be scheduled. */
    private static final WriteTask TOMBSTONE =
        new WriteTask() {
          @Override
          Instant fireTime() {
            return Instant.MIN;
          }

          @Override
          void cancel() {}
        };

    private final IndexOperator indexOperator;
    private final Executor indexExecutor;
    private final Supplier<Set<IndexEntry>> indexEntriesSnapshotSupplier;

    // The minimum time between two consecutive index writes.
    private final Duration period;
    private final Delayer delayer;
    private final Clock clock;

    // The latest scheduled task; CAS'd through SCHEDULED_WRITE_TASK.
    private @MonotonicNonNull WriteTask scheduledWriteTask;

    /**
     * A barrier for shutdowns to await the currently running task. Scheduled WriteTasks have the
     * following transitions:
     *
     * <pre>{@code
     * T1 -> T2 -> .... -> Tn
     * }</pre>
     *
     * Where Tn is the currently scheduled and hence the only referenced task, and the time between
     * two consecutive Ts is generally the specified period, or less if there are immediate flushes
     * (note that Ts don't overlap since the executor is serialized). Ensuring no Ts are running
     * after shutdown entails awaiting the currently running task (if any) to finish then preventing
     * ones following it from starting. If the update delay is small enough, or if the executor
     * and/or the system-wide scheduler are busy, the currently running task might be lagging behind
     * Tn by multiple Ts, so it's not ideal to somehow keep a reference to it in order to await it
     * when needed. This Phaser solves this issue by having the currently running T to register
     * itself then arriveAndDeregister when finished. During shutdown, the scheduler de-registers
     * from, then attempts to await, the phaser, where it is only awaited if there is still one
     * registered party (a running T). When registerers reach 0, the phaser is terminated,
     * preventing yet to arrive tasks from registering, so they can choose not to run.
     */
    private final Phaser runningTaskAwaiter = new Phaser(1); // Register self.

    IndexWriteScheduler(
        IndexOperator indexOperator,
        Executor indexExecutor,
        Supplier<Set<IndexEntry>> indexEntriesSnapshotSupplier,
        Duration period,
        Delayer delayer,
        Clock clock) {
      this.indexOperator = indexOperator;
      this.indexExecutor = indexExecutor;
      this.indexEntriesSnapshotSupplier = indexEntriesSnapshotSupplier;
      this.period = period;
      this.delayer = delayer;
      this.clock = clock;
    }

    Clock clock() {
      return clock;
    }

    Delayer delayer() {
      return delayer;
    }

    /** Schedules an index write if one isn't already pending, respecting the write rate limit. */
    @SuppressWarnings("FutureReturnValueIgnored")
    void trySchedule() {
      // Decide whether to schedule and when as follows:
      //   - If TOMBSTONE is set, don't schedule anything.
      //   - If scheduledWriteTask is null, then this is the first call, so schedule immediately.
      //   - If scheduledWriteTask is set to run in the future, then it'll see the changes made so
      //     far and there's no need to schedule.
      //   - If less than INDEX_UPDATE_DELAY time has passed since the last write, then schedule the
      //     writer to run when the period evaluates from the last write.
      //   - Otherwise, a timeslot is available, so schedule immediately.
      //
      // This is retried in case of contention.

      var now = clock.instant();
      while (true) {
        var currentTask = scheduledWriteTask;
        var nextFireTime = currentTask != null ? currentTask.fireTime() : null;
        Duration delay;
        if (nextFireTime == null) {
          delay = Duration.ZERO;
        } else if (currentTask == TOMBSTONE || nextFireTime.isAfter(now)) {
          return; // No writes are needed.
        } else {
          var idleness = Duration.between(nextFireTime, now);
          delay = Compare.max(period.minus(idleness), Duration.ZERO);
        }

        var newTask = new RunnableWriteTask(now.plus(delay));
        if (SCHEDULED_WRITE_TASK.compareAndSet(this, currentTask, newTask)) {
          delayer.delay(newTask::runUnchecked, delay, indexExecutor);
          return;
        }
      }
    }

    /** Forcibly submits an index write to the index executor, ignoring the time rate. */
    void forceSchedule() throws IOException {
      Utils.getIo(forceScheduleAsync());
    }

    private CompletableFuture<Void> forceScheduleAsync() {
      var now = clock.instant();
      while (true) {
        var currentTask = scheduledWriteTask;
        requireState(currentTask != TOMBSTONE, "Shutdown");

        var newTask = new RunnableWriteTask(now);
        if (SCHEDULED_WRITE_TASK.compareAndSet(this, currentTask, newTask)) {
          if (currentTask != null) {
            // Cancel the pending task; the forced write supersedes it.
            currentTask.cancel();
          }
          return Unchecked.runAsync(newTask, indexExecutor);
        }
      }
    }

    /**
     * Prevents any further writes from being scheduled and awaits the currently running write (if
     * any) to finish. See {@link #runningTaskAwaiter}.
     */
    void shutdown() throws InterruptedIOException {
      scheduledWriteTask = TOMBSTONE;
      try {
        runningTaskAwaiter.awaitAdvanceInterruptibly(runningTaskAwaiter.arriveAndDeregister());
        assert runningTaskAwaiter.isTerminated();
      } catch (InterruptedException e) {
        throw Utils.toInterruptedIOException(e);
      }
    }

    private abstract static class WriteTask {
      abstract Instant fireTime();

      abstract void cancel();
    }

    private final class RunnableWriteTask extends WriteTask implements ThrowingRunnable {
      private final Instant fireTime;
      private volatile boolean cancelled;

      RunnableWriteTask(Instant fireTime) {
        this.fireTime = fireTime;
      }

      @Override
      Instant fireTime() {
        return fireTime;
      }

      @Override
      void cancel() {
        cancelled = true;
      }

      @Override
      public void run() throws IOException {
        // register() returns < 0 when the phaser is terminated, i.e. after shutdown.
        if (!cancelled && runningTaskAwaiter.register() >= 0) {
          try {
            indexOperator.writeIndex(indexEntriesSnapshotSupplier.get());
          } finally {
            runningTaskAwaiter.arriveAndDeregister();
          }
        }
      }

      /** Runs the task, logging rather than propagating IOExceptions (used by the delayer). */
      void runUnchecked() {
        // TODO consider disabling the store if failure happens too often.
        try {
          run();
        } catch (IOException e) {
          logger.log(Level.ERROR, "Exception thrown when writing the index", e);
        }
      }
    }
  }
1286

1287
  /** Schedules eviction tasks on demand while ensuring they're run sequentially. */
  private static final class EvictionScheduler {
    // Bits of `sync`: RUN means an eviction task is active (or being submitted), KEEP_ALIVE asks
    // the active task to loop once more (an eviction was requested while it ran), SHUTDOWN
    // permanently disables scheduling.
    private static final int RUN = 1;
    private static final int KEEP_ALIVE = 2;
    private static final int SHUTDOWN = 4;

    private static final VarHandle SYNC;

    static {
      try {
        var lookup = MethodHandles.lookup();
        SYNC = lookup.findVarHandle(EvictionScheduler.class, "sync", int.class);
      } catch (NoSuchFieldException | IllegalAccessException e) {
        throw new ExceptionInInitializerError(e);
      }
    }

    private final DiskStore store;
    private final Executor executor;

    @SuppressWarnings("unused") // VarHandle indirection.
    private volatile int sync;

    EvictionScheduler(DiskStore store, Executor executor) {
      this.store = store;
      this.executor = executor;
    }

    /**
     * Requests an eviction pass: submits a task if none is active, otherwise asks the active task
     * to do another pass. No-op after shutdown.
     */
    void schedule() {
      for (int s; ((s = sync) & SHUTDOWN) == 0; ) {
        int bit = (s & RUN) == 0 ? RUN : KEEP_ALIVE; // Run or keep-alive.
        if (SYNC.compareAndSet(this, s, (s | bit))) {
          if (bit == RUN) {
            executor.execute(this::runEviction);
          }
          break;
        }
      }
    }

    /** Runs eviction passes until no keep-alive is pending or the scheduler is shut down. */
    private void runEviction() {
      for (int s; ((s = sync) & SHUTDOWN) == 0; ) {
        try {
          if (!store.evictExcessiveEntriesIfOpen()) {
            // Ignore eviction, the store ensures it's closed within bounds.
            break;
          }
        } catch (IOException e) {
          logger.log(Level.ERROR, "Exception thrown when evicting entries in background", e);
        }

        // Exit or consume keep-alive bit.
        int bit = (s & KEEP_ALIVE) != 0 ? KEEP_ALIVE : RUN;
        if (SYNC.compareAndSet(this, s, s & ~bit) && bit == RUN) {
          break;
        }
      }
    }

    /** Permanently disables scheduling; the active pass (if any) exits on its next iteration. */
    void shutdown() {
      SYNC.getAndBitwiseOr(this, SHUTDOWN);
    }
  }
1350

1351
  /**
1352
   * A lock on the store directory that ensures it's operated upon by a single DiskStore instance in
1353
   * a single JVM process. This only works in a cooperative manner; it doesn't prevent other
1354
   * entities from using the directory.
1355
   */
1356
  private static final class DirectoryLock implements AutoCloseable {
1357
    private final Path lockFile;
1358
    private final FileChannel channel;
1359

1360
    private DirectoryLock(Path lockFile, FileChannel channel) {
1✔
1361
      this.lockFile = lockFile;
1✔
1362
      this.channel = channel;
1✔
1363
    }
1✔
1364

1365
    @Override
1366
    public void close() {
1367
      deleteIfExistsQuietly(lockFile);
1✔
1368
      closeQuietly(channel); // Closing the channel releases the lock.
1✔
1369
    }
1✔
1370

1371
    static DirectoryLock acquire(Path directory) throws IOException {
1372
      var lockFile = directory.resolve(LOCK_FILENAME);
1✔
1373
      try (var closeable =
1✔
1374
          new FileChannelCloseable(FileChannel.open(lockFile, READ, WRITE, CREATE))) {
1✔
1375
        var channel = closeable.channel();
1✔
1376
        var fileLock = channel.tryLock();
1✔
1377
        if (fileLock == null) {
1✔
1378
          throw new IOException("Store directory <" + directory + "> already in use");
×
1379
        }
1380
        var lock = new DirectoryLock(lockFile, channel);
1✔
1381
        closeable.keepOpen();
1✔
1382
        return lock;
1✔
1383
      } catch (IOException e) {
×
1384
        deleteIfExistsQuietly(lockFile);
×
1385
        throw e;
×
1386
      }
1387
    }
1388
  }
1389

1390
  /** An immutable 80-bit hash code. */
  public static final class Hash {
    static final int BYTES = 10;
    static final int HEX_STRING_LENGTH = 2 * BYTES;

    // Upper 64 bits + lower 16 bits in big-endian order.
    private final long upper64Bits;
    private final short lower16Bits;

    // Lazily computed hex form. Unsynchronized: a racy duplicate computation yields the same
    // string, so the race is harmless.
    private @MonotonicNonNull String lazyHex;

    /** Creates a hash from the buffer's next 10 bytes (big-endian). */
    public Hash(ByteBuffer buffer) {
      this(buffer.getLong(), buffer.getShort());
    }

    Hash(long upper64Bits, short lower16Bits) {
      this.upper64Bits = upper64Bits;
      this.lower16Bits = lower16Bits;
    }

    /** Writes this hash's 10 bytes to the buffer (big-endian). */
    void writeTo(ByteBuffer buffer) {
      buffer.putLong(upper64Bits);
      buffer.putShort(lower16Bits);
    }

    /** Returns this hash as a zero-padded, 20-character lowercase hex string. */
    String toHexString() {
      var hex = lazyHex;
      if (hex == null) {
        hex =
            toPaddedHexString(upper64Bits, Long.BYTES)
                + toPaddedHexString(lower16Bits & SHORT_MASK, Short.BYTES);
        lazyHex = hex;
      }
      return hex;
    }

    @Override
    public int hashCode() {
      return Long.hashCode(upper64Bits) ^ Short.hashCode(lower16Bits);
    }

    @Override
    public boolean equals(@Nullable Object obj) {
      if (obj == this) {
        return true;
      }
      if (!(obj instanceof Hash)) {
        return false;
      }
      var other = (Hash) obj;
      return upper64Bits == other.upper64Bits && lower16Bits == other.lower16Bits;
    }

    @Override
    public String toString() {
      return toHexString();
    }

    /** Parses a 20-character hex string into a Hash, or returns null if malformed. */
    static @Nullable Hash tryParse(String hex) {
      if (hex.length() != HEX_STRING_LENGTH) {
        return null;
      }
      try {
        // There's no Short.parseShort that accepts a CharSequence region, so use downcast result of
        // Integer.parseInt. The short part spans exactly 4 hex characters (16 bits), so the parsed
        // value fits in a short bit-wise, and we don't care about the sign.
        return new Hash(
            Long.parseUnsignedLong(hex, 0, Long.BYTES << 1, 16),
            (short) Integer.parseInt(hex, Long.BYTES << 1, hex.length(), 16));
      } catch (NumberFormatException ignored) {
        return null;
      }
    }

    /** Returns {@code value} in hex, left-padded with zeros to {@code 2 * size} characters. */
    private static String toPaddedHexString(long value, int size) {
      var hex = Long.toHexString(value);
      int padding = (size << 1) - hex.length();
      assert padding >= 0;
      if (padding > 0) {
        hex = "0".repeat(padding) + hex;
      }
      return hex;
    }
  }
1474

1475
  private static final class IndexEntry {
1476
    /**
1477
     * A comparator that defines LRU eviction order. It is assumed that there can be no ties based
1478
     * on latest usage time.
1479
     */
1480
    static final Comparator<IndexEntry> LRU_ORDER =
1✔
1481
        Comparator.comparingLong(entry -> entry.lastUsed);
1✔
1482

1483
    final Hash hash;
1484
    final long lastUsed;
1485
    final long size;
1486

1487
    IndexEntry(Hash hash, long lastUsed, long size) {
1✔
1488
      this.hash = hash;
1✔
1489
      this.lastUsed = lastUsed;
1✔
1490
      this.size = size;
1✔
1491
    }
1✔
1492

1493
    IndexEntry(ByteBuffer buffer) throws StoreCorruptionException {
1✔
1494
      hash = new Hash(buffer);
1✔
1495
      lastUsed = buffer.getLong();
1✔
1496
      size = getPositiveLong(buffer);
1✔
1497
    }
1✔
1498

1499
    void writeTo(ByteBuffer buffer) {
1500
      hash.writeTo(buffer);
1✔
1501
      buffer.putLong(lastUsed);
1✔
1502
      buffer.putLong(size);
1✔
1503
    }
1✔
1504

1505
    @Override
1506
    public int hashCode() {
1507
      return hash.hashCode();
1✔
1508
    }
1509

1510
    @Override
1511
    public boolean equals(@Nullable Object obj) {
1512
      if (obj == this) {
×
1513
        return true;
×
1514
      }
1515
      if (!(obj instanceof IndexEntry)) {
×
1516
        return false;
×
1517
      }
1518
      return hash.equals(((IndexEntry) obj).hash);
×
1519
    }
1520
  }
1521

1522
  /** An entry's key, metadata, and data size/checksum as encoded in the entry file's epilogue. */
  private static final class EntryDescriptor {
    final String key;
    final ByteBuffer metadata; // A read-only view; see the constructor.
    final long dataSize;
    final long dataCrc32c;

    EntryDescriptor(String key, ByteBuffer metadata, long dataSize, long dataCrc32c) {
      this.key = key;
      this.metadata = metadata.asReadOnlyBuffer();
      this.dataSize = dataSize;
      this.dataCrc32c = dataCrc32c;
    }

    /**
     * Encodes this descriptor as the entry file's epilogue: the UTF-8 key, the metadata bytes, a
     * fixed-size trailer (magic, store version, app version, key size, metadata size, data size,
     * data CRC32C), then a final CRC32C computed over all preceding epilogue bytes.
     */
    ByteBuffer encodeToEpilogue(int appVersion) {
      var encodedKey = UTF_8.encode(key);
      int keySize = encodedKey.remaining();
      int metadataSize = metadata.remaining();
      var epilogue =
          ByteBuffer.allocate(keySize + metadataSize + ENTRY_TRAILER_SIZE)
              .put(encodedKey)
              .put(metadata.duplicate())
              .putLong(ENTRY_MAGIC)
              .putInt(STORE_VERSION)
              .putInt(appVersion)
              .putInt(keySize)
              .putInt(metadataSize)
              .putLong(dataSize)
              .putInt((int) dataCrc32c);
      // Checksum everything written so far, then append the checksum in the reserved final slot.
      var crc32c = new CRC32C();
      crc32c.update(epilogue.flip());
      return epilogue.limit(epilogue.capacity()).putInt((int) crc32c.getValue()).flip();
    }
  }
1555

1556
  private final class Entry {
1557
    static final int ANY_VERSION = -1;
1558

1559
    final Hash hash;
1560

1561
    private final ReentrantLock lock = new ReentrantLock();
1✔
1562

1563
    @GuardedBy("lock")
1564
    private long lastUsed;
1565

1566
    @GuardedBy("lock")
1567
    private long size;
1568

1569
    @GuardedBy("lock")
1570
    private int viewerCount;
1571

1572
    @GuardedBy("lock")
1573
    private @Nullable DiskEditor currentEditor;
1574

1575
    @GuardedBy("lock")
1576
    private int version;
1577

1578
    @GuardedBy("lock")
1579
    private boolean readable;
1580

1581
    @GuardedBy("lock")
1582
    private boolean writable;
1583

1584
    private @MonotonicNonNull Path lazyEntryFile;
1585
    private @MonotonicNonNull Path lazyTempEntryFile;
1586

1587
    /** This entry's descriptor as known from the last read or write. */
1588
    @GuardedBy("lock")
1589
    private @MonotonicNonNull EntryDescriptor cachedDescriptor;
1590

1591
    Entry(Hash hash) {
1✔
1592
      this.hash = hash;
1✔
1593
      this.lastUsed = -1;
1✔
1594
      this.readable = false;
1✔
1595
      this.writable = true;
1✔
1596
    }
1✔
1597

1598
    Entry(IndexEntry indexEntry) {
1✔
1599
      this.hash = indexEntry.hash;
1✔
1600
      this.lastUsed = indexEntry.lastUsed;
1✔
1601
      this.size = indexEntry.size;
1✔
1602
      this.readable = true;
1✔
1603
      this.writable = true;
1✔
1604
    }
1✔
1605

1606
    @Nullable IndexEntry toIndexEntry() {
1607
      lock.lock();
1✔
1608
      try {
1609
        return readable ? new IndexEntry(hash, lastUsed, size) : null;
1✔
1610
      } finally {
1611
        lock.unlock();
1✔
1612
      }
1613
    }
1614

1615
    @Nullable Viewer view(@Nullable String expectedKey) throws IOException {
1616
      var viewer = openViewerForKey(expectedKey);
1✔
1617
      if (viewer != null) {
1✔
1618
        indexWriteScheduler.trySchedule();
1✔
1619
      }
1620
      return viewer;
1✔
1621
    }
1622

1623
    private @Nullable Viewer openViewerForKey(@Nullable String expectedKey) throws IOException {
1624
      lock.lock();
1✔
1625
      try {
1626
        if (!readable) {
1✔
1627
          return null;
1✔
1628
        }
1629

1630
        try (var closeable = new FileChannelCloseable(FileChannel.open(entryFile(), READ))) {
1✔
1631
          var channel = closeable.channel();
1✔
1632
          var descriptor = readDescriptorForKey(channel, expectedKey);
1✔
1633
          if (descriptor == null) {
1✔
1634
            return null;
1✔
1635
          }
1636
          var viewer = createViewer(channel, version, descriptor);
1✔
1637
          closeable.keepOpen();
1✔
1638
          return viewer;
1✔
1639
        } catch (NoSuchFileException missingEntryFile) {
1✔
1640
          // Our file disappeared! We'll handle this gracefully by making the store lose track of
1641
          // us. This is done after releasing the lock to not incur a potential index write while
1642
          // holding it in case the executor is synchronous.
1643
          logger.log(Level.WARNING, "Dropping entry with missing file", missingEntryFile);
1✔
1644
        }
1645
      } finally {
1646
        lock.unlock();
1✔
1647
      }
1648

1649
      try {
1650
        removeEntry(this);
1✔
1651
      } catch (IOException e) {
×
1652
        logger.log(Level.WARNING, "Exception while deleting already non-existent entry");
×
1653
      }
1✔
1654
      return null;
1✔
1655
    }
1656

1657
    @GuardedBy("lock") // Lock must be held due to potential write to cachedDescriptor.
1658
    private @Nullable EntryDescriptor readDescriptorForKey(
1659
        FileChannel channel, @Nullable String expectedKey) throws IOException {
1660
      var descriptor = cachedDescriptor;
1✔
1661
      if (descriptor == null) {
1✔
1662
        descriptor = readDescriptor(channel);
1✔
1663
      }
1664
      if (keyMismatches(descriptor.key, expectedKey)) {
1✔
1665
        return null;
1✔
1666
      }
1667
      cachedDescriptor = descriptor;
1✔
1668
      return descriptor;
1✔
1669
    }
1670

1671
    @GuardedBy("lock")
1672
    private EntryDescriptor readDescriptor(FileChannel channel) throws IOException {
1673
      // TODO a smarter thing to do is to read a larger buffer from the end and optimistically
1674
      //      expect key and metadata to be there. Or store the sizes of metadata, data & key in
1675
      //      the index.
1676
      long fileSize = channel.size();
1✔
1677
      var trailer = FileIO.read(channel, ENTRY_TRAILER_SIZE, fileSize - ENTRY_TRAILER_SIZE);
1✔
1678
      long magic = trailer.getLong();
1✔
1679
      int storeVersion = trailer.getInt();
1✔
1680
      int appVersion = trailer.getInt();
1✔
1681
      int keySize = getNonNegativeInt(trailer);
1✔
1682
      int metadataSize = getNonNegativeInt(trailer);
1✔
1683
      long dataSize = getNonNegativeLong(trailer);
1✔
1684
      long dataCrc32c = trailer.getInt() & INT_MASK;
1✔
1685
      long epilogueCrc32c = trailer.getInt() & INT_MASK;
1✔
1686
      checkValue(ENTRY_MAGIC, magic, "Not in entry file format");
1✔
1687
      checkValue(STORE_VERSION, storeVersion, "Unexpected store version");
1✔
1688
      checkValue(DiskStore.this.appVersion, appVersion, "Unexpected app version");
1✔
1689
      var keyAndMetadata = FileIO.read(channel, keySize + metadataSize, dataSize);
1✔
1690
      var key = UTF_8.decode(keyAndMetadata.limit(keySize)).toString();
1✔
1691
      var metadata =
1✔
1692
          ByteBuffer.allocate(keyAndMetadata.limit(keySize + metadataSize).remaining())
1✔
1693
              .put(keyAndMetadata)
1✔
1694
              .flip()
1✔
1695
              .asReadOnlyBuffer();
1✔
1696
      var crc32c = new CRC32C();
1✔
1697
      crc32c.update(keyAndMetadata.rewind());
1✔
1698
      crc32c.update(trailer.rewind().limit(trailer.limit() - Integer.BYTES));
1✔
1699
      checkValue(crc32c.getValue(), epilogueCrc32c, "Unexpected epilogue checksum");
1✔
1700
      return new EntryDescriptor(key, metadata, dataSize, dataCrc32c);
1✔
1701
    }
1702

1703
    @GuardedBy("lock")
1704
    private Viewer createViewer(FileChannel channel, int version, EntryDescriptor descriptor) {
1705
      var viewer = new DiskViewer(this, version, descriptor, channel);
1✔
1706
      viewerCount++;
1✔
1707
      lastUsed = lruClock.getAndIncrement();
1✔
1708
      return viewer;
1✔
1709
    }
1710

1711
    @Nullable Editor edit(String key, int targetVersion) throws IOException {
1712
      lock.lock();
1✔
1713
      try {
1714
        if (!writable
1✔
1715
            || currentEditor != null
1716
            || (targetVersion != ANY_VERSION && targetVersion != version)) {
1717
          return null;
1✔
1718
        }
1719
        var editor = new DiskEditor(this, key, FileChannel.open(tempEntryFile(), WRITE, CREATE));
1✔
1720
        currentEditor = editor;
1✔
1721
        return editor;
1✔
1722
      } finally {
1723
        lock.unlock();
1✔
1724
      }
1725
    }
1726

1727
    void commit(
1728
        DiskEditor editor,
1729
        String key,
1730
        ByteBuffer metadata,
1731
        FileChannel editorChannel,
1732
        long dataSize,
1733
        long dataCrc32c)
1734
        throws IOException {
1735
      long newSize;
1736
      long sizeDifference;
1737
      lock.lock();
1✔
1738
      try {
1739
        requireState(currentEditor == editor, "Edit discarded");
1✔
1740
        currentEditor = null;
1✔
1741

1742
        requireState(writable, "Committing a non-discarded edit to a non-writable entry");
1✔
1743

1744
        EntryDescriptor committedDescriptor;
1745
        boolean editInPlace = dataSize < 0 && readable;
1✔
1746
        try (editorChannel;
1✔
1747
            var existingEntryChannel =
1748
                editInPlace ? FileChannel.open(entryFile(), READ, WRITE) : null) {
1✔
1749
          var targetChannel = editorChannel;
1✔
1750
          EntryDescriptor existingEntryDescriptor = null;
1✔
1751
          if (existingEntryChannel != null
1✔
1752
              && (existingEntryDescriptor = readDescriptorForKey(existingEntryChannel, key))
1✔
1753
                  != null) {
1754
            targetChannel = existingEntryChannel;
1✔
1755
            committedDescriptor =
1✔
1756
                new EntryDescriptor(
1757
                    key,
1758
                    metadata,
1759
                    existingEntryDescriptor.dataSize,
1760
                    existingEntryDescriptor.dataCrc32c);
1761

1762
            // Close editor's file channel before deleting. See isolatedDeleteIfExists(Path).
1763
            closeQuietly(editorChannel);
1✔
1764

1765
            // Make the entry file temporarily unreadable before modifying it. This also has to
1766
            // reflect on store's size.
1767
            replace(entryFile(), tempEntryFile());
1✔
1768
            readable = false;
1✔
1769
            DiskStore.this.size.addAndGet(-size);
1✔
1770
            size = 0;
1✔
1771
          } else {
1772
            committedDescriptor =
1✔
1773
                new EntryDescriptor(key, metadata, Math.max(dataSize, 0), dataCrc32c);
1✔
1774
          }
1775

1776
          int written =
1✔
1777
              FileIO.write(
1✔
1778
                  targetChannel,
1779
                  committedDescriptor.encodeToEpilogue(appVersion),
1✔
1780
                  committedDescriptor.dataSize);
1781
          if (existingEntryDescriptor != null) {
1✔
1782
            // Truncate to correct size in case the previous entry had a larger epilogue.
1783
            targetChannel.truncate(committedDescriptor.dataSize + written);
1✔
1784
          }
1785
          targetChannel.force(false);
1✔
1786
        } catch (IOException e) {
×
1787
          discardCurrentEdit(editor);
×
1788
          throw e;
×
1789
        }
1✔
1790

1791
        if (viewerCount > 0) {
1✔
1792
          isolatedDeleteIfExists(entryFile());
1✔
1793
        }
1794
        replace(tempEntryFile(), entryFile());
1✔
1795
        version++;
1✔
1796
        newSize = committedDescriptor.metadata.remaining() + committedDescriptor.dataSize;
1✔
1797
        sizeDifference = newSize - size;
1✔
1798
        size = newSize;
1✔
1799
        readable = true;
1✔
1800
        lastUsed = lruClock.getAndIncrement();
1✔
1801
        cachedDescriptor = committedDescriptor;
1✔
1802
      } finally {
1803
        lock.unlock();
1✔
1804
      }
1805

1806
      long newStoreSize = DiskStore.this.size.addAndGet(sizeDifference);
1✔
1807

1808
      // Don't bother with the entry if it'll cause everything to be evicted.
1809
      if (newSize > maxSize) {
1✔
1810
        removeEntry(this);
1✔
1811
        return;
1✔
1812
      }
1813

1814
      if (newStoreSize > maxSize) {
1✔
1815
        evictionScheduler.schedule();
1✔
1816
      }
1817
      indexWriteScheduler.trySchedule();
1✔
1818
    }
1✔
1819

1820
    /**
1821
     * Evicts this entry if it matches the given version and returns its last committed size if it
1822
     * did get evicted, otherwise returns -1.
1823
     */
1824
    long evict(int targetVersion) throws IOException {
1825
      lock.lock();
1✔
1826
      try {
1827
        if (!writable || (targetVersion != ANY_VERSION && targetVersion != version)) {
1✔
1828
          return -1;
1✔
1829
        }
1830

1831
        if (viewerCount > 0) {
1✔
1832
          isolatedDeleteIfExists(entryFile());
1✔
1833
        } else {
1834
          Files.deleteIfExists(entryFile());
1✔
1835
        }
1836
        discardCurrentEdit();
1✔
1837
        readable = false;
1✔
1838
        writable = false;
1✔
1839
        return size;
1✔
1840
      } finally {
1841
        lock.unlock();
1✔
1842
      }
1843
    }
1844

1845
    void freeze() {
1846
      lock.lock();
1✔
1847
      try {
1848
        writable = false;
1✔
1849
        discardCurrentEdit();
1✔
1850
      } finally {
1851
        lock.unlock();
1✔
1852
      }
1853
    }
1✔
1854

1855
    @GuardedBy("lock")
1856
    private void discardCurrentEdit() {
1857
      var editor = currentEditor;
1✔
1858
      if (editor != null) {
1✔
1859
        currentEditor = null;
1✔
1860
        discardCurrentEdit(editor);
1✔
1861
      }
1862
    }
1✔
1863

1864
    @GuardedBy("lock")
1865
    private void discardCurrentEdit(DiskEditor editor) {
1866
      if (!readable) {
1✔
1867
        // Remove the entry as it could never be readable. It's safe to directly remove it from the
1868
        // map since it's not visible to the outside world at this point (no views/edits) and
1869
        // doesn't contribute to store size.
1870
        entries.remove(hash, this);
1✔
1871
      }
1872

1873
      editor.setClosed();
1✔
1874
      closeQuietly(editor.channel);
1✔
1875
      deleteIfExistsQuietly(tempEntryFile());
1✔
1876
    }
1✔
1877

1878
    void discardIfCurrentEdit(DiskEditor editor) {
1879
      lock.lock();
1✔
1880
      try {
1881
        if (editor == currentEditor) {
1✔
1882
          currentEditor = null;
1✔
1883
          discardCurrentEdit(editor);
1✔
1884
        }
1885
      } finally {
1886
        lock.unlock();
1✔
1887
      }
1888
    }
1✔
1889

1890
    void decrementViewerCount() {
1891
      lock.lock();
1✔
1892
      try {
1893
        viewerCount--;
1✔
1894
      } finally {
1895
        lock.unlock();
1✔
1896
      }
1897
    }
1✔
1898

1899
    @Nullable String keyIfKnown(int[] versionHolder) {
1900
      lock.lock();
1✔
1901
      try {
1902
        var descriptor = cachedDescriptor;
1✔
1903
        if (descriptor != null) {
1✔
1904
          versionHolder[0] = version;
1✔
1905
          return descriptor.key;
1✔
1906
        }
1907
        return null;
1✔
1908
      } finally {
1909
        lock.unlock();
1✔
1910
      }
1911
    }
1912

1913
    @Nullable String currentEditorKey() {
1914
      lock.lock();
1✔
1915
      try {
1916
        var editor = currentEditor;
1✔
1917
        return editor != null ? editor.key() : null;
1✔
1918
      } finally {
1919
        lock.unlock();
1✔
1920
      }
1921
    }
1922

1923
    Path entryFile() {
1924
      var entryFile = lazyEntryFile;
1✔
1925
      if (entryFile == null) {
1✔
1926
        entryFile = directory.resolve(hash.toHexString() + ENTRY_FILE_SUFFIX);
1✔
1927
        lazyEntryFile = entryFile;
1✔
1928
      }
1929
      return entryFile;
1✔
1930
    }
1931

1932
    Path tempEntryFile() {
1933
      var entryFile = lazyTempEntryFile;
1✔
1934
      if (entryFile == null) {
1✔
1935
        entryFile = directory.resolve(hash.toHexString() + TEMP_ENTRY_FILE_SUFFIX);
1✔
1936
        lazyTempEntryFile = entryFile;
1✔
1937
      }
1938
      return entryFile;
1✔
1939
    }
1940
  }
1941

1942
  /**
   * A {@code Viewer} over a committed entry file. Reads go through {@code EntryReader}s that
   * verify the data stream's CRC32C once the stream end is reached. The viewer owns its
   * {@code FileChannel} and decrements the entry's viewer count exactly once on close.
   */
  private final class DiskViewer implements Viewer {
    private final Entry entry;

    /**
     * Entry's version at the time of opening this viewer. This is used to not edit or remove an
     * entry that's been updated after this viewer had been created.
     */
    private final int entryVersion;

    private final EntryDescriptor descriptor;
    private final FileChannel channel;
    private final AtomicBoolean closed = new AtomicBoolean();
    // Tracks whether the (single) scattering reader has been handed out; see newReader().
    private final AtomicBoolean createdFirstReader = new AtomicBoolean();

    DiskViewer(Entry entry, int entryVersion, EntryDescriptor descriptor, FileChannel channel) {
      this.entry = entry;
      this.entryVersion = entryVersion;
      this.descriptor = descriptor;
      this.channel = channel;
    }

    @Override
    public String key() {
      return descriptor.key;
    }

    @Override
    public ByteBuffer metadata() {
      // Duplicate so callers can't disturb the shared buffer's position/limit.
      return descriptor.metadata.duplicate();
    }

    @Override
    public EntryReader newReader() {
      // Only the first reader may use the channel's native position (scattering reads);
      // subsequent readers use positional reads to avoid interfering with it.
      return createdFirstReader.compareAndSet(false, true)
          ? new ScatteringDiskEntryReader()
          : new DiskEntryReader();
    }

    @Override
    public Optional<Editor> edit() throws IOException {
      return Optional.ofNullable(entry.edit(key(), entryVersion));
    }

    @Override
    public CompletableFuture<Optional<Editor>> edit(Executor executor) {
      return Unchecked.supplyAsync(this::edit, executor);
    }

    @Override
    public long dataSize() {
      return descriptor.dataSize;
    }

    @Override
    public long entrySize() {
      return descriptor.metadata.remaining() + descriptor.dataSize;
    }

    @Override
    public boolean removeEntry() throws IOException {
      // Only removes the entry if it hasn't been rewritten since this viewer was opened.
      return DiskStore.this.removeEntry(entry, entryVersion);
    }

    @Override
    public void close() {
      closeQuietly(channel);
      // compareAndSet guarantees the viewer count is decremented exactly once.
      if (closed.compareAndSet(false, true)) {
        entry.decrementViewerCount();
      }
    }

    /**
     * Reads the entry's data stream using positional reads, accumulating a CRC32C that is checked
     * against the descriptor's checksum when the stream is fully consumed. Thread-safe via
     * {@code lock}.
     */
    private class DiskEntryReader implements EntryReader {
      final Lock lock = new ReentrantLock();
      final CRC32C crc32C = new CRC32C();
      // Current offset within the data stream (not the whole file); guarded by lock.
      long position;

      DiskEntryReader() {}

      @Override
      public int read(ByteBuffer dst) throws IOException {
        requireNonNull(dst);
        lock.lock();
        try {
          // Make sure we don't exceed data stream bounds.
          long available = descriptor.dataSize - position;
          if (available <= 0) {
            return -1;
          }

          int maxReadable = (int) Math.min(available, dst.remaining());
          // Read into a bounded duplicate so the CRC can be computed before exposing the bytes.
          var boundedDst = dst.duplicate().limit(dst.position() + maxReadable);
          int read = readBytes(boundedDst);
          position += read;
          crc32C.update(boundedDst.rewind());
          checkCrc32cIfEndOfStream();
          dst.position(dst.position() + read);
          return read;
        } finally {
          lock.unlock();
        }
      }

      @Override
      public CompletableFuture<Integer> read(ByteBuffer dst, Executor executor) {
        return Unchecked.supplyAsync(() -> read(dst), executor);
      }

      @Override
      public long read(List<ByteBuffer> dsts) throws IOException {
        // Fills each buffer in turn; stops early at end-of-stream, returning -1 only if
        // nothing was read at all.
        long totalRead = 0;
        outerLoop:
        for (var dst : dsts) {
          int read;
          while (dst.hasRemaining()) {
            read = read(dst);
            if (read >= 0) {
              totalRead += read;
            } else if (totalRead > 0) {
              break outerLoop;
            } else {
              return -1;
            }
          }
        }
        return totalRead;
      }

      @Override
      public CompletableFuture<Long> read(List<ByteBuffer> dsts, Executor executor) {
        return Unchecked.supplyAsync(() -> read(dsts), executor);
      }

      // Overridable read primitive; this variant reads at an explicit file position.
      int readBytes(ByteBuffer dst) throws IOException {
        return FileIO.read(channel, dst, position);
      }

      // Verifies the accumulated CRC once the whole data stream has been consumed.
      void checkCrc32cIfEndOfStream() throws StoreCorruptionException {
        if (position == descriptor.dataSize) {
          checkValue(crc32C.getValue(), descriptor.dataCrc32c, "Unexpected data checksum");
        }
      }
    }

    /**
     * A reader that uses scattering API for bulk reads. This reader relies on the file's native
     * position (scattering API doesn't take a position argument). As such, it must only be created
     * once. Note that this restriction makes scattering reads inefficient, or not as efficient, for
     * readers created after the first reader.
     */
    private final class ScatteringDiskEntryReader extends DiskEntryReader {
      ScatteringDiskEntryReader() {}

      @Override
      int readBytes(ByteBuffer dst) throws IOException {
        return FileIO.read(channel, dst); // Use native file position.
      }

      @Override
      public long read(List<ByteBuffer> dsts) throws IOException {
        requireNonNull(dsts);
        lock.lock();
        try {
          // Make sure we don't exceed data stream bounds.
          long available = descriptor.dataSize - position;
          if (available <= 0) {
            return -1;
          }

          // Bound each destination so the combined read can't run past the data stream into
          // the entry's epilogue.
          var boundedDsts = new ArrayList<ByteBuffer>(dsts.size());
          long maxReadableSoFar = 0;
          for (var dst : dsts) {
            int dstMaxReadable = (int) Math.min(dst.remaining(), available - maxReadableSoFar);
            boundedDsts.add(dst.duplicate().limit(dst.position() + dstMaxReadable));
            maxReadableSoFar = Math.addExact(maxReadableSoFar, dstMaxReadable);
            if (maxReadableSoFar >= available) {
              break;
            }
          }

          long read = FileIO.read(channel, boundedDsts.toArray(ByteBuffer[]::new));
          position += read;
          for (var boundedDst : boundedDsts) {
            crc32C.update(boundedDst.rewind());
          }
          checkCrc32cIfEndOfStream();
          // Propagate the duplicates' advanced positions back to the caller's buffers.
          for (int i = 0; i < boundedDsts.size(); i++) {
            dsts.get(i).position(boundedDsts.get(i).position());
          }
          return read;
        } finally {
          lock.unlock();
        }
      }
    }
  }
2137

2138
  /**
   * An {@code Editor} that buffers the new entry's data into a temp-file {@code FileChannel} and
   * hands the final commit over to its {@code Entry}. The {@code closed} flag ensures commit and
   * close are mutually exclusive and happen at most once.
   */
  private static final class DiskEditor implements Editor {
    private final Entry entry;
    private final String key;
    private final FileChannel channel;
    private final DiskEntryWriter writer;
    private final AtomicBoolean closed = new AtomicBoolean();

    DiskEditor(Entry entry, String key, FileChannel channel) {
      this.entry = entry;
      this.key = key;
      this.channel = channel;
      this.writer = new DiskEntryWriter();
    }

    @Override
    public String key() {
      return key;
    }

    @Override
    public EntryWriter writer() {
      return writer;
    }

    @Override
    public void commit(ByteBuffer metadata) throws IOException {
      requireNonNull(metadata);
      // Winning the CAS claims this editor; a later close() then becomes a no-op.
      requireState(closed.compareAndSet(false, true), "closed");
      internalCommit(metadata);
    }

    @Override
    public CompletableFuture<Void> commit(ByteBuffer metadata, Executor executor) {
      requireNonNull(metadata);
      requireState(closed.compareAndSet(false, true), "closed");
      return Unchecked.runAsync(() -> internalCommit(metadata), executor);
    }

    private void internalCommit(ByteBuffer metadata) throws IOException {
      // dataSize is -1 when nothing was written, signalling a metadata-only commit to the entry.
      long[] crc32cHolder = new long[1];
      long dataSize = writer.dataSizeIfWritten(crc32cHolder);
      entry.commit(this, key, metadata, channel, dataSize, crc32cHolder[0]);
    }

    @Override
    public void close() {
      // Discards the edit unless commit() already claimed the editor.
      if (closed.compareAndSet(false, true)) {
        entry.discardIfCurrentEdit(this);
      }
    }

    // Called by Entry when it discards this edit, so later commit()/close() calls are no-ops.
    public void setClosed() {
      closed.set(true);
    }

    /**
     * Writes the data stream to the editor's channel while accumulating its CRC32C and total
     * size, which are later retrieved on commit. Thread-safe via {@code lock}.
     */
    private final class DiskEntryWriter implements EntryWriter {
      private final Lock lock = new ReentrantLock();
      private final CRC32C crc32C = new CRC32C();
      // Total bytes written so far; becomes the data stream's size on commit.
      private long position;
      // Distinguishes "no data written" (metadata-only edit) from a zero-length write.
      private boolean isWritten;

      DiskEntryWriter() {}

      @Override
      public int write(ByteBuffer src) throws IOException {
        requireNonNull(src);
        requireState(!closed.get(), "closed");
        lock.lock();
        try {
          int srcPosition = src.position();
          int written = FileIO.write(channel, src);
          // Rewind to the pre-write position so the CRC covers exactly the bytes written.
          crc32C.update(src.position(srcPosition));
          position += written;
          isWritten = true;
          return written;
        } finally {
          lock.unlock();
        }
      }

      @Override
      public CompletableFuture<Integer> write(ByteBuffer src, Executor executor) {
        requireNonNull(src);
        return Unchecked.supplyAsync(() -> write(src), executor);
      }

      @Override
      public long write(List<ByteBuffer> srcs) throws IOException {
        requireNonNull(srcs);
        requireState(!closed.get(), "closed");
        lock.lock();
        try {
          var srcsArray = srcs.toArray(ByteBuffer[]::new);
          // Remember each buffer's pre-write position so the CRC pass below covers the
          // same bytes the gathering write consumed.
          int[] srcPositions = new int[srcsArray.length];
          for (int i = 0; i < srcsArray.length; i++) {
            srcPositions[i] = srcsArray[i].position();
          }
          long written = FileIO.write(channel, srcsArray);
          for (int i = 0; i < srcsArray.length; i++) {
            crc32C.update(srcsArray[i].position(srcPositions[i]));
          }
          position += written;
          isWritten = true;
          return written;
        } finally {
          lock.unlock();
        }
      }

      @Override
      public CompletableFuture<Long> write(List<ByteBuffer> srcs, Executor executor) {
        requireNonNull(srcs);
        return Unchecked.supplyAsync(() -> write(srcs), executor);
      }

      /**
       * Returns the written data size (storing its CRC32C into {@code crc32cHolder[0]}), or -1
       * if no data was ever written.
       */
      long dataSizeIfWritten(long[] crc32cHolder) {
        lock.lock();
        try {
          if (isWritten) {
            crc32cHolder[0] = crc32C.getValue();
            return position;
          } else {
            return -1;
          }
        } finally {
          lock.unlock();
        }
      }
    }
  }
2268

2269
  public static final class Builder {
2270
    private static final long DEFAULT_INDEX_UPDATE_DELAY_MILLIS = 2000;
2271
    private static final Duration DEFAULT_INDEX_UPDATE_DELAY;
2272

2273
    static {
2274
      long millis =
1✔
2275
          Long.getLong(
1✔
2276
              "com.github.mizosoft.methanol.internal.cache.DiskStore.indexUpdateDelayMillis",
2277
              DEFAULT_INDEX_UPDATE_DELAY_MILLIS);
2278
      if (millis < 0) {
1✔
2279
        millis = DEFAULT_INDEX_UPDATE_DELAY_MILLIS;
×
2280
      }
2281
      DEFAULT_INDEX_UPDATE_DELAY = Duration.ofMillis(millis);
1✔
2282
    }
1✔
2283

2284
    private static final int UNSET_NUMBER = -1;
2285

2286
    private long maxSize = UNSET_NUMBER;
1✔
2287
    private @MonotonicNonNull Path directory;
2288
    private @MonotonicNonNull Executor executor;
2289
    private int appVersion = UNSET_NUMBER;
1✔
2290
    private @MonotonicNonNull Hasher hasher;
2291
    private @MonotonicNonNull Clock clock;
2292
    private @MonotonicNonNull Delayer delayer;
2293
    private @MonotonicNonNull Duration indexUpdateDelay;
2294
    private boolean debugIndexOps;
2295

2296
    Builder() {}
1✔
2297

2298
    @CanIgnoreReturnValue
2299
    public Builder directory(Path directory) {
2300
      this.directory = requireNonNull(directory);
1✔
2301
      return this;
1✔
2302
    }
2303

2304
    @CanIgnoreReturnValue
2305
    public Builder maxSize(long maxSize) {
2306
      requireArgument(maxSize > 0, "Expected a positive max size");
1✔
2307
      this.maxSize = maxSize;
1✔
2308
      return this;
1✔
2309
    }
2310

2311
    @CanIgnoreReturnValue
2312
    public Builder executor(Executor executor) {
2313
      this.executor = requireNonNull(executor);
1✔
2314
      return this;
1✔
2315
    }
2316

2317
    @CanIgnoreReturnValue
2318
    public Builder appVersion(int appVersion) {
2319
      this.appVersion = appVersion;
1✔
2320
      return this;
1✔
2321
    }
2322

2323
    @CanIgnoreReturnValue
2324
    public Builder hasher(Hasher hasher) {
2325
      this.hasher = requireNonNull(hasher);
1✔
2326
      return this;
1✔
2327
    }
2328

2329
    @CanIgnoreReturnValue
2330
    public Builder clock(Clock clock) {
2331
      this.clock = requireNonNull(clock);
1✔
2332
      return this;
1✔
2333
    }
2334

2335
    @CanIgnoreReturnValue
2336
    public Builder delayer(Delayer delayer) {
2337
      this.delayer = requireNonNull(delayer);
1✔
2338
      return this;
1✔
2339
    }
2340

2341
    @CanIgnoreReturnValue
2342
    public Builder indexUpdateDelay(Duration duration) {
2343
      this.indexUpdateDelay = requireNonNegativeDuration(duration);
1✔
2344
      return this;
1✔
2345
    }
2346

2347
    /**
2348
     * If set, the store complains when the index is accessed or modified either concurrently or not
2349
     * within the index executor.
2350
     */
2351
    @CanIgnoreReturnValue
2352
    public Builder debugIndexOps(boolean on) {
2353
      this.debugIndexOps = on;
1✔
2354
      return this;
1✔
2355
    }
2356

2357
    public DiskStore build() throws IOException {
2358
      return new DiskStore(this, debugIndexOps || DebugUtils.isAssertionsEnabled());
1✔
2359
    }
2360

2361
    long maxSize() {
2362
      long maxSize = this.maxSize;
1✔
2363
      requireState(maxSize != UNSET_NUMBER, "Expected maxSize to bet set");
1✔
2364
      return maxSize;
1✔
2365
    }
2366

2367
    int appVersion() {
2368
      int appVersion = this.appVersion;
1✔
2369
      requireState(appVersion != UNSET_NUMBER, "Expected appVersion to be set");
1✔
2370
      return appVersion;
1✔
2371
    }
2372

2373
    Path directory() {
2374
      return ensureSet(directory, "directory");
1✔
2375
    }
2376

2377
    Executor executor() {
2378
      return ensureSet(executor, "executor");
1✔
2379
    }
2380

2381
    Hasher hasher() {
2382
      return requireNonNullElse(hasher, Hasher.TRUNCATED_SHA_256);
1✔
2383
    }
2384

2385
    Clock clock() {
2386
      return requireNonNullElse(clock, Utils.systemMillisUtc());
1✔
2387
    }
2388

2389
    Duration indexUpdateDelay() {
2390
      return requireNonNullElse(indexUpdateDelay, DEFAULT_INDEX_UPDATE_DELAY);
1✔
2391
    }
2392

2393
    Delayer delayer() {
2394
      return requireNonNullElse(delayer, Delayer.systemDelayer());
1✔
2395
    }
2396

2397
    @CanIgnoreReturnValue
2398
    private <T> T ensureSet(T property, String name) {
2399
      requireState(property != null, "Expected %s to bet set", name);
1✔
2400
      return property;
1✔
2401
    }
2402
  }
2403
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc