
evolvedbinary / elemental / build 982 (push, via circleci)
adamretter: [feature] Improve README.md badges
29 Apr 2025 08:34PM UTC coverage: 56.409% (+0.007%) from 56.402%

28451 of 55847 branches covered (50.94%); branch coverage is included in the aggregate %.
77468 of 131924 relevant lines covered (58.72%); 0.59 hits per line.

Source File

72.74% coverage
/exist-core/src/main/java/org/exist/storage/blob/BlobStoreImpl.java
/*
 * Elemental
 * Copyright (C) 2024, Evolved Binary Ltd
 *
 * admin@evolvedbinary.com
 * https://www.evolvedbinary.com | https://www.elemental.xyz
 *
 * Use of this software is governed by the Business Source License 1.1
 * included in the LICENSE file and at www.mariadb.com/bsl11.
 *
 * Change Date: 2028-04-27
 *
 * On the date above, in accordance with the Business Source License, use
 * of this software will be governed by the Apache License, Version 2.0.
 *
 * Additional Use Grant: Production use of the Licensed Work for a permitted
 * purpose. A Permitted Purpose is any purpose other than a Competing Use.
 * A Competing Use means making the Software available to others in a commercial
 * product or service that: substitutes for the Software; substitutes for any
 * other product or service we offer using the Software that exists as of the
 * date we make the Software available; or offers the same or substantially
 * similar functionality as the Software.
 */
package org.exist.storage.blob;

import com.evolvedbinary.j8fu.Try;
import com.evolvedbinary.j8fu.tuple.Tuple2;
import com.evolvedbinary.j8fu.tuple.Tuple3;
import net.jcip.annotations.ThreadSafe;
import org.apache.commons.io.input.CountingInputStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.exist.Database;
import org.exist.backup.RawDataBackup;
import org.exist.storage.journal.JournalException;
import org.exist.storage.journal.JournalManager;
import org.exist.storage.journal.LogEntryTypes;
import org.exist.storage.journal.LogException;
import org.exist.storage.txn.Txn;
import org.exist.storage.txn.TxnListener;
import org.exist.util.FileUtils;
import org.exist.util.UUIDGenerator;
import org.exist.util.crypto.digest.DigestInputStream;
import org.exist.util.crypto.digest.DigestType;
import org.exist.util.crypto.digest.MessageDigest;
import org.exist.util.crypto.digest.StreamableDigest;

import javax.annotation.Nullable;
import java.io.FileNotFoundException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static com.evolvedbinary.j8fu.Try.TaggedTryUnchecked;
import static com.evolvedbinary.j8fu.tuple.Tuple.Tuple;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static java.nio.file.StandardOpenOption.*;
import static org.exist.storage.blob.BlobLoggable.LOG_STORE_BLOB_FILE;
import static org.exist.storage.blob.BlobLoggable.LOG_UPDATE_BLOB_REF_COUNT;
import static org.exist.storage.blob.BlobStoreImpl.BlobReference.*;
import static org.exist.util.FileUtils.fileName;
import static org.exist.util.HexEncoder.bytesToHex;
import static org.exist.util.ThreadUtils.nameInstanceThread;
import static org.exist.util.ThreadUtils.newInstanceSubThreadGroup;

/**
 * De-duplicating store for BLOBs (Binary Large Objects).
 *
 * Each unique BLOB is stored by checksum into a blob file on disk.
 *
 * For each BLOB, a reference count and the number of active readers are maintained.
 * Adding a BLOB which is already present only increments its reference count;
 * it does not require any additional storage.
 *
 * Removing a BLOB decrements its reference count; a BLOB is only removed when
 * its reference count reaches zero. The blob file itself is then scheduled for
 * deletion, and will only be removed when there are no active readers and its
 * reference count is zero. If, by the time the scheduled deletion of a blob file
 * is realised, another thread has meanwhile added a BLOB whose blob file has the
 * same checksum, then the BLOB's reference count will have increased from zero,
 * and so the scheduled action will not delete the now once again active blob
 * file. We call this feature "recycling".
 *
 * The Blob Store is backed to disk by a persistent store file which
 * reflects the in-memory state of the BlobStore.
 *
 * The persistent store file will grow for each unique blob added to
 * the system; space is not reclaimed in the persistent file until
 * {@link #compactPersistentReferences(ByteBuffer, Path)} is called,
 * which typically only happens the next time the blob store is re-opened.
 *
 * Each unique blob typically takes up only 36 bytes (a 32 byte digest
 * plus a 4 byte reference count) in the persistent store file, but this
 * can vary if a smaller or larger digestType is specified.
 *
 * On-line compaction of the persistent file could be added in
 * future with relative ease if deemed necessary.
 *
 * The persistent file for the blob store has the format:
 *
 * [fileHeader entry+]
 *
 * fileHeader:          [magicNumber blobStoreVersion].
 * magicNumber:         4 bytes. See {@link #BLOB_STORE_MAGIC_NUMBER}.
 * blobStoreVersion:    2 bytes. Java {@code short}, see {@link #BLOB_STORE_VERSION}.
 *
 * entry:               [blobChecksum blobReferenceCount]
 * blobChecksum:        n-bytes determined by the constructed {@link MessageDigest}.
 * blobReferenceCount:  4 bytes. Java {@code int}.
 *
 * Note that the persistent file may contain more than one entry
 * for the same blobChecksum; however, all entries previous to
 * the last entry will have a blobReferenceCount of zero. The last
 * entry will have the current blobReferenceCount, which may be
 * zero or more.
 * The use of {@code orphanedBlobFileIds} in the
 * {@link #compactPersistentReferences(ByteBuffer, Path)} method
 * makes sure to only delete blob files which have a final
 * blobReferenceCount of zero.
 *
 * For performance, writing to the persistent store file,
 * removing staged blob files, and deleting blob files are all
 * asynchronous actions. To ensure the ability to achieve a consistent
 * state after a system crash, the BLOB Store writes entries to a
 * Journal WAL (Write-Ahead-Log) which is flushed to disk before each state
 * changing operation. If the system restarts after a crash, then a recovery
 * process will be performed from the entries in the WAL.
 *
 * Journal and Recovery of the BLOB Store works as follows:
 *
 *  Add Blob:
 *    Writes two journal entries:
 *      * StoreBlobFile(blobId, stagedUuid)
 *      * UpdateBlobReferenceCount(blobId, currentCount, currentCount + 1)
 *
 *      On crash recovery, firstly:
 *          the StoreBlobFile will either be:
 *              1. undone, which copies the blob file from the blob store to the staging area,
 *              2. redone, which copies the blob file from the staging area to the blob store,
 *              3. or both undone and redone.
 *          This is possible because the blob file in the staging area is ONLY deleted after
 *          a COMMIT and CHECKPOINT have been written to the Journal, which means
 *          that the staged file is always available for recovery, and that no
 *          crash recovery of the staged file itself is needed.
 *
 *          Deletion of the staged file happens on a best-effort basis; any files
 *          which were not deleted due to a system crash will be deleted upon
 *          restart (after recovery) when the Blob Store is next opened.
 *
 *      Secondly:
 *          the BlobReferenceCount will either be undone, redone, or both.
 *
 *
 *  Remove Blob:
 *      Writes a single journal entry:
 *        *  UpdateBlobReferenceCount(blobId, currentCount, currentCount - 1)
 *
 *      On crash recovery the BlobReferenceCount will either be undone, redone,
 *      or both.
 *
 *      It is worth noting that the actual blob file on disk is only ever deleted
 *      after a COMMIT and CHECKPOINT have been written to the Journal, and then
 *      only when it has zero references and zero readers. As it is only
 *      deleted after a CHECKPOINT, there is no need for any crash recovery of the
 *      disk file itself.
 *
 *      Deletion of the blob file happens on a best-effort basis; any files which
 *      were not deleted due to a system crash will be deleted upon restart
 *      (after recovery) by the {@link #compactPersistentReferences(ByteBuffer, Path)}
 *      process when the Blob Store is next opened.
 *
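 * A minimal usage sketch (illustrative only: it assumes a {@link Database}
 * instance and an open {@link Txn}, and that {@code DigestType.BLAKE_256} is
 * the configured digest type; the other names used here are hypothetical):
 * <pre>{@code
 * final BlobStore blobStore = new BlobStoreImpl(database,
 *         dataDir.resolve("blob.dbx"), dataDir.resolve("blob"), DigestType.BLAKE_256);
 * blobStore.open();
 * try (final InputStream is = Files.newInputStream(docFile)) {
 *     // adding de-duplicates by checksum: added._1 is the BlobId, added._2 the length in bytes
 *     final Tuple2<BlobId, Long> added = blobStore.add(transaction, is);
 *     try (final InputStream blob = blobStore.get(transaction, added._1)) {
 *         // read the BLOB back; closing the stream releases the read lease
 *     }
 *     blobStore.remove(transaction, added._1);
 * }
 * blobStore.close();
 * }</pre>
 *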
 * @author <a href="mailto:adam@evolvedbinary.com">Adam Retter</a>
 */
@ThreadSafe
public class BlobStoreImpl implements BlobStore {

    private static final Logger LOG = LogManager.getLogger(BlobStoreImpl.class);

    /**
     * Maximum time to wait whilst trying to add an
     * item to the vacuum queue {@link #vacuumQueue}.
     */
    private static final long VACUUM_ENQUEUE_TIMEOUT = 5000;  // 5 seconds

    /*
     * Journal entry types
     */
    static {
        LogEntryTypes.addEntryType(LOG_STORE_BLOB_FILE, StoreBlobFileLoggable::new);
        LogEntryTypes.addEntryType(LOG_UPDATE_BLOB_REF_COUNT, UpdateBlobRefCountLoggable::new);
    }

    /**
     * Length in bytes of the reference count.
     */
    static final int REFERENCE_COUNT_LEN = 4;

    /**
     * File header length.
     */
    static final int BLOB_STORE_HEADER_LEN = 6;

    /**
     * File header - magic number.
     */
    static final byte[] BLOB_STORE_MAGIC_NUMBER = {0x0E, 0x0D, 0x0B, 0x02};

    /**
     * File header - blob store version.
     */
    public static final short BLOB_STORE_VERSION = 1;

    private ByteBuffer buffer;
    private SeekableByteChannel channel;

    /**
     * In-memory representation of the Blob Store.
     */
    private ConcurrentMap<BlobId, BlobReference> references;

    /**
     * Queue for communicating between the thread calling
     * the BlobStore and the {@link #persistentWriter} thread.
     *
     * Holds blob references which need to be updated in the
     * blob store's persistent dbx file ({@link #persistentFile})
     * on disk.
     */
    private final BlockingQueue<Tuple3<BlobId, BlobReference, Integer>> persistQueue = new LinkedBlockingQueue<>();

    /**
     * Queue for communicating between the thread calling
     * the BlobStore and the {@link #blobVacuum} thread.
     *
     * The head of the queue is the blob with the fewest active readers.
     *
     * Holds blob references which are scheduled to have their blob file
     * removed from the blob file store.
     */
    private final BlockingQueue<BlobVacuum.Request> vacuumQueue = new PriorityBlockingQueue<>();

    private final Database database;
    private final Path persistentFile;
    private final Path blobDir;
    private final Path stagingDir;
    private final DigestType digestType;

    /**
     * Enumeration of possible
     * Blob Store states.
     */
    private enum State {
        OPENING,
        OPEN,
        RECOVERY,
        CLOSING,
        CLOSED
    }

    /**
     * State of the Blob Store.
     */
    private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);

    /**
     * Thread which updates the persistent blob
     * store file on disk.
     */
    private PersistentWriter persistentWriter;
    private Thread persistentWriterThread;

    /**
     * Thread which deletes de-referenced
     * blob files when there are no
     * more readers.
     */
    private BlobVacuum blobVacuum;
    private Thread blobVacuumThread;

    /**
     * @param database the database that this BlobStore is operating within
     * @param persistentFile the file path for the persistent blob store metadata.
     * @param blobDir the directory to store BLOBs in.
     * @param digestType the message digest type to use for creating checksums of the BLOBs.
     */
    public BlobStoreImpl(final Database database, final Path persistentFile, final Path blobDir,
            final DigestType digestType) {
        this.database = database;
        this.persistentFile = persistentFile;
        this.blobDir = blobDir;
        this.stagingDir = blobDir.resolve("staging");
        this.digestType = digestType;
    }

    @Override
    public void open() throws IOException {
        openBlobStore(false);

        // thread group for the blob store
        final ThreadGroup blobStoreThreadGroup = newInstanceSubThreadGroup(database, "blob-store");

        // startup the persistent writer thread
        this.persistentWriter = new PersistentWriter(persistQueue, buffer, channel,
                this::abnormalPersistentWriterShutdown);
        this.persistentWriterThread = new Thread(blobStoreThreadGroup, persistentWriter,
                nameInstanceThread(database, "blob-store.persistent-writer"));
        persistentWriterThread.start();

        // startup the blob vacuum thread
        this.blobVacuum = new BlobVacuum(vacuumQueue);
        this.blobVacuumThread = new Thread(blobStoreThreadGroup, blobVacuum,
                nameInstanceThread(database, "blob-store.vacuum"));
        blobVacuumThread.start();

        // we are now open!
        state.set(State.OPEN);
    }

    @Override
    public void openForRecovery() throws IOException {
        openBlobStore(true);
        state.set(State.RECOVERY);
    }

    /**
     * Opens the BLOB Store's persistent store file,
     * and prepares the staging area.
     *
     * @param forRecovery true if the Blob Store is being opened for crash recovery, false otherwise
     *
     * @throws IOException if an error occurs whilst opening the BLOB Store
     */
    private void openBlobStore(final boolean forRecovery) throws IOException {
        if (state.get() == State.OPEN) {
            if (forRecovery) {
                throw new IOException("BlobStore is already open!");
            } else {
                return;
            }
        }

        if (!state.compareAndSet(State.CLOSED, State.OPENING)) {
            throw new IOException("BlobStore is not closed");
        }

        // size the buffer to hold a complete entry
        buffer = ByteBuffer.allocate(digestType.getDigestLengthBytes() + REFERENCE_COUNT_LEN);
        try {
            // open the dbx file
            if (Files.exists(persistentFile)) {
                if (!forRecovery) {
                    // compact the existing blob store file and then open it
                    this.references = compactPersistentReferences(buffer, persistentFile);
                    channel = Files.newByteChannel(persistentFile, WRITE);

                    /*
                     * We are not recovering, so we can delete any staging area left over
                     * from a previous running database instance
                     */
                    FileUtils.deleteQuietly(stagingDir);
                } else {
                    // recovery... so open the existing blob store file and just validate its header
                    channel = Files.newByteChannel(persistentFile, WRITE, READ);
                    validateFileHeader(buffer, persistentFile, channel);
                }
            } else {
                // open a new blob store file
                if (forRecovery) {
                    // we are trying to recover, but there is no existing Blob Store!
                    throw new FileNotFoundException("No Blob Store found at '"
                            + persistentFile.toAbsolutePath().toString() + "' to recover!");
                }

                references = new ConcurrentHashMap<>();
                channel = Files.newByteChannel(persistentFile, CREATE_NEW, WRITE);
                writeFileHeader(buffer, channel);
            }

            // create the staging directory if it does not exist
            Files.createDirectories(stagingDir);
        } catch (final IOException e) {
            if (channel != null) {
                try {
                    channel.close();
                } catch (final IOException ce) {
                    // ignore
                }
            }
            state.set(State.CLOSED);
            throw e;
        }
    }

    @Override
    public void close() throws IOException {
        // check the blob store is open
        if (state.get() == State.CLOSED) {
            return;
        }

        if (state.compareAndSet(State.OPEN, State.CLOSING)) {

            // close up normally
            normalClose();

        } else if (state.compareAndSet(State.RECOVERY, State.CLOSING)) {

            // close up after recovery was attempted
            closeAfterRecoveryAttempt();

        } else {
            throw new IOException("BlobStore is not open");
        }
    }

    /**
     * Closes the Blob Store.
     *
     * @throws IOException if an error occurs whilst closing the Blob Store
     */
    private void normalClose() throws IOException {
        try {
            // shutdown the persistent writer
            if (persistentWriter != null) {
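                // signal the writer to terminate: PersistentWriter (defined elsewhere)
                // is expected to exit once it dequeues the poison pill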
                persistQueue.put(PersistentWriter.POISON_PILL);
            }
            persistentWriterThread.join();

            // shutdown the vacuum
            if (blobVacuum != null) {
                blobVacuumThread.interrupt();
            }
            blobVacuumThread.join();
        } catch (final InterruptedException e) {
            // Restore the interrupted status
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } finally {
            closeBlobStore();

            // we are now closed!
            state.set(State.CLOSED);
        }
    }

    /**
     * Closes the Blob Store after it was opened for Recovery.
     */
    private void closeAfterRecoveryAttempt() {
        closeBlobStore();

        // we are now closed!
        state.set(State.CLOSED);
    }

    /**
     * Closes the resources associated
     * with the Blob Store persistent file.
     *
     * Clears the {@link #buffer} and closes the {@link #channel}.
     */
    private void closeBlobStore() {
        if (buffer != null) {
            buffer.clear();
            buffer = null;
        }

        // close the file channel
        if (channel != null) {
            try {
                channel.close();
                channel = null;
            } catch (final IOException e) {
                // non-critical error
                LOG.error("Error whilst closing blob.dbx: {}", e.getMessage(), e);
            }
        }
    }

    /**
     * Closes the BlobStore if the {@link #persistentWriter} has
     * to shut down due to abnormal circumstances.
     *
     * Only called when the Blob Store is in the {@link State#OPEN} state!
     */
    private void abnormalPersistentWriterShutdown() {
        // check the blob store is open
        if (state.get() == State.CLOSED) {
            return;
        }
        if (!state.compareAndSet(State.OPEN, State.CLOSING)) {
            return;
        }

        try {

            // NOTE: the persistent writer thread will join when this method finishes!

            // shutdown the vacuum
            if (blobVacuum != null) {
                blobVacuumThread.interrupt();
            }
            blobVacuumThread.join();

        } catch (final InterruptedException e) {
            // Restore the interrupted status
            Thread.currentThread().interrupt();
            LOG.error(e.getMessage(), e);
        } finally {
            closeBlobStore();

            // we are now closed!
            state.set(State.CLOSED);
        }
    }

    /**
     * Compacts an existing Blob Store file.
     *
     * Reads the existing Blob Store file and copies non-zero reference
     * entries to a new Blob Store file. We call this compaction.
     * Once complete, the existing file is replaced with the new file.
     *
     * @param buffer a byte buffer to use, sized to hold a complete entry.
     * @param persistentFile an existing persistentFile to compact.
     *
     * @return An in-memory representation of the compacted Blob Store
     *
     * @throws IOException if an error occurs during compaction.
     */
    private ConcurrentMap<BlobId, BlobReference> compactPersistentReferences(final ByteBuffer buffer,
            final Path persistentFile) throws IOException {

        final ConcurrentMap<BlobId, BlobReference> compactReferences = new ConcurrentHashMap<>();
        final Path compactPersistentFile = persistentFile.getParent().resolve(
                persistentFile.getFileName() + ".new." + System.currentTimeMillis());

        // tracks the BlobIds of Blob Files which have been orphaned
        final Set<BlobId> orphanedBlobFileIds = new HashSet<>();

        try (final SeekableByteChannel channel = Files.newByteChannel(persistentFile, READ)) {

            validateFileHeader(buffer, persistentFile, channel);
            buffer.clear();

            try (final SeekableByteChannel compactChannel = Files.newByteChannel(compactPersistentFile,
                    CREATE_NEW, APPEND)) {

                writeFileHeader(buffer, compactChannel);

                buffer.clear();

                while (channel.read(buffer) > -1) {
                    final byte[] id = new byte[digestType.getDigestLengthBytes()];
                    buffer.flip();
                    buffer.get(id);
                    final BlobId blobId = new BlobId(id);
                    final int count = buffer.getInt();

                    if (count == 0) {
                        orphanedBlobFileIds.add(blobId);
                    } else {
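                        // a later entry with a non-zero count supersedes any earlier
                        // zero-count entry for the same checksum, so its blob file is
                        // no longer considered orphaned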
                        orphanedBlobFileIds.remove(blobId);

                        compactReferences.put(blobId, new BlobReference(count, compactChannel.position()));

                        buffer.flip();
                        compactChannel.write(buffer);
                    }

                    buffer.clear();
                }
            }
        }

        // cleanup any orphaned Blob files
        for (final BlobId orphanedBlobFileId : orphanedBlobFileIds) {
            deleteBlob(blobDir, orphanedBlobFileId, false);
        }

        // replace the persistent file with the new compact persistent file
        Files.move(compactPersistentFile, persistentFile, ATOMIC_MOVE, REPLACE_EXISTING);

        return compactReferences;
    }

    /**
     * Writes the persistent file header.
     *
     * @param buffer a byte buffer to use
     * @param channel the channel to write to
     * @return the number of bytes written.
     * @throws IOException if an error occurs whilst writing the header.
     */
    private long writeFileHeader(final ByteBuffer buffer, final SeekableByteChannel channel) throws IOException {
        final long start = channel.position();

        buffer.clear();
        writeFileHeader(buffer);

        buffer.flip();
        buffer.limit(BLOB_STORE_HEADER_LEN);
        channel.write(buffer);

        return channel.position() - start;
    }

    /**
     * Writes the persistent file header.
     *
     * @param buffer the buffer to write to
     */
    private static void writeFileHeader(final ByteBuffer buffer) {
        buffer.put(BLOB_STORE_MAGIC_NUMBER);
        buffer.putShort(BLOB_STORE_VERSION);
    }

    /**
     * Validates the persistent file header.
     *
     * @param buffer a byte buffer to use
     * @param file the file containing the header.
     * @param channel the channel of the file to read from.
     *
     * @throws IOException if the header is invalid.
     */
    private void validateFileHeader(final ByteBuffer buffer, final Path file, final SeekableByteChannel channel)
            throws IOException {
        buffer.clear();
        buffer.limit(BLOB_STORE_HEADER_LEN);

        channel.read(buffer);

        buffer.flip();

        final boolean validMagic =
                buffer.get() == BLOB_STORE_MAGIC_NUMBER[0]
                        && buffer.get() == BLOB_STORE_MAGIC_NUMBER[1]
                        && buffer.get() == BLOB_STORE_MAGIC_NUMBER[2]
                        && buffer.get() == BLOB_STORE_MAGIC_NUMBER[3];

        if (!validMagic) {
            throw new IOException("File was not recognised as a valid Elemental Blob Store: "
                    + file.toAbsolutePath().toString());
        }

        // check the version of the blob store format
        final short storedVersion = buffer.getShort();
        final boolean validVersion =
                storedVersion == BLOB_STORE_VERSION;

        if (!validVersion) {
            throw new IOException("Blob Store file was version " + storedVersion + ", but required version "
                    + BLOB_STORE_VERSION + ": " + file.toAbsolutePath().toString());
        }
    }

    @Override
    public Tuple2<BlobId, Long> add(final Txn transaction, final InputStream is) throws IOException {
        if (state.get() != State.OPEN) {
            throw new IOException("Blob Store is not open!");
        }

        // stage the BLOB file
        final Tuple3<Path, Long, MessageDigest> staged = stage(is);

        final BlobVacuum.RequestDeleteStagedBlobFile requestDeleteStagedBlobFile =
                new BlobVacuum.RequestDeleteStagedBlobFile(stagingDir, staged._1.getFileName().toString());

        // register a callback to cleanup the staged BLOB file ONLY after commit+checkpoint
        final JournalManager journalManager = database.getJournalManager().orElse(null);
        if (journalManager != null) {
            final DeleteStagedBlobFile cleanupStagedBlob = new DeleteStagedBlobFile(vacuumQueue, requestDeleteStagedBlobFile);
            journalManager.listen(cleanupStagedBlob);
            transaction.registerListener(cleanupStagedBlob);
        }

        final BlobId blobId = new BlobId(staged._3.getValue());

        // if the blob entry does not exist, we exclusively compute it as STAGED
        BlobReference blobReference = references.computeIfAbsent(blobId, k -> new BlobReference(STAGED));

        try {
            while (true) {

                if (blobReference.count.compareAndSet(STAGED, PROMOTING)) {
                    // NOTE: we are the only thread that can be in this branch for the blobId

                    // write journal entries to the WAL
                    if (journalManager != null) {
                        try {
                            journalManager.journal(new StoreBlobFileLoggable(transaction.getId(), blobId, staged._1.getFileName().toString()));
                            journalManager.journal(new UpdateBlobRefCountLoggable(transaction.getId(), blobId, 0, 1));
                            journalManager.flush(true, true);   // force WAL entries to disk!
                        } catch (final JournalException e) {
                            references.remove(blobId);
                            throw new IOException(e);
                        }
                    }

                    // promote the staged blob
                    promote(staged);
                    if (journalManager == null) {
                        // no journal (or recovery)... so go ahead and schedule cleanup of the staged blob file
                        enqueueVacuum(vacuumQueue, requestDeleteStagedBlobFile);
                    }

                    // schedule disk persist of the new value
                    persistQueue.put(Tuple(blobId, blobReference, 1));

                    // update memory with the new value
                    blobReference.count.set(1);

                    // done!
                    return Tuple(blobId, staged._2);
                }

                final int count = blobReference.count.get();

                // guard against a concurrent #add or #remove
                if (count == PROMOTING || count == UPDATING_COUNT) {
                    // spin whilst another thread promotes the blob, or updates the reference count
                    // sleep a small time to save CPU
                    Thread.sleep(10);
                    continue;
                }

                // guard against a concurrent vacuum operation
                // ...retry the blob reference until the vacuum has completed!
                // i.e. wait for the deletion of the blob to complete, and then we can add the blob again
                if (count == DELETING) {
                    blobReference = references.computeIfAbsent(blobId, k -> new BlobReference(STAGED));
                    continue;   // loop again
                }

                // only increment the blob reference if the blob is active!
                if (count >= 0 && blobReference.count.compareAndSet(count, UPDATING_COUNT)) {
                    // NOTE: we are the only thread that can be in this branch for the blobId

                    final int newCount = count + 1;

                    // write journal entries to the WAL
                    if (journalManager != null) {
                        try {
                            journalManager.journal(new UpdateBlobRefCountLoggable(transaction.getId(), blobId, count, newCount));
                            journalManager.flush(true, true);   // force WAL entries to disk!
                        } catch (final JournalException e) {
                            // restore the state of the blobReference first!
                            blobReference.count.set(count);
                            throw new IOException(e);
                        }
                    }

                    // persist the new value
                    persistQueue.put(Tuple(blobId, blobReference, newCount));

                    // update memory with the new value, and release other spinning threads
                    blobReference.count.set(newCount);

                    // done!
                    return Tuple(blobId, staged._2);
                }
            }
        } catch (final InterruptedException e) {
            // thrown by persistQueue.put or Thread.sleep
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    @Override
    public BlobId copy(final Txn transaction, final BlobId blobId) throws IOException {
        if (state.get() != State.OPEN) {
            throw new IOException("Blob Store is not open!");
        }

        final BlobReference blobReference = references.get(blobId);
        if (blobReference == null) {
            return null;
        }

        // NOTE: a copy is simply an increment of the reference count!
        try {
            while (true) {

                final int count = blobReference.count.get();

                // guard against a concurrent #add or #remove
                if (count == STAGED || count == PROMOTING || count == UPDATING_COUNT) {
                    // spin whilst another thread promotes the blob, or updates the reference count
                    // sleep a small time to save CPU
                    Thread.sleep(10);
                    continue;
                }

                // guard against a concurrent vacuum operation
                if (count == DELETING) {
                    return null;
                }

                // only increment the blob reference if the blob is active!
                if (count >= 0 && blobReference.count.compareAndSet(count, UPDATING_COUNT)) {
                    // NOTE: we are the only thread that can be in this branch for the blobId

                    final int newCount = count + 1;

                    // write journal entries to the WAL
                    final JournalManager journalManager = database.getJournalManager().orElse(null);
                    if (journalManager != null) {
                        try {
                            journalManager.journal(new UpdateBlobRefCountLoggable(transaction.getId(), blobId, count, newCount));
                            journalManager.flush(true, true);   // force WAL entries to disk!
                        } catch (final JournalException e) {
                            // restore the state of the blobReference first!
                            blobReference.count.set(count);
                            throw new IOException(e);
                        }
                    }

                    // persist the new value
                    persistQueue.put(Tuple(blobId, blobReference, newCount));

                    // update memory with the new value, and release other spinning threads
                    blobReference.count.set(newCount);

                    // done!
                    return blobId;
                }
            }
        } catch (final InterruptedException e) {
            // thrown by persistQueue.put or Thread.sleep
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    @Override
    @Nullable public InputStream get(final Txn transaction, final BlobId blobId) throws IOException {
        final BlobFileLease blobFileLease = readLeaseBlobFile(transaction, blobId);
        if (blobFileLease == null) {
            return null;
        }

        // the blob file lease is released either when the input stream is closed, or if an error occurs opening the stream
        try {
            return new OnCloseInputStream(Files.newInputStream(blobFileLease.path), blobFileLease.release);
        } catch (final IOException e) {
            blobFileLease.release.run();  // MUST release the read lease!
            throw e;
        }
    }

    @Override
    @Nullable public MessageDigest getDigest(final Txn transaction, final BlobId blobId, final DigestType digestType)
            throws IOException {
        if (this.digestType.equals(digestType)) {
            // optimisation: we can just return the BlobId, as that is the digest for this digest type!
            return new MessageDigest(digestType, blobId.getId());

        } else {
            // calculate the digest
            final StreamableDigest streamableDigest = digestType.newStreamableDigest();
            final Try<MessageDigest, IOException> result = with(transaction, blobId, maybeBlobFile ->
                    maybeBlobFile == null ? null :
                            TaggedTryUnchecked(IOException.class, () -> {
                                FileUtils.digest(maybeBlobFile, streamableDigest);
                                return new MessageDigest(streamableDigest.getDigestType(),
                                        streamableDigest.getMessageDigest());
                            })
            );
            return result.get();
        }
    }

    @Override
    public <T> T with(final Txn transaction, final BlobId blobId, final Function<Path, T> fnFile) throws IOException {
        final BlobFileLease blobFileLease = readLeaseBlobFile(transaction, blobId);
        try {
            return fnFile.apply(blobFileLease == null ? null : blobFileLease.path);
        } finally {
            if (blobFileLease != null) {
                blobFileLease.release.run();  // MUST release the read lease!
            }
        }
    }

    /**
     * Lease a Blob file for reading from the Blob Store.
     *
     * @param transaction the current database transaction.
     * @param blobId the identifier of the blob to lease the blob file from.
     *
     * @return the blob file lease, or null if the blob does not exist in the Blob Store
     *
     * @throws IOException if an error occurs whilst retrieving the BLOB file.
     */
    private BlobFileLease readLeaseBlobFile(final Txn transaction, final BlobId blobId) throws IOException {
        if (state.get() != State.OPEN) {
            throw new IOException("Blob Store is not open!");
        }

        final BlobReference blobReference = references.get(blobId);
        if (blobReference == null) {
            return null;
        }

        try {
            while (true) {
                final int count = blobReference.count.get();

                if (count == 0) {
                    // can't return something which has zero references
                    return null;
                }

                // guard against a concurrent vacuum operation
                if (count == DELETING) {
                    // can't return something which has zero references (because it is being deleted)
                    return null;
                }

                // guard against a concurrent #add doing staging
                if (count == STAGED || count == PROMOTING) {
                    // spin whilst another thread promotes the blob
                    // sleep a small time to save CPU
                    Thread.sleep(10);
                    continue;
                }

                // read a blob which has references
                if (count > 0) {
                    // we are reading
                    blobReference.readers.incrementAndGet();

                    // get the blob
                    final Path blobFile = blobDir.resolve(bytesToHex(blobId.getId()));
                    return new BlobFileLease(blobFile, blobReference.readers::decrementAndGet);
                }
            }
        } catch (final InterruptedException e) {
            // only thrown by Thread.sleep above
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /**
     * Gets the reference count for the Blob.
     *
     * NOTE: this method is not thread-safe and should ONLY
     * be used for testing, which is why this method is
     * marked package-private!
     *
     * @param blobId The id of the blob
     *
     * @return the reference count, or null if the blob id is not in the references table.
     *
     * @throws IOException if the BlobStore is not open.
     */
    @Nullable Integer getReferenceCount(final BlobId blobId) throws IOException {
        if (state.get() != State.OPEN) {
            throw new IOException("Blob Store is not open!");
        }

        final BlobReference blobReference = references.get(blobId);
        if (blobReference == null) {
            return null;
        }
        return blobReference.count.get();
    }

    @Override
    public void remove(final Txn transaction, final BlobId blobId) throws IOException {
        if (state.get() != State.OPEN) {
            throw new IOException("Blob Store is not open!");
        }

        final BlobReference blobReference = references.get(blobId);
        if (blobReference == null) {
            return;
        }

        try {
            while (true) {
                final int count = blobReference.count.get();

                if (count == 0) {
                    // can't remove something which has zero references
                    return;
                }

                // guard against a concurrent vacuum operation
                if (count == DELETING) {
                    // can't remove something which has zero references (because it is being deleted)
                    return;
                }

                // guard against a concurrent #add or #remove
                if (count == STAGED || count == PROMOTING || count == UPDATING_COUNT) {
                    // spin whilst another thread promotes the blob or updates the reference count
                    // sleep a small time to save CPU
                    Thread.sleep(10);
                    continue;
                }

                // only decrement the blob reference if the blob has more than zero references
                if (count > 0 && blobReference.count.compareAndSet(count, UPDATING_COUNT)) {
                    // NOTE: we are the only thread that can be in this branch for the blobId

                    final int newCount = count - 1;

                    // write journal entries to the WAL
                    final JournalManager journalManager = database.getJournalManager().orElse(null);
                    if (journalManager != null) {
                        try {
                            journalManager.journal(new UpdateBlobRefCountLoggable(transaction.getId(), blobId, count, newCount));
                            journalManager.flush(true, true);   // force WAL entries to disk!
                        } catch (final JournalException e) {
                            // restore the state of the blobReference first!
                            blobReference.count.set(count);
                            throw new IOException(e);
                        }
                    }

                    // schedule disk persist of the new value
                    persistQueue.put(Tuple(blobId, blobReference, newCount));

                    if (newCount == 0) {
                        // schedule the blob file for vacuum

                        final BlobVacuum.RequestDeleteBlobFile requestDeleteBlobFile =
                                new BlobVacuum.RequestDeleteBlobFile(references, blobDir, blobId, blobReference);

                        if (journalManager != null) {
                            // register a callback to schedule the BLOB file for vacuum ONLY after commit+checkpoint
                            final ScheduleDeleteBlobFile scheduleDeleteBlobFile = new ScheduleDeleteBlobFile(
                                    vacuumQueue, requestDeleteBlobFile);
                            journalManager.listen(scheduleDeleteBlobFile);
                            transaction.registerListener(scheduleDeleteBlobFile);
                        } else {
                            // no journal (or recovery)... so go ahead and schedule
                            enqueueVacuum(vacuumQueue, requestDeleteBlobFile);
                        }
                    }

                    // update memory with the new value, and release other spinning threads
                    blobReference.count.set(newCount);

                    // done!
                    return;
                }
            }
        } catch (final InterruptedException e) {
            // thrown by persistQueue.put or Thread.sleep
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    @Override
    public void backupToArchive(final RawDataBackup backup) throws IOException {
        if (state.get() != State.OPEN) {
            throw new IOException("Blob Store is not open!");
        }

        // TODO(AR) should we enter an exclusive backup state?

        // NOTE: do not use try-with-resources here, closing the OutputStream will close the entire backup

        // backup the blob.dbx
        try {
            final OutputStream os = backup.newEntry(fileName(persistentFile));
            Files.copy(persistentFile, os);
        } finally {
            backup.closeEntry();
        }

        // backup the blob files
        for (final Path blobFile : FileUtils.list(blobDir, Files::isRegularFile)) {
            try {
                final OutputStream os = backup.newEntry(fileName(blobDir) + '/' + fileName(blobFile));
                Files.copy(blobFile, os);
            } finally {
                backup.closeEntry();
            }
        }

        // backup the staging area
        for (final Path stagedFile : FileUtils.list(stagingDir, Files::isRegularFile)) {
            try {
                final OutputStream os = backup.newEntry(fileName(blobDir) + '/' + fileName(stagingDir) + '/' + fileName(stagedFile));
                Files.copy(stagedFile, os);
            } finally {
                backup.closeEntry();
            }
        }
    }

    @Override
    public void redo(final BlobLoggable blobLoggable) throws LogException {
        try {
            if (blobLoggable instanceof StoreBlobFileLoggable storeBlobFileLoggable) {
                redoStoreBlobFile(storeBlobFileLoggable.getBlobId(), storeBlobFileLoggable.getStagedUuid());

            } else if (blobLoggable instanceof UpdateBlobRefCountLoggable updateBlobRefCountLoggable) {
                updateBlobRefCount(updateBlobRefCountLoggable.getBlobId(), updateBlobRefCountLoggable.getNewCount());
            }
        } catch (final IOException e) {
            throw new LogException(e.getMessage(), e);
        }
    }

    @Override
    public void undo(final BlobLoggable blobLoggable) throws LogException {
        try {
            if (blobLoggable instanceof StoreBlobFileLoggable storeBlobFileLoggable) {
                undoStoreBlobFile(storeBlobFileLoggable.getBlobId(), storeBlobFileLoggable.getStagedUuid());

            } else if (blobLoggable instanceof UpdateBlobRefCountLoggable updateBlobRefCountLoggable) {
                updateBlobRefCount(updateBlobRefCountLoggable.getBlobId(), updateBlobRefCountLoggable.getCurrentCount());
            }
        } catch (final IOException e) {
            throw new LogException(e.getMessage(), e);
        }
    }

    /**
     * Recovery - redo: Promotes the Staged Blob File after performing some checks.
     *
     * This is possible because the Staged Blob file is not
     * removed until a checkpoint is written AFTER the transaction
     * was committed.
     *
     * @param blobId the blobId
     * @param stagedUuid The uuid of the staged blob file.
     *
     * @throws IOException if the staged blob file cannot be promoted
     */
    private void redoStoreBlobFile(final BlobId blobId, final String stagedUuid) throws IOException {
        final Path stagedBlobFile = stagingDir.resolve(stagedUuid);

        // check the staged file exists
        if (!Files.exists(stagedBlobFile)) {
            throw new IOException("Staged Blob File does not exist: " + stagedBlobFile.toAbsolutePath());
        }

        // check the staged file has the correct checksum
        final StreamableDigest streamableDigest = digestType.newStreamableDigest();
        FileUtils.digest(stagedBlobFile, streamableDigest);
        final String blobFilename = bytesToHex(blobId.getId());
        if (!Arrays.equals(blobId.getId(), streamableDigest.getMessageDigest())) {
            throw new IOException("Staged Blob File checksum '"
                    + bytesToHex(streamableDigest.getMessageDigest()) + "', does not match checksum of blobId '"
                    + blobFilename + "'");
        }

        final Path blobFile = blobDir.resolve(blobFilename);

        Files.copy(stagedBlobFile, blobFile, REPLACE_EXISTING);
    }

    /**
     * Recovery - undo: Demotes the Blob File back to the staging area after performing some checks.
     *
     * @param blobId the blobId
     * @param stagedUuid The uuid of the staged blob file.
     *
     * @throws IOException if the blob file cannot be demoted to the staging area
     */
    private void undoStoreBlobFile(final BlobId blobId, final String stagedUuid) throws IOException {
        final String blobFilename = bytesToHex(blobId.getId());
        final Path blobFile = blobDir.resolve(blobFilename);

        // check the blob file exists
        if (!Files.exists(blobFile)) {
            throw new IOException("Blob File does not exist: " + blobFile.toAbsolutePath());
        }

        final Path stagedBlobFile = stagingDir.resolve(stagedUuid);

        Files.copy(blobFile, stagedBlobFile, REPLACE_EXISTING);
    }

    /**
     * Recovery - redo/undo: sets the reference count of a blob.
     *
     * @param blobId the blobId
     * @param count The reference count to set.
     *
     * @throws IOException if the blob's reference count cannot be set
     */
    private void updateBlobRefCount(final BlobId blobId, final int count) throws IOException {
        buffer.clear();
        buffer.limit(digestType.getDigestLengthBytes());  // we are only going to read the BlobIds

        // start immediately after the file header
        channel.position(BLOB_STORE_HEADER_LEN);

        boolean updatedCount = false;

        while (channel.read(buffer) > 0) {
            buffer.flip();
            final byte[] id = new byte[digestType.getDigestLengthBytes()];
            buffer.get(id);
            final BlobId readBlobId = new BlobId(id);

            if (blobId.equals(readBlobId)) {

                buffer.clear();
                buffer.limit(REFERENCE_COUNT_LEN);
                buffer.putInt(count);
                buffer.flip();

                channel.write(buffer);

                updatedCount = true;

                break;
            }

            // skip over the reference count
            channel.position(channel.position() + REFERENCE_COUNT_LEN);

            // reset the buffer to read the next entry's BlobId, otherwise the
            // next read would return zero bytes and end the scan prematurely
            buffer.clear();
            buffer.limit(digestType.getDigestLengthBytes());
        }

        /*
         * If we could not update the count of an existing entry, append a new entry to the end of the file.
         * We even include those entries with count = 0, so that their blob files will be cleared up by
         * the next call to compactPersistentReferences
         */
        if (!updatedCount) {
            buffer.clear();
            buffer.put(blobId.getId());
            buffer.putInt(count);

            buffer.flip();

            channel.write(buffer);
        }
    }

    /**
     * Stages a BLOB file.
     *
     * Writes a BLOB to a file in the Blob Store staging area.
     *
     * @param is data stream for the BLOB.
     * @return The file path, length and checksum of the staged BLOB
     * @throws IOException if an error occurs whilst staging the BLOB.
     */
    private Tuple3<Path, Long, MessageDigest> stage(final InputStream is) throws IOException {
        final Path stageFile = stagingDir.resolve(UUIDGenerator.getUUIDversion4());
        final CountingInputStream cis = new CountingInputStream(is);
        final StreamableDigest streamableDigest = digestType.newStreamableDigest();
        final DigestInputStream dis = new DigestInputStream(cis, streamableDigest);

        Files.copy(dis, stageFile);

        return Tuple(stageFile, cis.getByteCount(), streamableDigest.copyMessageDigest());
    }

    /**
     * Promotes a staged BLOB file to the BLOB store.
     *
     * Copies a staged BLOB file in the Blob Store staging area to
     * the live Blob Store.
     *
     * The staged BLOB will be removed as part of the Journalling
     * and Recovery.
     *
     * @param staged the staged BLOB.
     * @throws IOException if an error occurs whilst promoting the BLOB.
     */
    private void promote(final Tuple3<Path, Long, MessageDigest> staged) throws IOException {
        Files.copy(staged._1, blobDir.resolve(staged._3.toHexString()), REPLACE_EXISTING);
1✔
1288
    }
1✔
1289

1290
    /**
1291
     * Deletes a BLOB file from the Blob Store.
1292
     *
1293
     * @param blobDir the blob directory.
1294
     * @param blobId the identifier of the BLOB file to delete.
1295
     * @param always true if we should always be able to delete the file,
1296
     *     false if the file may not exist.
1297
     *
1298
     * @throws IOException if the file cannot be deleted, for example if {@code always}
1299
     *                     is set to true and the BLOB does not exist.
1300
     */
1301
    private static void deleteBlob(final Path blobDir, final BlobId blobId, final boolean always) throws IOException {
1302
        final Path blobFile = blobDir.resolve(bytesToHex(blobId.getId()));
1✔
1303
        if (always) {
1✔
1304
            Files.delete(blobFile);
1✔
1305
        } else {
1✔
1306
            Files.deleteIfExists(blobFile);
1✔
1307
        }
1308
    }
1✔
1309

1310
    /**
1311
     * Value class which represents the reference
1312
     * count for a blob, the number of active readers,
1313
     * and the offset of its entry in the persistent file.
1314
     */
1315
    static class BlobReference {
1316
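        /*
         * Sentinel values for {@link #count}: a negative value marks a blob in a
         * transitional state (e.g. DELETING is set whilst the blob file is being
         * vacuumed), whereas a non-negative value is the actual reference count.
         */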
        static final int DELETING = -4;
        static final int STAGED = -3;
        static final int PROMOTING = -2;
        static final int UPDATING_COUNT = -1;

        final AtomicInteger count;
        final AtomicInteger readers = new AtomicInteger();

        static final long NOT_PERSISTED = -1;

        /**
         * Only read and written from a single
         * thread in {@link PersistentWriter},
         * so no synchronization is needed.
         */
        long persistentOffset = NOT_PERSISTED;

        /**
         * Construct a new Blob Reference which has not yet
         * been persisted.
         *
         * The persistentOffset will be set to {@link #NOT_PERSISTED}.
         *
         * @param count the reference count
         */
        public BlobReference(final int count) {
            this.count = new AtomicInteger(count);
        }

        /**
         * Construct a new Blob Reference to a persistent
         * blob.
         *
         * @param count the reference count
         * @param persistentOffset the offset of the blob reference in the persistent file
         */
        public BlobReference(final int count, final long persistentOffset) {
            this.count = new AtomicInteger(count);
            this.persistentOffset = persistentOffset;
        }
    }

    /**
     * Value class which represents a lease
     * of a Blob's file.
     */
    private static class BlobFileLease {
        final Path path;
        final Runnable release;

        /**
         * @param path the blob file
         * @param release the action to run to release the lease
         */
        public BlobFileLease(final Path path, final Runnable release) {
            this.path = path;
            this.release = release;
        }
    }

    /**
     * A FilterInputStream which executes an action when
     * the underlying stream is closed.
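     *
     * <p>Hypothetical usage sketch, releasing a {@link BlobFileLease} when the
     * caller closes the stream:</p>
     * <pre>
     *     final InputStream is = new OnCloseInputStream(
     *             Files.newInputStream(lease.path), lease.release);
     * </pre>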
     */
    public static class OnCloseInputStream extends FilterInputStream {
        private final Runnable closeAction;

        /**
         * Ensures that the close action is only executed once.
         */
        private final AtomicBoolean closed = new AtomicBoolean(false);

        /**
         * @param in an input stream.
         * @param closeAction an action to run after the stream is closed.
         */
        public OnCloseInputStream(final InputStream in, final Runnable closeAction) {
            super(in);
            this.closeAction = closeAction;
        }

        @Override
        public int read(final byte[] b) throws IOException {
            return in.read(b);
        }

        /**
         * First, closes the underlying Input Stream
         * by calling {@code super.close()}, and then
         * always executes the {@link #closeAction}.
         *
         * This method is idempotent: the close action
         * will only ever be applied once.
         *
         * @throws IOException if an I/O error occurs.
         */
        @Override
        public void close() throws IOException {
            if (closed.compareAndSet(false, true)) {
                try {
                    super.close();
                } finally {
                    closeAction.run();
                }
            }
        }
    }

    /**
     * A Journal and Transaction listener which will execute an action only
     * after the transaction has been completed (aborted or committed) and
     * a checkpoint has been written.
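     *
     * <p>Hypothetical usage sketch, as an anonymous subclass:</p>
     * <pre>
     *     final CommitThenCheckpointListener listener = new CommitThenCheckpointListener() {
     *         &#64;Override
     *         public void execute() {
     *             // runs only after commit/abort AND a subsequent checkpoint
     *         }
     *     };
     * </pre>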
     */
    private static abstract class CommitThenCheckpointListener implements TxnListener, JournalManager.JournalListener {
        // written from a single thread, read from multiple threads
        private volatile boolean committedOrAborted = false;

        @Override
        public void commit() {
            committedOrAborted = true;
        }

        @Override
        public void abort() {
            committedOrAborted = true;
        }

        @Override
        public boolean afterCheckpoint(final long txnId) {
            if (!committedOrAborted) {
                /*
                 * we have not yet committed or aborted,
                 * so keep receiving checkpoint events!
                 */
                return true;
            }

            execute();

            return false;
        }

        /**
         * Called after the transaction has completed
         * and a checkpoint has been written.
         */
        public abstract void execute();
    }

    /**
     * Deletes a staged Blob file once the transaction that promoted it has
     * completed and a checkpoint has been written.
     */
    private static class DeleteStagedBlobFile extends CommitThenCheckpointListener {
        private final BlockingQueue<BlobVacuum.Request> vacuumQueue;
        private final BlobVacuum.RequestDeleteStagedBlobFile requestDeleteStagedBlobFile;

        /**
         * @param vacuumQueue the vacuum queue.
         * @param requestDeleteStagedBlobFile the request to delete the staged blob file.
         */
        public DeleteStagedBlobFile(final BlockingQueue<BlobVacuum.Request> vacuumQueue, final BlobVacuum.RequestDeleteStagedBlobFile requestDeleteStagedBlobFile) {
            this.vacuumQueue = vacuumQueue;
            this.requestDeleteStagedBlobFile = requestDeleteStagedBlobFile;
        }

        @Override
        public void execute() {
            enqueueVacuum(vacuumQueue, requestDeleteStagedBlobFile);
        }
    }

    /**
     * Schedules a Blob File for deletion once the transaction that removed it
     * has completed and a checkpoint has been written.
     */
    private static class ScheduleDeleteBlobFile extends CommitThenCheckpointListener {
        private final BlockingQueue<BlobVacuum.Request> vacuumQueue;
        private final BlobVacuum.RequestDeleteBlobFile requestDeleteBlobFile;

        /**
         * @param vacuumQueue the vacuum queue.
         * @param requestDeleteBlobFile the request to delete the blob file.
         */
        public ScheduleDeleteBlobFile(final BlockingQueue<BlobVacuum.Request> vacuumQueue,
                final BlobVacuum.RequestDeleteBlobFile requestDeleteBlobFile) {
            this.vacuumQueue = vacuumQueue;
            this.requestDeleteBlobFile = requestDeleteBlobFile;
        }

        @Override
        public void execute() {
            enqueueVacuum(vacuumQueue, requestDeleteBlobFile);
        }
    }

    private static void enqueueVacuum(final BlockingQueue<BlobVacuum.Request> vacuumQueue,
            final BlobVacuum.Request request) {
        try {
            /*
             * We offer with a timeout because vacuuming
             * is best effort rather than essential; anything
             * we cannot vacuum will be cleaned up at next startup,
             * either as a part of crash recovery or compaction.
             */
            if (!vacuumQueue.offer(request, VACUUM_ENQUEUE_TIMEOUT, TimeUnit.MILLISECONDS)) {
                LOG.error("Timeout, could not enqueue for vacuum: {}", request);
            }
        } catch (final InterruptedException e) {
            LOG.error("Interrupted, could not enqueue for vacuum: {}", request, e);
            Thread.currentThread().interrupt();  // restore interrupted status!
        }
    }

    /**
     * The PersistentWriter is designed to be run
     * exclusively on its own single thread for a BlobStore
     * and is solely responsible for writing updates to the
     * persistent blob store file.
     */
    private static class PersistentWriter implements Runnable {

        /**
         * The Poison Pill can be placed on the {@link #persistQueue};
         * when it is encountered, the {@link PersistentWriter} will
         * shut down.
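         *
         * <p>Hypothetical usage sketch, requesting an orderly shutdown of the writer:</p>
         * <pre>
         *     persistQueue.put(PersistentWriter.POISON_PILL);
         * </pre>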
         */
        public static final Tuple3<BlobId, BlobReference, Integer> POISON_PILL = Tuple(null, null, null);

        private final BlockingQueue<Tuple3<BlobId, BlobReference, Integer>> persistQueue;
        private final ByteBuffer buffer;
        private final SeekableByteChannel channel;
        private final Runnable abnormalShutdownCallback;

        PersistentWriter(final BlockingQueue<Tuple3<BlobId, BlobReference, Integer>> persistQueue,
                final ByteBuffer buffer, final SeekableByteChannel channel, final Runnable abnormalShutdownCallback) {
            this.persistQueue = persistQueue;
            this.buffer = buffer;
            this.channel = channel;
            this.abnormalShutdownCallback = abnormalShutdownCallback;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    final Tuple3<BlobId, BlobReference, Integer> blobData = persistQueue.take();
                    if (blobData == POISON_PILL) {
                        // if we received the Poison Pill, we should shut down!
                        break;  // exit
                    }

                    // write an entry
                    writeEntry(blobData._1, blobData._2, blobData._3);
                }
            } catch (final InterruptedException e) {
                // Restore the interrupted status
                LOG.error("PersistentWriter shutting down due to interrupt: {}", e.getMessage());
                Thread.currentThread().interrupt();
                abnormalShutdownCallback.run();
            } catch (final IOException e) {
                LOG.error("PersistentWriter shutting down, received: {}", e.getMessage(), e);
                abnormalShutdownCallback.run();
            }
        }

        /**
         * Stores the reference count for a blob to the persistent blob store file.
         *
         * When a new reference count is written for the first time, it updates
         * the {@link BlobStoreImpl.BlobReference#persistentOffset} with the
         * location of the reference in the persistent file.
         *
         * @param blobId the identifier of the blob.
         * @param blobReference the reference details for the blob.
         * @param newCount the new reference count to store.
         *
         * @throws IOException if an error occurs whilst writing the persistent file.
         */
        private void writeEntry(final BlobId blobId, final BlobReference blobReference, final int newCount)
                throws IOException {

            // if this is a new record (i.e. not yet persisted), append to the end of the file
            if (blobReference.persistentOffset == NOT_PERSISTED) {
                blobReference.persistentOffset = channel.size();
            }

            channel.position(blobReference.persistentOffset);

            buffer.clear();
            buffer.put(blobId.getId());
            buffer.putInt(newCount);
            buffer.flip();

            channel.write(buffer);
        }
    }

    /**
     * The BlobVacuum is designed to be run
     * exclusively on its own single thread for a BlobStore
     * and is solely responsible for deleting blob files from
     * the blob file store.
     */
    private static class BlobVacuum implements Runnable {
        private final BlockingQueue<Request> vacuumQueue;

        public BlobVacuum(final BlockingQueue<Request> vacuumQueue) {
            this.vacuumQueue = vacuumQueue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    final Request request = vacuumQueue.take();
                    if (!request.service()) {
                        // if the request could not be serviced then enqueue it so we can try again in the future

                        try {
                            if (!vacuumQueue.offer(request, VACUUM_ENQUEUE_TIMEOUT, TimeUnit.MILLISECONDS)) {
                                LOG.error("Timeout, could not enqueue for vacuum: {}", request);
                            }
                        } catch (final InterruptedException e) {
                            LOG.error("Interrupted, could not enqueue for vacuum: {}", request, e);
                            Thread.currentThread().interrupt();  // restore interrupted status!
                            throw e;
                        }
                    }
                }
            } catch (final InterruptedException e) {
                // expected when we are shutting down, only thrown by vacuumQueue.take/offer.
                // Any remaining objects in the queue which we have not yet vacuumed will
                // be taken care of by {@link #compactPersistentReferences(ByteBuffer, Path)}
                // when the persistent blob store file is next opened

                // Restore the interrupted status
                Thread.currentThread().interrupt();
            }
        }

        /**
         * The type of Vacuum Request.
         */
        interface Request extends Comparable<Request> {
            /**
             * @return true if the request was serviced,
             *     false if the request should be re-scheduled.
             */
            boolean service();
        }

        /**
         * Vacuum request for deleting a Blob File for a Blob that has been removed.
         *
         * The Blob File will only be deleted if it has no references and no active readers.
         *
         * As vacuuming happens asynchronously, a new Blob may have been added which
         * has the same de-duplicated Blob File, causing an increase in references,
         * in which case the Blob File will be recycled instead
         * and will not be deleted here.
         */
        public static final class RequestDeleteBlobFile implements Request {
            private final ConcurrentMap<BlobId, BlobReference> references;
            private final Path blobDir;
            private final BlobId blobId;
            private final BlobReference blobReference;

            public RequestDeleteBlobFile(final ConcurrentMap<BlobId, BlobReference> references,
                    final Path blobDir, final BlobId blobId, final BlobReference blobReference) {
                this.references = references;
                this.blobDir = blobDir;
                this.blobId = blobId;
                this.blobReference = blobReference;
            }

            @Override
            public String toString() {
                return "RequestDeleteBlobFile(" + blobId + ")";
            }

            @Override
            public int compareTo(final Request other) {
                if (other instanceof RequestDeleteBlobFile) {
                    return ((RequestDeleteBlobFile) other).blobReference.readers.get() - blobReference.readers.get();
                } else {
                    // This class has higher priority than other classes
                    return 1;
                }
            }

            @Override
            public boolean service() {
                // we can only delete the blob file itself if there are no references
                if (blobReference.count.compareAndSet(0, DELETING)) {

                    // make sure there are no readers still actively reading the blob file
                    if (blobReference.readers.get() == 0) {

                        // no more readers can be taken whilst count == DELETING, so we can delete
                        try {
                            deleteBlob(blobDir, blobId, true);
                        } catch (final IOException ioe) {
                            // non-critical error
                            LOG.error("Unable to delete blob file: {}", bytesToHex(blobId.getId()), ioe);
                        }

                        // remove from shared map
                        references.remove(blobId);

                    } else {
                        // reschedule the blob vacuum for later (when hopefully there are no active readers)
                        return false;
                    }

                    // NOTE: DELETING is the last state of a BlobReference#count -- there is no coming back from this!

                } else {
                    /*
                     * no-op: ignore this blob as it now again has active references,
                     * so we don't need to delete it; instead it has been recycled :-)
                     * Therefore we can just continue...
                     */
                }

                // we serviced this request!
                return true;
            }
        }
        }
1747

1748
        public static final class RequestDeleteStagedBlobFile implements Request {
1749
            private final Path stagingDir;
1750
            private final String stagedBlobUuid;
1751

1752
            public RequestDeleteStagedBlobFile(final Path stagingDir, final String stagedBlobUuid) {
1✔
1753
                this.stagingDir = stagingDir;
1✔
1754
                this.stagedBlobUuid = stagedBlobUuid;
1✔
1755
            }
1✔
1756

1757
            @Override
1758
            public String toString() {
1759
                return "RequestDeleteStagedBlobFile(" + stagedBlobUuid + ")";
×
1760
            }
1761

1762
            @Override
1763
            public int compareTo(final Request other) {
1764
                if (other instanceof RequestDeleteStagedBlobFile) {
×
1765
                    return stagedBlobUuid.compareTo(((RequestDeleteStagedBlobFile)other).stagedBlobUuid);
×
1766
                } else {
1767
                    // This class has lower priority than other classes
1768
                    return -1;
×
1769
                }
1770
            }
1771

1772
            @Override
1773
            public boolean service() {
1774
                final Path stagedBlobFile = stagingDir.resolve(stagedBlobUuid);
1✔
1775
                FileUtils.deleteQuietly(stagedBlobFile);
1✔
1776
                return true;
1✔
1777
            }
1778
        }
1779
    }
1780
}