• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

evolvedbinary / elemental / 863

26 Apr 2025 05:03PM UTC coverage: 56.413% (-0.002%) from 56.415%
863

push

circleci

adamretter
[bugfix] Update Codacy badge

28457 of 55846 branches covered (50.96%)

Branch coverage included in aggregate %.

77466 of 131918 relevant lines covered (58.72%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.47
/exist-core/src/main/java/org/exist/storage/blob/BlobStoreImpl.java
1
/*
2
 * Copyright (C) 2014, Evolved Binary Ltd
3
 *
4
 * This file was originally ported from FusionDB to eXist-db by
5
 * Evolved Binary, for the benefit of the eXist-db Open Source community.
6
 * Only the ported code as it appears in this file, at the time that
7
 * it was contributed to eXist-db, was re-licensed under The GNU
8
 * Lesser General Public License v2.1 only for use in eXist-db.
9
 *
10
 * This license grant applies only to a snapshot of the code as it
11
 * appeared when ported, it does not offer or infer any rights to either
12
 * updates of this source code or access to the original source code.
13
 *
14
 * The GNU Lesser General Public License v2.1 only license follows.
15
 *
16
 * =====================================================================
17
 *
18
 * Copyright (C) 2014, Evolved Binary Ltd
19
 *
20
 * This library is free software; you can redistribute it and/or
21
 * modify it under the terms of the GNU Lesser General Public
22
 * License as published by the Free Software Foundation; version 2.1.
23
 *
24
 * This library is distributed in the hope that it will be useful,
25
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
27
 * Lesser General Public License for more details.
28
 *
29
 * You should have received a copy of the GNU Lesser General Public
30
 * License along with this library; if not, write to the Free Software
31
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
32
 */
33
package org.exist.storage.blob;
34

35
import com.evolvedbinary.j8fu.Try;
36
import com.evolvedbinary.j8fu.tuple.Tuple2;
37
import com.evolvedbinary.j8fu.tuple.Tuple3;
38
import net.jcip.annotations.ThreadSafe;
39
import org.apache.commons.io.input.CountingInputStream;
40
import org.apache.logging.log4j.LogManager;
41
import org.apache.logging.log4j.Logger;
42
import org.exist.Database;
43
import org.exist.backup.RawDataBackup;
44
import org.exist.storage.journal.JournalException;
45
import org.exist.storage.journal.JournalManager;
46
import org.exist.storage.journal.LogEntryTypes;
47
import org.exist.storage.journal.LogException;
48
import org.exist.storage.txn.Txn;
49
import org.exist.storage.txn.TxnListener;
50
import org.exist.util.FileUtils;
51
import org.exist.util.UUIDGenerator;
52
import org.exist.util.crypto.digest.DigestInputStream;
53
import org.exist.util.crypto.digest.DigestType;
54
import org.exist.util.crypto.digest.MessageDigest;
55
import org.exist.util.crypto.digest.StreamableDigest;
56

57
import javax.annotation.Nullable;
58
import java.io.FileNotFoundException;
59
import java.io.FilterInputStream;
60
import java.io.IOException;
61
import java.io.InputStream;
62
import java.io.OutputStream;
63
import java.nio.ByteBuffer;
64
import java.nio.channels.SeekableByteChannel;
65
import java.nio.file.Files;
66
import java.nio.file.Path;
67
import java.util.*;
68
import java.util.concurrent.*;
69
import java.util.concurrent.atomic.AtomicBoolean;
70
import java.util.concurrent.atomic.AtomicInteger;
71
import java.util.concurrent.atomic.AtomicReference;
72
import java.util.function.Function;
73

74
import static com.evolvedbinary.j8fu.Try.TaggedTryUnchecked;
75
import static com.evolvedbinary.j8fu.tuple.Tuple.Tuple;
76
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
77
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
78
import static java.nio.file.StandardOpenOption.*;
79
import static org.exist.storage.blob.BlobLoggable.LOG_STORE_BLOB_FILE;
80
import static org.exist.storage.blob.BlobLoggable.LOG_UPDATE_BLOB_REF_COUNT;
81
import static org.exist.storage.blob.BlobStoreImpl.BlobReference.*;
82
import static org.exist.util.FileUtils.fileName;
83
import static org.exist.util.HexEncoder.bytesToHex;
84
import static org.exist.util.ThreadUtils.nameInstanceThread;
85
import static org.exist.util.ThreadUtils.newInstanceSubThreadGroup;
86

87
/**
88
 * De-duplicating store for BLOBs (Binary Large Objects).
89
 *
90
 * Each unique BLOB is stored by checksum into a blob file on disk.
91
 *
92
 * For each BLOB a reference count and the number of active readers is maintained.
93
 * Adding a BLOB which is already present increments the reference count only,
94
 * it does not require any additional storage.
95
 *
96
 * Removing a BLOB decrements its reference count, BLOBs are only removed when
97
 * their reference count reaches zero, the blob file itself is scheduled for deletion
98
 * and will only be removed when there are no active readers and its reference is zero.
99
 * When the scheduled action for deleting a blob file is realised, if another thread
100
 * has meanwhile added a BLOB to the BLOB store where its blob file has the same
101
 * checksum, then the BLOB's reference count will have increased from zero,
102
 * therefore the schedule will not delete this now again active blob file, we call
103
 * this feature "recycling".
104
 *
105
 * The Blob Store is backed to disk by a persistent store file which
106
 * reflects the in-memory state of BlobStore.
107
 *
108
 * The persistent store file will grow for each unique blob added to
109
 * the system, space is not reclaimed in the persistent file until
110
 * {@link #compactPersistentReferences(ByteBuffer, Path)} is called,
111
 * which typically only happens the next time the blob store is re-opened.
112
 *
113
 * Each unique blob typically takes up only 36 bytes in the
114
 * persistent store file, but this can vary if a smaller or larger
115
 * digestType is specified.
116
 *
117
 * On-line compaction of the persistent file could be added in
118
 * future with relative ease if deemed necessary.
119
 *
120
 * The persistent file for the blob store has the format:
121
 *
122
 * [fileHeader entry+]
123
 *
124
 * fileHeader:          [magicNumber blobStoreVersion].
125
 * magicNumber:         4 bytes. See {@link #BLOB_STORE_MAGIC_NUMBER}.
126
 * blobStoreVersion:    2 bytes. java.lang.short, see {@link #BLOB_STORE_VERSION}.
127
 *
128
 * entry:               [blobChecksum blobReferenceCount]
129
 * blobChecksum:        n-bytes determined by the constructed {@link MessageDigest}.
130
 * blobReferenceCount:  4 bytes. java.lang.int.
131
 *
132
 * Note the persistent file may contain more than one entry
133
 * for the same blobChecksum, however all entries previous to
134
 * the last entry will have a blobReferenceCount of zero. The last
135
 * entry will have the current blobReferenceCount which may be
136
 * zero or more.
137
 *     The use of {@code orphanedBlobFileIds} in the
138
 * {@link #compactPersistentReferences(ByteBuffer, Path)} method
139
 * makes sure to only delete blob files which have a final
140
 * blobReferenceCount of zero.
141
 *
142
 * For performance, writing to the persistent store file,
143
 * removing staged blob files, and deleting blob files are all
144
 * asynchronous actions. To ensure the ability to achieve a consistent
145
 * state after a system crash, the BLOB Store writes entries to a
146
 * Journal WAL (Write-Ahead-Log) which is flushed to disk before each state
147
 * changing operation. If the system restarts after a crash, then a recovery
148
 * process will be performed from the entries in the WAL.
149
 *
150
 * Journal and Recovery of the BLOB Store works as follows:
151
 *
152
 *  Add Blob:
153
 *    Writes two journal entries:
154
 *      * StoreBlobFile(blobId, stagedUuid)
155
 *      * UpdateBlobReferenceCount(blobId, currentCount, newCount + 1)
156
 *
157
 *      On crash recovery, firstly:
158
 *          the StoreBlobFile will either be:
159
 *              1. undone, which copies the blob file from the blob store to the staging area,
160
 *              2. redone, which copies the blob file from the staging area to the blob store,
161
 *              3. or both undone and redone.
162
 *          This is possible because the blob file in the staging area is ONLY deleted after
163
 *          a COMMIT and CHECKPOINT have been written to the Journal, which means
164
 *          that the staged file is always available for recovery, and that no
165
 *          crash recovery of the staged file itself is needed
166
 *
167
 *          Deletion of the staged
168
 *          file happens on a best effort basis, any files which were not deleted due
169
 *          to a system crash, will be deleted upon restart (after recovery) when
170
 *          the Blob Store is next opened.
171
 *
172
 *      Secondly:
173
 *          the BlobReferenceCount will either be undone, redone, or both.
174
 *
175
 *
176
 *  Remove Blob:
177
 *      Writes a single journal entry:
178
 *        *  UpdateBlobReferenceCount(blobId, currentCount, currentCount - 1)
179
 *
180
 *      On crash recovery the BlobReferenceCount will either be undone, redone,
181
 *      or both.
182
 *
183
 *      It is worth noting that the actual blob file on disk is only ever deleted
184
 *      after a COMMIT and CHECKPOINT have been written to the Journal, and then
185
 *      only when it has zero references and zero readers. As it is only
186
 *      deleted after a CHECKPOINT, there is no need for any crash recovery of the
187
 *      disk file itself.
188
 *
189
 *      Deletion of the blob file happens on a best effort basis, any files which
190
 *      were not deleted due to a system crash, will be deleted upon restart
191
 *      (after recovery) by the {@link #compactPersistentReferences(ByteBuffer, Path)}
192
 *      process when the Blob Store is next opened.
193
 *
194
 *
195
 * @author <a href="mailto:adam@evolvedbinary.com">Adam Retter</a>
196
 */
197
@ThreadSafe
198
public class BlobStoreImpl implements BlobStore {
199

200
    /** Logger for this class. */
    private static final Logger LOG = LogManager.getLogger(BlobStoreImpl.class);

    /**
     * Maximum time (in milliseconds) to wait whilst trying to add an
     * item to the vacuum queue {@link #vacuumQueue}.
     */
    private static final long VACUUM_ENQUEUE_TIMEOUT = 5000;  // 5 seconds
207

208
    /*
     * Journal entry types
     */
    static {
        // Register the Blob Store's WAL entry types with the journal, so that
        // crash recovery can re-construct these loggables from the log file.
        LogEntryTypes.addEntryType(LOG_STORE_BLOB_FILE, StoreBlobFileLoggable::new);
        LogEntryTypes.addEntryType(LOG_UPDATE_BLOB_REF_COUNT, UpdateBlobRefCountLoggable::new);
    }
215

216
    /**
     * Length in bytes of the reference count.
     */
    static final int REFERENCE_COUNT_LEN = 4;

    /**
     * File header length in bytes:
     * 4 byte magic number + 2 byte version.
     */
    static final int BLOB_STORE_HEADER_LEN = 6;

    /**
     * File header - magic number which identifies a Blob Store file.
     */
    static final byte[] BLOB_STORE_MAGIC_NUMBER = {0x0E, 0x0D, 0x0B, 0x02};

    /**
     * File header - blob store version
     */
    public static final short BLOB_STORE_VERSION = 1;
235

236
    // Buffer sized to hold exactly one entry (checksum + reference count);
    // allocated in #openBlobStore and released in #closeBlobStore.
    private ByteBuffer buffer;
    // Channel over the persistent blob store (dbx) file; null when closed.
    private SeekableByteChannel channel;

    /**
     * In-memory representation of the Blob Store.
     */
    private ConcurrentMap<BlobId, BlobReference> references;

    /**
     * Queue for communicating between the thread calling
     * the BlobStore and the {@link #persistentWriter} thread.
     *
     * Holds blob references which need to be updated in the
     * blob stores persistent dbx file ({@link #persistentFile})
     * on disk.
     */
    private final BlockingQueue<Tuple3<BlobId, BlobReference, Integer>> persistQueue = new LinkedBlockingQueue<>();

    /**
     * Queue for communicating between the thread calling
     * the BlobStore and the {@link #blobVacuum} thread.
     *
     * Head of the queue is the blob with the least active readers.
     *
     * Holds blob references which are scheduled to have their blob file
     * removed from the blob file store.
     */
    private final BlockingQueue<BlobVacuum.Request> vacuumQueue = new PriorityBlockingQueue<>();

    // Immutable configuration, supplied at construction time.
    private final Database database;
    private final Path persistentFile;
    private final Path blobDir;
    private final Path stagingDir;
    private final DigestType digestType;
270

271
    /**
     * Enumeration of possible
     * Blob Store states.
     */
    private enum State {
        OPENING,   // transitional: persistent file being opened (and possibly compacted)
        OPEN,      // fully operational; background threads running
        RECOVERY,  // opened for crash recovery only; background threads are NOT started
        CLOSING,   // transitional: shutting down
        CLOSED     // initial and final state
    }
282

283
    /**
     * State of the Blob Store.
     *
     * All transitions are made via compare-and-set, so that concurrent
     * open/close/recovery calls observe a consistent lifecycle.
     */
    private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
287

288
    /**
     * Thread which updates the persistent blob
     * store file on disk.
     * Both fields are assigned in {@link #open()} only; they remain
     * null when the store was opened for recovery.
     */
    private PersistentWriter persistentWriter;
    private Thread persistentWriterThread;

    /**
     * Thread which deletes de-referenced
     * blob files when there are no
     * more readers.
     * Both fields are assigned in {@link #open()} only; they remain
     * null when the store was opened for recovery.
     */
    private BlobVacuum blobVacuum;
    private Thread blobVacuumThread;
302

303
    /**
     * Constructs a (closed) Blob Store; call {@link #open()} or
     * {@link #openForRecovery()} before use.
     *
     * @param database the database that this BlobStore is operating within
     * @param persistentFile the file path for the persistent blob store metadata.
     * @param blobDir the directory to store BLOBs in.
     * @param digestType the message digest type to use for creating checksums of the BLOBs.
     */
    public BlobStoreImpl(final Database database, final Path persistentFile, final Path blobDir,
            final DigestType digestType) {
        this.database = database;
        this.digestType = digestType;
        this.persistentFile = persistentFile;
        this.blobDir = blobDir;
        // staged blob files live in a sub-directory of the blob directory
        this.stagingDir = blobDir.resolve("staging");
    }
317

318
    @Override
    public void open() throws IOException {
        // open (and possibly compact) the persistent store file; must happen
        // first as the threads below capture #buffer and #channel
        openBlobStore(false);

        // thread group for the blob store
        final ThreadGroup blobStoreThreadGroup = newInstanceSubThreadGroup(database, "blob-store");

        // startup the persistent writer thread, which applies reference count
        // updates taken from #persistQueue to the persistent dbx file
        this.persistentWriter = new PersistentWriter(persistQueue, buffer, channel,
                this::abnormalPersistentWriterShutdown);
        this.persistentWriterThread = new Thread(blobStoreThreadGroup, persistentWriter,
                nameInstanceThread(database, "blob-store.persistent-writer"));
        persistentWriterThread.start();

        // startup the blob vacuum thread, which deletes de-referenced blob
        // files taken from #vacuumQueue
        this.blobVacuum = new BlobVacuum(vacuumQueue);
        this.blobVacuumThread = new Thread(blobStoreThreadGroup, blobVacuum,
                nameInstanceThread(database, "blob-store.vacuum"));
        blobVacuumThread.start();

        // we are now open!
        state.set(State.OPEN);
    }
341

342
    @Override
    public void openForRecovery() throws IOException {
        // open the existing store without compaction; the background writer
        // and vacuum threads are intentionally NOT started during recovery
        openBlobStore(true);
        state.set(State.RECOVERY);
    }
347

348
    /**
349
     * Opens the BLOB Store's persistent store file,
350
     * and prepares the staging area.
351
     *
352
     * @param forRecovery true if the Blob Store is being opened for crash recovery, false otherwise
353
     *
354
     * @throws IOException if an error occurs whilst opening the BLOB Store
355
     */
356
    private void openBlobStore(final boolean forRecovery) throws IOException {
357
        if (state.get() == State.OPEN) {
1!
358
            if (forRecovery) {
×
359
                throw new IOException("BlobStore is already open!");
×
360
            } else {
361
                return;
×
362
            }
363
        }
364

365
        if (!state.compareAndSet(State.CLOSED, State.OPENING)) {
1!
366
            throw new IOException("BlobStore is not closed");
×
367
        }
368

369
        // size the buffer to hold a complete entry
370
        buffer = ByteBuffer.allocate(digestType.getDigestLengthBytes() + REFERENCE_COUNT_LEN);
1✔
371
        try {
372
            // open the dbx file
373
            if (Files.exists(persistentFile)) {
1✔
374
                if (!forRecovery) {
1✔
375
                    // compact existing blob store file and then open
376
                    this.references = compactPersistentReferences(buffer, persistentFile);
1✔
377
                    channel = Files.newByteChannel(persistentFile, WRITE);
1✔
378

379
                    /*
380
                     * We are not recovering, so we can delete any staging area left over
381
                     * from a previous running database instance
382
                     */
383
                    FileUtils.deleteQuietly(stagingDir);
1✔
384
                } else {
1✔
385
                    // recovery... so open the existing blob store file and just validate its header
386
                    channel = Files.newByteChannel(persistentFile, WRITE, READ);
1✔
387
                    validateFileHeader(buffer, persistentFile, channel);
1✔
388
                }
389
            } else {
1✔
390
                // open new blob store file
391
                if (forRecovery) {
1!
392
                    // we are trying to recover, but there is no existing Blob Store!
393
                    throw new FileNotFoundException("No Blob Store found at '"
×
394
                            + persistentFile.toAbsolutePath().toString() + "' to recover!");
×
395
                }
396

397
                references = new ConcurrentHashMap<>();
1✔
398
                channel = Files.newByteChannel(persistentFile, CREATE_NEW, WRITE);
1✔
399
                writeFileHeader(buffer, channel);
1✔
400
            }
401

402
            // create the staging directory if it does not exist
403
            Files.createDirectories(stagingDir);
1✔
404
        } catch (final IOException e) {
1✔
405
            if (channel != null) {
×
406
                try {
407
                    channel.close();
×
408
                } catch (final IOException ce) {
×
409
                    // ignore
410
                }
411
            }
412
            state.set(State.CLOSED);
×
413
            throw e;
×
414
        }
415
    }
1✔
416

417
    @Override
    public void close() throws IOException {
        // check the blob store is open
        if (state.get() == State.CLOSED) {
            // closing an already closed store is a no-op
            return;
        }

        // NOTE: the compare-and-set order matters - try the normal OPEN
        // transition first, then the RECOVERY transition
        if (state.compareAndSet(State.OPEN, State.CLOSING)) {

            // close up normally
            normalClose();

        } else if (state.compareAndSet(State.RECOVERY, State.CLOSING)) {

            // close up after recovery was attempted
            closeAfterRecoveryAttempt();

        } else {
            throw new IOException("BlobStore is not open");
        }
    }
438

439
    /**
440
     * Closes the Blob Store.
441
     *
442
     * @throws IOException if an error occurs whilst closing the Blob Store
443
     */
444
    private void normalClose() throws IOException {
445
        try {
446
            // shutdown the persistent writer
447
            if (persistentWriter != null) {
1!
448
                persistQueue.put(PersistentWriter.POISON_PILL);
1✔
449
            }
450
            persistentWriterThread.join();
1✔
451

452
            // shutdown the vacuum
453
            if (blobVacuum != null) {
1!
454
                blobVacuumThread.interrupt();
1✔
455
            }
456
            blobVacuumThread.join();
1✔
457
        } catch (final InterruptedException e) {
1✔
458
            // Restore the interrupted status
459
            Thread.currentThread().interrupt();
×
460
            throw new IOException(e);
×
461
        } finally {
462
            closeBlobStore();
1✔
463

464
            // we are now closed!
465
            state.set(State.CLOSED);
1✔
466
        }
467
    }
1✔
468

469
    /**
     * Closes the Blob Store after it was opened for Recovery.
     *
     * No background threads are started for recovery (see
     * {@link #openForRecovery()}), so only the persistent file
     * resources need to be released.
     */
    private void closeAfterRecoveryAttempt() {
        closeBlobStore();

        // we are now closed!
        state.set(State.CLOSED);
    }
478

479
    /**
480
     * Closes the resources associated
481
     * with the Blob Store persistent file.
482
     *
483
     * Clears the {@link #buffer} and closes the {@link #channel}.
484
     */
485
    private void closeBlobStore() {
486
        if (buffer != null) {
1!
487
            buffer.clear();
1✔
488
            buffer = null;
1✔
489
        }
490

491
        // close the file channel
492
        if (channel != null) {
1!
493
            try {
494
                channel.close();
1✔
495
                channel = null;
1✔
496
            } catch (final IOException e) {
1✔
497
                // non-critical error
498
                LOG.error("Error whilst closing blob.dbx: {}", e.getMessage(), e);
×
499
            }
500
        }
501
    }
1✔
502

503
    /**
504
     * Closes the BlobStore if the {@link #persistentWriter} has
505
     * to shutdown due to abnormal circumstances.
506
     *
507
     * Only called when the Blob Store is in the {@link State#OPEN} state!
508
     */
509
    private void abnormalPersistentWriterShutdown() {
510
        // check the blob store is open
511
        if (state.get() == State.CLOSED) {
×
512
            return;
×
513
        }
514
        if (!state.compareAndSet(State.OPEN, State.CLOSING)) {
×
515
            return;
×
516
        }
517

518
        try {
519

520
            // NOTE: persistent writer thread will join when this method finishes!
521

522
            // shutdown the vacuum
523
            if (blobVacuum != null) {
×
524
                blobVacuumThread.interrupt();
×
525
            }
526
            blobVacuumThread.join();
×
527

528
        } catch (final InterruptedException e) {
×
529
            // Restore the interrupted status
530
            Thread.currentThread().interrupt();
×
531
            LOG.error(e.getMessage(), e);
×
532
        } finally {
533
            closeBlobStore();
×
534

535
            // we are now closed!
536
            state.set(State.CLOSED);
×
537
        }
538
    }
×
539

540
    /**
     * Compacts an existing Blob Store file.
     *
     * Reads the existing Blob Store file and copies non zero reference
     * entries to a new Blob Store file. We call this compaction.
     * Once complete, the existing file is replaced with the new file.
     *
     * @param buffer a byte buffer, sized to hold exactly one entry
     *     (checksum + reference count).
     * @param persistentFile an existing persistentFile to compact.
     *
     * @return An in-memory representation of the compacted Blob Store
     *
     * @throws IOException if an error occurs during compaction.
     */
    private ConcurrentMap<BlobId, BlobReference> compactPersistentReferences(final ByteBuffer buffer,
            final Path persistentFile) throws IOException {

        final ConcurrentMap<BlobId, BlobReference> compactReferences = new ConcurrentHashMap<>();
        // compacted entries are written to a new sibling file, which is
        // atomically moved over the original once compaction completes
        final Path compactPersistentFile = persistentFile.getParent().resolve(
                persistentFile.getFileName() + ".new." + System.currentTimeMillis());

        // tracks the BlobIds of Blob Files which have been orphaned
        final Set<BlobId> orphanedBlobFileIds = new HashSet<>();

        try (final SeekableByteChannel channel = Files.newByteChannel(persistentFile, READ)) {

            validateFileHeader(buffer, persistentFile, channel);
            buffer.clear();

            try (final SeekableByteChannel compactChannel = Files.newByteChannel(compactPersistentFile,
                    CREATE_NEW, APPEND)) {

                writeFileHeader(buffer, compactChannel);

                buffer.clear();

                // read one entry (checksum + reference count) per iteration
                while (channel.read(buffer) > -1) {
                    final byte[] id = new byte[digestType.getDigestLengthBytes()];
                    buffer.flip();
                    buffer.get(id);
                    final BlobId blobId = new BlobId(id);
                    final int count = buffer.getInt();

                    if (count == 0) {
                        // provisionally orphaned - a later entry for the same
                        // blobId with a non-zero count rescinds this (below)
                        orphanedBlobFileIds.add(blobId);
                    } else {
                        orphanedBlobFileIds.remove(blobId);

                        // record the offset of the entry within the compact file,
                        // NOTE: position() is taken before the entry is written
                        compactReferences.put(blobId, new BlobReference(count, compactChannel.position()));

                        buffer.flip();
                        compactChannel.write(buffer);
                    }

                    buffer.clear();
                }
            }
        }

        // cleanup any orphaned Blob files
        for (final BlobId orphanedBlobFileId : orphanedBlobFileIds) {
            deleteBlob(blobDir, orphanedBlobFileId, false);
        }

        // replace the persistent file with the new compact persistent file
        Files.move(compactPersistentFile, persistentFile, ATOMIC_MOVE, REPLACE_EXISTING);

        return compactReferences;
    }
608

609
    /**
610
     * Writes the persistent file header
611
     *
612
     * @param buffer a byte buffer to use
613
     * @param channel the channel to write to
614
     * @return the number of bytes written.
615
     * @throws IOException if an error occurs whilst writing the header.
616
     */
617
    private long writeFileHeader(final ByteBuffer buffer, final SeekableByteChannel channel) throws IOException {
618
        final long start = channel.position();
1✔
619

620
        buffer.clear();
1✔
621
        writeFileHeader(buffer);
1✔
622

623
        buffer.flip();
1✔
624
        buffer.limit(BLOB_STORE_HEADER_LEN);
1✔
625
        channel.write(buffer);
1✔
626

627
        return channel.position() - start;
1✔
628
    }
629

630
    /**
     * Writes the persistent file header
     * (magic number followed by the format version) into a buffer.
     *
     * @param buffer the buffer to write to
     */
    private static void writeFileHeader(final ByteBuffer buffer) {
        buffer.put(BLOB_STORE_MAGIC_NUMBER);
        buffer.putShort(BLOB_STORE_VERSION);
    }
639

640
    /**
641
     * Validates the persistent file header.
642
     *
643
     * @param buffer a byte buffer to use
644
     * @param file the file containing the header.
645
     * @param channel the channel of the file to read from.
646
     *
647
     * @throws IOException if the header is invalid.
648
     */
649
    private void validateFileHeader(final ByteBuffer buffer, final Path file, final SeekableByteChannel channel)
650
            throws IOException {
651
        buffer.clear();
1✔
652
        buffer.limit(BLOB_STORE_HEADER_LEN);
1✔
653

654
        channel.read(buffer);
1✔
655

656
        buffer.flip();
1✔
657

658
        final boolean validMagic =
1✔
659
                buffer.get() == BLOB_STORE_MAGIC_NUMBER[0]
1!
660
                        && buffer.get() == BLOB_STORE_MAGIC_NUMBER[1]
1!
661
                        && buffer.get() == BLOB_STORE_MAGIC_NUMBER[2]
1!
662
                        && buffer.get() == BLOB_STORE_MAGIC_NUMBER[3];
1!
663

664
        if (!validMagic) {
1!
665
            throw new IOException("File was not recognised as a valid Elemental Blob Store: "
×
666
                    + file.toAbsolutePath().toString());
×
667
        }
668

669
        // check the version of the blob store format
670
        final short storedVersion = buffer.getShort();
1✔
671
        final boolean validVersion =
1✔
672
                storedVersion == BLOB_STORE_VERSION;
1!
673

674
        if (!validVersion) {
1!
675
            throw new IOException("Blob Store file was version " + storedVersion + ", but required version "
×
676
                    + BLOB_STORE_VERSION + ": " + file.toAbsolutePath().toString());
×
677
        }
678
    }
1✔
679

680
    @Override
681
    public Tuple2<BlobId, Long> add(final Txn transaction, final InputStream is) throws IOException {
682
        if (state.get() != State.OPEN) {
1!
683
            throw new IOException("Blob Store is not open!");
×
684
        }
685

686
        // stage the BLOB file
687
        final Tuple3<Path, Long, MessageDigest> staged = stage(is);
1✔
688

689
        final BlobVacuum.RequestDeleteStagedBlobFile requestDeleteStagedBlobFile =
1✔
690
                new BlobVacuum.RequestDeleteStagedBlobFile(stagingDir, staged._1.getFileName().toString());
1✔
691

692
        // register a callback to cleanup the staged BLOB file ONLY after commit+checkpoint
693
        final JournalManager journalManager = database.getJournalManager().orElse(null);
1✔
694
        if (journalManager != null) {
1✔
695
            final DeleteStagedBlobFile cleanupStagedBlob = new DeleteStagedBlobFile(vacuumQueue, requestDeleteStagedBlobFile);
1✔
696
            journalManager.listen(cleanupStagedBlob);
1✔
697
            transaction.registerListener(cleanupStagedBlob);
1✔
698
        }
699

700
        final BlobId blobId = new BlobId(staged._3.getValue());
1✔
701

702
        // if the blob entry does not exist, we exclusively compute it as STAGED.
703
        BlobReference blobReference = references.computeIfAbsent(blobId, k -> new BlobReference(STAGED));
1✔
704

705
        try {
706
            while (true) {
707

708
                if (blobReference.count.compareAndSet(STAGED, PROMOTING)) {
1✔
709
                    // NOTE: we are the only thread that can be in this branch for the blobId
710

711
                    // write journal entries to the WAL
712
                    if (journalManager != null) {
1✔
713
                        try {
714
                            journalManager.journal(new StoreBlobFileLoggable(transaction.getId(), blobId, staged._1.getFileName().toString()));
1✔
715
                            journalManager.journal(new UpdateBlobRefCountLoggable(transaction.getId(), blobId, 0, 1));
1✔
716
                            journalManager.flush(true, true);   // force WAL entries to disk!
1✔
717
                        } catch (final JournalException e) {
1✔
718
                            references.remove(blobId);
×
719
                            throw new IOException(e);
×
720
                        }
721
                    }
722

723
                    // promote the staged blob
724
                    promote(staged);
1✔
725
                    if (journalManager == null) {
1✔
726
                        // no journal (or recovery)... so go ahead and schedule cleanup of the staged blob file
727
                        enqueueVacuum(vacuumQueue, requestDeleteStagedBlobFile);
1✔
728
                    }
729

730
                    // schedule disk persist of the new value
731
                    persistQueue.put(Tuple(blobId, blobReference, 1));
1✔
732

733
                    // update memory with the new value
734
                    blobReference.count.set(1);
1✔
735

736
                    // done!
737
                    return Tuple(blobId, staged._2);
1✔
738
                }
739

740
                final int count = blobReference.count.get();
1✔
741

742
                // guard against a concurrent #add or #remove
743
                if (count == PROMOTING || count == UPDATING_COUNT) {
1!
744
                    // spin whilst another thread promotes the blob, or updates the reference count
745
                    // sleep a small time to save CPU
746
                    Thread.sleep(10);
×
747
                    continue;
×
748
                }
749

750
                // guard against a concurrent vacuum operation
751
                // ...retry the blob reference until the vacuum has completed!
752
                // i.e. wait for the deletion of the blob to complete, and then we can add the blob again
753
                if (count == DELETING) {
1!
754
                    blobReference = references.computeIfAbsent(blobId, k -> new BlobReference(STAGED));
×
755
                    continue;   // loop again
×
756
                }
757

758
                // only increment the blob reference if the blob is active!
759
                if (count >= 0 && blobReference.count.compareAndSet(count, UPDATING_COUNT)) {
1!
760
                    // NOTE: we are the only thread that can be in this branch for the blobId
761

762
                    final int newCount = count + 1;
1✔
763

764
                    // write journal entries to the WAL
765
                    if (journalManager != null) {
1✔
766
                        try {
767
                            journalManager.journal(new UpdateBlobRefCountLoggable(transaction.getId(), blobId, count, newCount));
1✔
768
                            journalManager.flush(true, true);   // force WAL entries to disk!
1✔
769
                        } catch (final JournalException e) {
1✔
770
                            // restore the state of the blobReference first!
771
                            blobReference.count.set(count);
×
772
                            throw new IOException(e);
×
773
                        }
774
                    }
775

776
                    // persist the new value
777
                    persistQueue.put(Tuple(blobId, blobReference, newCount));
1✔
778

779
                    // update memory with the new value, and release other spinning threads
780
                    blobReference.count.set(newCount);
1✔
781

782
                    // done!
783
                    return Tuple(blobId, staged._2);
1✔
784
                }
785
            }
786
        } catch (final InterruptedException e) {
×
787
            // thrown by persistQueue.put or Thread.sleep
788
            Thread.currentThread().interrupt();
×
789
            throw new IOException(e);
×
790
        }
791
    }
792

793
    /**
     * Copies a BLOB within the Blob Store.
     *
     * As the store is de-duplicated, a "copy" is implemented purely as an
     * increment of the blob's reference count — no file data is duplicated.
     * The count update is journalled to the WAL (when a journal is present)
     * before being applied in memory, and a disk persist is scheduled.
     *
     * @param transaction the current database transaction.
     * @param blobId the identifier of the blob to copy.
     *
     * @return the same blobId, or null if the blob does not exist (or is being deleted).
     *
     * @throws IOException if the store is not open, journalling fails, or the thread is interrupted.
     */
    @Override
    public BlobId copy(final Txn transaction, final BlobId blobId) throws IOException {
        if (state.get() != State.OPEN) {
            throw new IOException("Blob Store is not open!");
        }

        final BlobReference blobReference = references.get(blobId);
        if (blobReference == null) {
            // unknown blob, nothing to copy
            return null;
        }

        // NOTE: that copy is simply an increment of the reference count!
        try {
            while (true) {

                final int count = blobReference.count.get();

                // guard against a concurrent #add or #remove
                if (count == STAGED || count == PROMOTING || count == UPDATING_COUNT) {
                    // spin whilst another thread promotes the blob, or updates the reference count
                    // sleep a small time to save CPU
                    Thread.sleep(10);
                    continue;
                }

                // guard against a concurrent vacuum operation
                if (count == DELETING) {
                    // the blob is being removed from disk, so there is nothing to copy
                    return null;
                }

                // only increment the blob reference if the blob is active!
                if (count >= 0 && blobReference.count.compareAndSet(count, UPDATING_COUNT)) {
                    // NOTE: we are the only thread that can be in this branch for the blobId
                    // (UPDATING_COUNT excludes all other writers until we publish newCount below)

                    final int newCount = count + 1;

                    // write journal entries to the WAL
                    final JournalManager journalManager = database.getJournalManager().orElse(null);
                    if (journalManager != null) {
                        try {
                            journalManager.journal(new UpdateBlobRefCountLoggable(transaction.getId(), blobId, count, newCount));
                            journalManager.flush(true, true);   // force WAL entries to disk!
                        } catch (final JournalException e) {
                            // restore the state of the blobReference first!
                            blobReference.count.set(count);
                            throw new IOException(e);
                        }
                    }

                    // persist the new value
                    persistQueue.put(Tuple(blobId, blobReference, newCount));

                    // update memory with the new value, and release other spinning threads
                    blobReference.count.set(newCount);

                    // done!
                    return blobId;
                }
            }
        } catch (final InterruptedException e) {
            // thrown by persistQueue.put or Thread.sleep
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }
858

859
    @Override
860
    @Nullable public InputStream get(final Txn transaction, final BlobId blobId) throws IOException {
861
        final BlobFileLease blobFileLease = readLeaseBlobFile(transaction, blobId);
1✔
862
        if (blobFileLease == null) {
1✔
863
            return null;
1✔
864
        }
865

866
        // blob file lease is released either when the input stream is closed, or if an error occurs opening the stream
867
        try {
868
            return new OnCloseInputStream(Files.newInputStream(blobFileLease.path), blobFileLease.release);
1✔
869
        } catch (final IOException e) {
×
870
            blobFileLease.release.run();  // MUST release the read lease!
×
871
            throw e;
×
872
        }
873
    }
874

875
    @Override
876
    @Nullable public MessageDigest getDigest(final Txn transaction, final BlobId blobId, final DigestType digestType)
877
            throws IOException {
878
        if (this.digestType.equals(digestType)) {
1✔
879
            // optimisation, we can just return the BlobId as that is the digest for this digest type!
880
            return new MessageDigest(digestType, blobId.getId());
1✔
881

882
        } else {
883
            // calculate the digest
884
            final StreamableDigest streamableDigest = digestType.newStreamableDigest();
1✔
885
            final Try<MessageDigest, IOException> result = with(transaction, blobId, maybeBlobFile ->
1✔
886
                    maybeBlobFile == null ? null :
1!
887
                            TaggedTryUnchecked(IOException.class, () -> {
1✔
888
                                FileUtils.digest(maybeBlobFile, streamableDigest);
1✔
889
                                return new MessageDigest(streamableDigest.getDigestType(),
1✔
890
                                        streamableDigest.getMessageDigest());
1✔
891
                            })
1✔
892
            );
893
            return result.get();
1✔
894
        }
895
    }
896

897
    @Override
898
    public <T> T with(final Txn transaction, final BlobId blobId, final Function<Path, T> fnFile) throws IOException {
899
        final BlobFileLease blobFileLease = readLeaseBlobFile(transaction, blobId);
1✔
900
        try {
901
            return fnFile.apply(blobFileLease == null ? null : blobFileLease.path);
1!
902
        } finally {
903
            if (blobFileLease != null) {
1!
904
                blobFileLease.release.run();  // MUST release the read lease!
1✔
905
            }
906
        }
907
    }
908

909
    /**
     * Lease a Blob file for reading from the Blob Store.
     *
     * Takes a read lease by incrementing the blob reference's reader count;
     * the caller MUST run the returned lease's release action when done,
     * which decrements the reader count again.
     *
     * @param transaction the current database transaction.
     * @param blobId the identifier of the blob to lease the blob file from.
     *
     * @return the blob file lease, or null if the blob does not exist in the Blob Store
     *     (unknown id, zero references, or concurrently being deleted).
     *
     * @throws IOException if the store is not open or the thread is interrupted whilst waiting.
     */
    private BlobFileLease readLeaseBlobFile(final Txn transaction, final BlobId blobId) throws IOException {
        if (state.get() != State.OPEN) {
            throw new IOException("Blob Store is not open!");
        }

        final BlobReference blobReference = references.get(blobId);
        if (blobReference == null) {
            return null;
        }

        try {
            while (true) {
                final int count = blobReference.count.get();

                if (count == 0) {
                    // can't return something which has zero references
                    return null;
                }

                // guard against a concurrent vacuum operation
                if (count == DELETING) {
                    // can't return something which has zero references (because it is being deleted)
                    return null;
                }

                // guard against a concurrent #add doing staging
                if (count == STAGED || count == PROMOTING) {
                    // spin whilst another thread promotes the blob
                    // sleep a small time to save CPU
                    Thread.sleep(10);
                    continue;
                }

                // read a blob which has references
                // NOTE: count may also be UPDATING_COUNT here, in which case we loop and re-read it
                if (count > 0) {
                    // we are reading
                    blobReference.readers.incrementAndGet();

                    // get the blob
                    final Path blobFile = blobDir.resolve(bytesToHex(blobId.getId()));
                    return new BlobFileLease(blobFile, blobReference.readers::decrementAndGet);
                }
            }
        } catch (final InterruptedException e) {
            // only thrown by Thread.sleep above
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }
968

969
    /**
970
     * Gets the reference count for the Blob
971
     *
972
     * NOTE: this method is not thread-safe and should ONLY
973
     * be used for testing, which is why this method is
974
     * marked package-private!
975
     *
976
     * @param blobId The id of the blob
977
     *
978
     * @return the reference count, or null if the blob id is not in the references table.
979
     *
980
     * @throws IOException if the BlobStore is not open.
981
     */
982
    @Nullable Integer getReferenceCount(final BlobId blobId) throws IOException {
983
        if (state.get() != State.OPEN) {
1!
984
            throw new IOException("Blob Store is not open!");
×
985
        }
986

987
        final BlobReference blobReference = references.get(blobId);
1✔
988
        if (blobReference == null) {
1✔
989
            return null;
1✔
990
        }
991
        return blobReference.count.get();
1✔
992
    }
993

994
    /**
     * Removes a BLOB from the Blob Store.
     *
     * As the store is de-duplicated, "remove" decrements the blob's
     * reference count. When the count reaches zero the blob FILE is not
     * deleted immediately: deletion is deferred to the vacuum — either
     * after the transaction commits and a checkpoint is written (when a
     * journal is present), or scheduled straight away (when it is not).
     * The count update is journalled to the WAL before being applied.
     *
     * @param transaction the current database transaction.
     * @param blobId the identifier of the blob to remove.
     *
     * @throws IOException if the store is not open, journalling fails, or the thread is interrupted.
     */
    @Override
    public void remove(final Txn transaction, final BlobId blobId) throws IOException {
        if (state.get() != State.OPEN) {
            throw new IOException("Blob Store is not open!");
        }

        final BlobReference blobReference = references.get(blobId);
        if (blobReference == null) {
            // unknown blob, nothing to remove
            return;
        }

        try {
            while (true) {
                final int count = blobReference.count.get();

                if (count == 0) {
                    // can't remove something which has zero references
                    return;
                }

                // guard against a concurrent vacuum operation
                if (count == DELETING) {
                    // can't remove something which has zero references (because it is being deleted)
                    return;
                }

                // guard against a concurrent #add or #remove
                if (count == STAGED || count == PROMOTING || count == UPDATING_COUNT) {
                    // spin whilst another thread promotes the blob or updates the reference count
                    // sleep a small time to save CPU
                    Thread.sleep(10);
                    continue;
                }

                // only decrement the blob reference if the blob has more than zero references
                if (count > 0 && blobReference.count.compareAndSet(count, UPDATING_COUNT)) {
                    // NOTE: we are the only thread that can be in this branch for the blobId
                    // (UPDATING_COUNT excludes all other writers until we publish newCount below)

                    final int newCount = count - 1;

                    // write journal entries to the WAL
                    final JournalManager journalManager = database.getJournalManager().orElse(null);
                    if (journalManager != null) {
                        try {
                            journalManager.journal(new UpdateBlobRefCountLoggable(transaction.getId(), blobId, count, newCount));
                            journalManager.flush(true, true);   // force WAL entries to disk!
                        } catch (final JournalException e) {
                            // restore the state of the blobReference first!
                            blobReference.count.set(count);
                            throw new IOException(e);
                        }
                    }

                    // schedule disk persist of the new value
                    persistQueue.put(Tuple(blobId, blobReference, newCount));

                    if (newCount == 0) {
                        // schedule blob file for vacuum.

                        final BlobVacuum.RequestDeleteBlobFile requestDeleteBlobFile =
                                new BlobVacuum.RequestDeleteBlobFile(references, blobDir, blobId, blobReference);

                        if (journalManager != null) {
                            // register a callback to schedule the BLOB file for vacuum ONLY after commit+checkpoint
                            final ScheduleDeleteBlobFile scheduleDeleteBlobFile = new ScheduleDeleteBlobFile(
                                    vacuumQueue, requestDeleteBlobFile);
                            journalManager.listen(scheduleDeleteBlobFile);
                            transaction.registerListener(scheduleDeleteBlobFile);
                        } else {
                            // no journal (or recovery)... so go ahead and schedule
                            enqueueVacuum(vacuumQueue, requestDeleteBlobFile);
                        }
                    }

                    // update memory with the new value, and release other spinning threads
                    blobReference.count.set(newCount);

                    // done!
                    return;
                }
            }
        } catch (final InterruptedException e) {
            // thrown by persistQueue.put or Thread.sleep
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }
1081

1082
    @Override
1083
    public void backupToArchive(final RawDataBackup backup) throws IOException {
1084
        if (state.get() != State.OPEN) {
1!
1085
            throw new IOException("Blob Store is not open!");
×
1086
        }
1087

1088
        // TODO(AR) should we enter an exclusive backup state?
1089

1090
        // NOTE: do not use try-with-resources here, closing the OutputStream will close the entire backup
1091

1092
        // backup the blob.dbx
1093
        try {
1094
            final OutputStream os = backup.newEntry(fileName(persistentFile));
1✔
1095
            Files.copy(persistentFile, os);
1✔
1096
        } finally {
1✔
1097
            backup.closeEntry();
1✔
1098
        }
1099

1100
        // backup the blob files
1101
        for (final Path blobFile : FileUtils.list(blobDir, Files::isRegularFile)) {
1!
1102
            try {
1103
                final OutputStream os = backup.newEntry(fileName(blobDir) + '/' + fileName(blobFile));
×
1104
                Files.copy(persistentFile, os);
×
1105
            } finally {
×
1106
                backup.closeEntry();
×
1107
            }
1108
        }
1109

1110
        // backup the staging area
1111
        for (final Path blobFile : FileUtils.list(stagingDir, Files::isRegularFile)) {
1!
1112
            try {
1113
                final OutputStream os = backup.newEntry(fileName(blobDir) + '/' + fileName(stagingDir) + '/' + fileName(blobFile));
×
1114
                Files.copy(persistentFile, os);
×
1115
            } finally {
×
1116
                backup.closeEntry();
×
1117
            }
1118
        }
1119
    }
1✔
1120

1121
    @Override
1122
    public void redo(final BlobLoggable blobLoggable) throws LogException {
1123
        try {
1124
            if (blobLoggable instanceof StoreBlobFileLoggable storeBlobFileLoggable) {
1✔
1125
                redoStoreBlobFile(storeBlobFileLoggable.getBlobId(), storeBlobFileLoggable.getStagedUuid());
1✔
1126

1127
            } else if (blobLoggable instanceof UpdateBlobRefCountLoggable updateBlobRefCountLoggable) {
1!
1128
                updateBlogRefCount(updateBlobRefCountLoggable.getBlobId(), updateBlobRefCountLoggable.getNewCount());
1✔
1129
            }
1130
        } catch (final IOException e) {
1✔
1131
            throw new LogException(e.getMessage(), e);
×
1132
        }
1133
    }
1✔
1134

1135
    @Override
1136
    public void undo(final BlobLoggable blobLoggable) throws LogException {
1137
        try {
1138
            if (blobLoggable instanceof StoreBlobFileLoggable storeBlobFileLoggable) {
1✔
1139
                undoStoreBlobFile(storeBlobFileLoggable.getBlobId(), storeBlobFileLoggable.getStagedUuid());
1✔
1140

1141
            } else if (blobLoggable instanceof UpdateBlobRefCountLoggable updateBlobRefCountLoggable) {
1!
1142
                updateBlogRefCount(updateBlobRefCountLoggable.getBlobId(), updateBlobRefCountLoggable.getCurrentCount());
1✔
1143
            }
1144
        } catch (final IOException e) {
1✔
1145
            throw new LogException(e.getMessage(), e);
×
1146
        }
1147
    }
1✔
1148

1149
    /**
1150
     * Recovery - redo: Promotes the Staged Blob File after performing some checks.
1151
     *
1152
     * This is possible because the Staged Blob file is not
1153
     * removed until a checkpoint is written AFTER the transaction
1154
     * was committed.
1155
     *
1156
     * @param blobId the blobId
1157
     * @param stagedUuid The uuid of the staged blob file.
1158
     *
1159
     * @throws IOException if the staged blob file cannot be promoted
1160
     */
1161
    private void redoStoreBlobFile(final BlobId blobId, final String stagedUuid) throws IOException {
1162
        final Path stagedBlobFile = stagingDir.resolve(stagedUuid);
1✔
1163

1164
        // check the staged file exists
1165
        if (!Files.exists(stagedBlobFile)) {
1!
1166
            throw new IOException("Staged Blob File does not exist: " + stagedBlobFile.toAbsolutePath());
×
1167
        }
1168

1169
        // check the staged file has the correct checksum
1170
        final StreamableDigest streamableDigest = digestType.newStreamableDigest();
1✔
1171
        FileUtils.digest(stagedBlobFile, streamableDigest);
1✔
1172
        final String blobFilename = bytesToHex(blobId.getId());
1✔
1173
        if (!Arrays.equals(blobId.getId(), streamableDigest.getMessageDigest())) {
1!
1174
            throw new IOException("Staged Blob File checksum '"
×
1175
                    + bytesToHex(streamableDigest.getMessageDigest()) + "', does not match checksum of blobId ''"
×
1176
                    + blobFilename + "'");
×
1177
        }
1178

1179
        final Path blobFile = blobDir.resolve(blobFilename);
1✔
1180

1181
        Files.copy(stagedBlobFile, blobFile, REPLACE_EXISTING);
1✔
1182
    }
1✔
1183

1184
    /**
1185
     * Recovery - undo: Demotes the Blob File back to the staging area after performing some checks.
1186
     *
1187
     * @param blobId the blobId
1188
     * @param stagedUuid The uuid of the staged blob file.
1189
     *
1190
     * @throws IOException if the blob file cannot be demoted to the staging area
1191
     */
1192
    private void undoStoreBlobFile(final BlobId blobId, final String stagedUuid) throws IOException {
1193
        final String blobFilename = bytesToHex(blobId.getId());
1✔
1194
        final Path blobFile = blobDir.resolve(blobFilename);
1✔
1195

1196
        // check the blob file exists
1197
        if (!Files.exists(blobFile)) {
1!
1198
            throw new IOException("Blob File does not exist: " + blobFile.toAbsolutePath());
×
1199
        }
1200

1201
        final Path stagedBlobFile = stagingDir.resolve(stagedUuid);
1✔
1202

1203
        Files.copy(blobFile, stagedBlobFile, REPLACE_EXISTING);
1✔
1204
    }
1✔
1205

1206
    /**
1207
     * Recovery - redo/undo: sets the reference count of a blob.
1208
     *
1209
     * @param blobId the blobId
1210
     * @param count The reference count to set.
1211
     *
1212
     * @throws IOException if the blob's reference count cannot be set
1213
     */
1214
    private void updateBlogRefCount(final BlobId blobId, final int count) throws IOException {
1215
        buffer.clear();
1✔
1216
        buffer.limit(digestType.getDigestLengthBytes());  // we are only going to read the BlobIds
1✔
1217

1218
        // start immediately after the file header
1219
        channel.position(BLOB_STORE_HEADER_LEN);
1✔
1220

1221
        boolean updatedCount = false;
1✔
1222

1223
        while (channel.read(buffer) > 0) {
1✔
1224
            buffer.flip();
1✔
1225
            final byte[] id = new byte[digestType.getDigestLengthBytes()];
1✔
1226
            buffer.get(id);
1✔
1227
            final BlobId readBlobId = new BlobId(id);
1✔
1228

1229
            if (blobId.equals(readBlobId)) {
1✔
1230

1231
                buffer.clear();
1✔
1232
                buffer.limit(REFERENCE_COUNT_LEN);
1✔
1233
                buffer.putInt(count);
1✔
1234
                buffer.flip();
1✔
1235

1236
                channel.write(buffer);
1✔
1237

1238
                updatedCount = true;
1✔
1239

1240
                break;
1✔
1241
            }
1242

1243
            // skip over the reference count
1244
            channel.position(channel.position() + REFERENCE_COUNT_LEN);
1✔
1245
        }
1246

1247
        /*
1248
         * If we could not update the count of an existing entry, append a new entry to the end of the file.
1249
         * We even include those entries with count = 0, so that their blob files will be cleared up by
1250
         * the next call to compactPersistentReferences
1251
         */
1252
        if (!updatedCount) {
1✔
1253
            buffer.clear();
1✔
1254
            buffer.put(blobId.getId());
1✔
1255
            buffer.putInt(count);
1✔
1256

1257
            buffer.flip();
1✔
1258

1259
            channel.write(buffer);
1✔
1260
        }
1261
    }
1✔
1262

1263
    /**
1264
     * Stages a BLOB file.
1265
     *
1266
     * Writes a BLOB to a file in the Blob Store staging area.
1267
     *
1268
     * @param is data stream for the BLOB.
1269
     * @return The file path, length and checksum of the staged BLOB
1270
     * @throws IOException if an error occurs whilst staging the BLOB.
1271
     */
1272
    private Tuple3<Path, Long, MessageDigest> stage(final InputStream is) throws IOException {
1273
        final Path stageFile = stagingDir.resolve(UUIDGenerator.getUUIDversion4());
1✔
1274
        final CountingInputStream cis = new CountingInputStream(is);
1✔
1275
        final StreamableDigest streamableDigest = digestType.newStreamableDigest();
1✔
1276
        final DigestInputStream dis = new DigestInputStream(cis, streamableDigest);
1✔
1277

1278
        Files.copy(dis, stageFile);
1✔
1279

1280
        return Tuple(stageFile, cis.getByteCount(), streamableDigest.copyMessageDigest());
1✔
1281
    }
1282

1283
    /**
     * Promotes a staged BLOB file to the BLOB store.
     *
     * Copies a staged BLOB file in the Blob Store staging area to
     * the live Blob Store. The destination file name is the hex form
     * of the staged BLOB's checksum (staged._3), which is its BlobId.
     *
     * The staged BLOB will be removed as part of the Journalling
     * and Recovery.
     *
     * @param staged the staged BLOB: (file path, length, checksum).
     * @throws IOException if an error occurs whilst promoting the BLOB.
     */
    private void promote(final Tuple3<Path, Long, MessageDigest> staged) throws IOException {
        Files.copy(staged._1, blobDir.resolve(staged._3.toHexString()), REPLACE_EXISTING);
    }
1298

1299
    /**
1300
     * Deletes a BLOB file from the Blob Store.
1301
     *
1302
     * @param blobDir the blob directory.
1303
     * @param blobId the identifier of the BLOB file to delete.
1304
     * @param always true if we should always be able to delete the file,
1305
     *     false if the file may not exist.
1306
     *
1307
     * @throws IOException if the file cannot be deleted, for example if {@code always}
1308
     *                     is set to true and the BLOB does not exist.
1309
     */
1310
    private static void deleteBlob(final Path blobDir, final BlobId blobId, final boolean always) throws IOException {
1311
        final Path blobFile = blobDir.resolve(bytesToHex(blobId.getId()));
1✔
1312
        if (always) {
1✔
1313
            Files.delete(blobFile);
1✔
1314
        } else {
1✔
1315
            Files.deleteIfExists(blobFile);
1✔
1316
        }
1317
    }
1✔
1318

1319
    /**
     * Value class which represents the reference
     * count for a blob, the number of active readers,
     * and the offset of its entry in the persistent file.
     *
     * The {@link #count} field doubles as a state machine: non-negative
     * values are real reference counts, whilst the negative sentinel
     * constants below mark in-progress state transitions.
     */
    static class BlobReference {
        // sentinel: a vacuum is deleting the blob file
        static final int DELETING = -4;
        // sentinel: the blob has been written to the staging area but not yet promoted
        static final int STAGED = -3;
        // sentinel: a thread is promoting the staged file into the blob store
        static final int PROMOTING = -2;
        // sentinel: a thread is updating the reference count (excludes other writers)
        static final int UPDATING_COUNT = -1;

        final AtomicInteger count;
        final AtomicInteger readers = new AtomicInteger();

        // sentinel for persistentOffset: no entry written to the persistent file yet
        static final long NOT_PERSISTED = -1;

        /**
         * Is only read and written from a single
         * thread in {@link PersistentWriter}
         * so no synchronization needed.
         */
        long persistentOffset = NOT_PERSISTED;

        /**
         * Construct a new Blob Reference which has not yet
         * been persisted.
         *
         * @param count the reference count
         *
         * The persistentOffset will be set to {@link #NOT_PERSISTED}
         */
        public BlobReference(final int count) {
            this.count = new AtomicInteger(count);
        }

        /**
         * Construct a new Blob Reference to a persistent
         * blob.
         *
         * @param count the reference count
         * @param persistentOffset the offset of the blob reference in the persistent file
         */
        public BlobReference(final int count, final long persistentOffset) {
            this.count = new AtomicInteger(count);
            this.persistentOffset = persistentOffset;
        }
    }
1366

1367
    /**
     * Value class which represents a lease
     * of a Blob's file.
     *
     * The holder may read {@link #path} until it runs {@link #release},
     * which MUST be executed exactly once when the lease is finished with.
     */
    private static class BlobFileLease {
        // the leased blob file
        final Path path;
        // action which releases the lease (decrements the reader count)
        final Runnable release;

        /**
         * @param path the blob file
         * @param release the action to run to release the lease
         */
        public BlobFileLease(final Path path, final Runnable release) {
            this.path = path;
            this.release = release;
        }
    }
1384

1385
    /**
1386
     * A FilterInputStream which executes an action when
1387
     * the underlying stream is closed.
1388
     */
1389
    public static class OnCloseInputStream extends FilterInputStream {
1390
        private final Runnable closeAction;
1391

1392
        /**
1393
         * Ensures that the close action is only executed once.
1394
         */
1395
        private final AtomicBoolean closed = new AtomicBoolean(false);
1✔
1396

1397
        /**
1398
         * @param in  An input stream.
1399
         * @param closeAction an action to run after the stream is closed.
1400
         */
1401
        public OnCloseInputStream(final InputStream in, final Runnable closeAction) {
1402
            super(in);
1✔
1403
            this.closeAction = closeAction;
1✔
1404
        }
1✔
1405

1406
        @Override
1407
        public int read(final byte[] b) throws IOException {
1408
            return in.read(b);
1✔
1409
        }
1410

1411
        /**
1412
         * First, closes the underlying Input Stream
1413
         * by calling {@code super#close()} and then
1414
         * always executes the {@link #closeAction}.
1415
         *
1416
         * This method is idempotent, which is to say that
1417
         * the operation will only be
1418
         * applied once.
1419
         *
1420
         * @throws IOException if an I/O error occurs.
1421
         */
1422
        @Override
1423
        public void close() throws IOException {
1424
            if (closed.compareAndSet(false, true)) {
1✔
1425
                try {
1426
                    super.close();
1✔
1427
                } finally {
1✔
1428
                    closeAction.run();
1✔
1429
                }
1430
            }
1431
        }
1✔
1432
    }
1433

1434
    /**
     * A Journal and Transaction listener which will execute an action only
     * after the transaction has been completed (aborted or committed) and
     * a checkpoint has been written.
     *
     * Lifecycle: registered both on the transaction (for commit/abort) and
     * on the journal (for checkpoint events); {@link #execute()} fires on
     * the first checkpoint observed after completion.
     */
    private static abstract class CommitThenCheckpointListener implements TxnListener, JournalManager.JournalListener {
        // written from single-thread, read from multiple threads
        private volatile boolean committedOrAborted = false;

        @Override
        public void commit() {
            committedOrAborted = true;
        }

        @Override
        public void abort() {
            committedOrAborted = true;
        }

        @Override
        public boolean afterCheckpoint(final long txnId) {
            if (!committedOrAborted) {
                /*
                 * we have not yet committed or aborted
                 * so keep receiving checkpoint events!
                 */
                return true;
            }

            execute();

            // returning false deregisters this listener from further checkpoints
            return false;
        }

        /**
         * Called after the transaction has completed
         * and a checkpoint has been written.
         */
        public abstract void execute();
    }
1474

1475
    /**
1476
     * Deletes a staged Blob file once the transaction that promoted it has
1477
     * completed and a checkpoint has been written.
1478
     */
1479
    private static class DeleteStagedBlobFile extends CommitThenCheckpointListener {
1480
        private final BlockingQueue<BlobVacuum.Request> vacuumQueue;
1481
        private final BlobVacuum.RequestDeleteStagedBlobFile requestDeleteStagedBlobFile;
1482

1483
        /**
1484
         * @param vacuumQueue the vacuum queue.
1485
         * @param requestDeleteStagedBlobFile the request to delete the staged blob file.
1486
         */
1487
        public DeleteStagedBlobFile(final BlockingQueue<BlobVacuum.Request> vacuumQueue, final BlobVacuum.RequestDeleteStagedBlobFile requestDeleteStagedBlobFile) {
1✔
1488
            this.vacuumQueue = vacuumQueue;
1✔
1489
            this.requestDeleteStagedBlobFile = requestDeleteStagedBlobFile;
1✔
1490
        }
1✔
1491

1492
        @Override
1493
        public void execute() {
1494
            enqueueVacuum(vacuumQueue, requestDeleteStagedBlobFile);
×
1495
        }
×
1496
    }
1497

1498
    /**
1499
     * Schedules a Blob File for deletion once the transaction that removed it
1500
     * has completed and a checkpoint has been written.
1501
     */
1502
    private static class ScheduleDeleteBlobFile extends CommitThenCheckpointListener {
1503
        private final BlockingQueue<BlobVacuum.Request> vacuumQueue;
1504
        private final BlobVacuum.RequestDeleteBlobFile requestDeleteBlobFile;
1505

1506
        /**
1507
         * @param vacuumQueue the vacuum queue.
1508
         * @param requestDeleteBlobFile the request to delete the blob file.
1509
         */
1510
        public ScheduleDeleteBlobFile(final BlockingQueue<BlobVacuum.Request> vacuumQueue,
1✔
1511
                final BlobVacuum.RequestDeleteBlobFile requestDeleteBlobFile) {
1512
            this.vacuumQueue = vacuumQueue;
1✔
1513
            this.requestDeleteBlobFile = requestDeleteBlobFile;
1✔
1514
        }
1✔
1515

1516
        @Override
1517
        public void execute() {
1518
            enqueueVacuum(vacuumQueue, requestDeleteBlobFile);
×
1519
        }
×
1520
    }
1521

1522
    private static void enqueueVacuum(final BlockingQueue<BlobVacuum.Request> vacuumQueue,
1523
            final BlobVacuum.Request request) {
1524
        try {
1525
            /*
1526
             * We offer with timeout because vacuum
1527
             * is best effort rather than essential, anything
1528
             * we cannot vacuum will be cleaned up at next startup
1529
             * either as a part of crash recovery or compaction
1530
             */
1531
            if (!vacuumQueue.offer(request, VACUUM_ENQUEUE_TIMEOUT, TimeUnit.MILLISECONDS)) {
1!
1532
                LOG.error("Timeout, could not not enqueue for vacuum: {}", request);
×
1533
            }
1534
        } catch (final InterruptedException e) {
×
1535
            LOG.error("Interrupted, could not not enqueue for vacuum: {}", request, e);
×
1536
            Thread.currentThread().interrupt();  // restore interrupted status!
×
1537
            return;
×
1538
        }
1539
    }
1✔
1540

1541
    /**
1542
     * The PersistentWriter is designed to be run
1543
     * exclusively on its own single thread for a BlobStore
1544
     * and is solely responsible for writing updates to the
1545
     * persistent blob store file.
1546
     */
1547
    private static class PersistentWriter implements Runnable {
1548

1549
        /**
1550
         * The Poison Pill can be placed on the {@link #persistQueue},
1551
         * when encountered the {@link PersistentWriter} will
1552
         * shutdown.
1553
         */
1554
        public static final Tuple3<BlobId, BlobReference, Integer> POISON_PILL = Tuple(null, null, null);
1✔
1555

1556
        private final BlockingQueue<Tuple3<BlobId, BlobReference, Integer>> persistQueue;
1557
        private final ByteBuffer buffer;
1558
        private final SeekableByteChannel channel;
1559
        private final Runnable abnormalShutdownCallback;
1560

1561
        PersistentWriter(final BlockingQueue<Tuple3<BlobId, BlobReference, Integer>> persistQueue,
1✔
1562
                final ByteBuffer buffer, final SeekableByteChannel channel, final Runnable abnormalShutdownCallback) {
1563
            this.persistQueue = persistQueue;
1✔
1564
            this.buffer = buffer;
1✔
1565
            this.channel = channel;
1✔
1566
            this.abnormalShutdownCallback = abnormalShutdownCallback;
1✔
1567
        }
1✔
1568

1569
        @Override
1570
        public void run() {
1571
            try {
1572
                while (true) {
1✔
1573
                    final Tuple3<BlobId, BlobReference, Integer> blobData = persistQueue.take();
1✔
1574
                    if (blobData == POISON_PILL) {
1✔
1575
                        // if we received the Poison Pill, we should shutdown!
1576
                        break;  // exit
1✔
1577
                    }
1578

1579
                    // write an entry
1580
                    writeEntry(blobData._1, blobData._2, blobData._3);
1✔
1581
                }
1582
            } catch (final InterruptedException e) {
×
1583
                // Restore the interrupted status
1584
                LOG.error("PersistentWriter Shutting down due to interrupt: {}", e.getMessage());
×
1585
                Thread.currentThread().interrupt();
×
1586
                abnormalShutdownCallback.run();
×
1587
            } catch (final IOException e) {
×
1588
                LOG.error("PersistentWriter Shutting down, received: {}", e.getMessage(), e);
×
1589
                abnormalShutdownCallback.run();
×
1590
            }
1591
        }
1✔
1592

1593
        /**
1594
         * Stores the reference count for a blob to the persistent blob store file.
1595
         *
1596
         * When a new reference count is written for the first time it updates
1597
         * the {@link BlobStoreImpl.BlobReference#persistentOffset} with the
1598
         * location of the reference in the persistent file.
1599
         *
1600
         * @param blobId the identifier of the blob.
1601
         * @param blobReference the reference details for the blob
1602
         * @param newCount the new reference count to store.
1603
         *
1604
         * @throws IOException if an error occurs whilst writing the persistent file.
1605
         */
1606
        private void writeEntry(final BlobId blobId, final BlobReference blobReference, final int newCount)
1607
                throws IOException {
1608

1609
            // if new record (i.e. not yet persisted), append to the end of the file
1610
            if (blobReference.persistentOffset == NOT_PERSISTED) {
1✔
1611
                blobReference.persistentOffset = channel.size();
1✔
1612
            }
1613

1614
            channel.position(blobReference.persistentOffset);
1✔
1615

1616
            buffer.clear();
1✔
1617
            buffer.put(blobId.getId());
1✔
1618
            buffer.putInt(newCount);
1✔
1619
            buffer.flip();
1✔
1620

1621
            channel.write(buffer);
1✔
1622
        }
1✔
1623
    }
1624

1625
    /**
1626
     * The BlobVacuum is designed to be run
1627
     * exclusively on its own single thread for a BlobStore
1628
     * and is solely responsible for deleting blob files from
1629
     * the blob file store.
1630
     */
1631
    private static class BlobVacuum implements Runnable {
1632
        private final BlockingQueue<Request> vacuumQueue;
1633

1634
        public BlobVacuum(final BlockingQueue<Request> vacuumQueue) {
1✔
1635
            this.vacuumQueue = vacuumQueue;
1✔
1636
        }
1✔
1637

1638
        @Override
1639
        public void run() {
1640
            try {
1641
                while (true) {
1642
                    final Request request = vacuumQueue.take();
1✔
1643
                    if (!request.service()) {
1!
1644
                        // if the request could not be serviced then enque it so we can try again in future
1645

1646
                        try {
1647
                            if (!vacuumQueue.offer(request, VACUUM_ENQUEUE_TIMEOUT, TimeUnit.MILLISECONDS)) {
×
1648
                                LOG.error("Timeout, could not not enqueue for vacuum: {}", request);
×
1649
                            }
1650
                        } catch (final InterruptedException e) {
×
1651
                            LOG.error("Interrupted, could not not enqueue for vacuum: {}", request, e);
×
1652
                            Thread.currentThread().interrupt();  // restore interrupted status!
×
1653
                            throw e;
×
1654
                        }
1655
                    }
1656
                }
1657
            } catch (final InterruptedException e) {
1✔
1658
                // expected when we are shutting down, only thrown by vacuumQueue.take/offer.
1659
                // Any remaining objects in the queue which we have not yet vacuumed will
1660
                // be taken care of by {@link #compactPersistentReferences(ByteBuffer, Path)
1661
                // when the persistent blob store file is next opened
1662

1663
                // Restore the interrupted status
1664
                Thread.currentThread().interrupt();
1✔
1665
            }
1666
        }
1✔
1667

1668
        /**
1669
         * The type of Vacuum Request
1670
         */
1671
        interface Request extends Comparable<Request> {
1672
            /**
1673
             * @return true if the request was serviced,
1674
             *     false if the request should be re-scheduled.
1675
             */
1676
            boolean service();
1677
        }
1678

1679
        /**
1680
         * Vacuum request for deleting a Blob File for a Blob that has been removed.
1681
         *
1682
         * The Blob File will only be deleted if it has no references and no active readers.
1683
         *
1684
         * As vacuuming happens asynchronously, a new Blob may have been added which
1685
         * has the same de-duplicated Blob File, causing an increase in references,
1686
         * in which case the Blob File will be recycled instead
1687
         * and will not be deleted here.
1688
         */
1689
        public static final class RequestDeleteBlobFile implements Request {
1690
            private final ConcurrentMap<BlobId, BlobReference> references;
1691
            private final Path blobDir;
1692
            private final BlobId blobId;
1693
            private final BlobReference blobReference;
1694

1695
            public RequestDeleteBlobFile(final ConcurrentMap<BlobId, BlobReference> references,
1✔
1696
                    final Path blobDir, final BlobId blobId, final BlobReference blobReference) {
1697
                this.references = references;
1✔
1698
                this.blobDir = blobDir;
1✔
1699
                this.blobId = blobId;
1✔
1700
                this.blobReference = blobReference;
1✔
1701
            }
1✔
1702

1703
            @Override
1704
            public String toString() {
1705
                return "RequestDeleteBlobFile(" + blobId + ")";
×
1706
            }
1707

1708
            @Override
1709
            public int compareTo(final Request other) {
1710
                if (other instanceof RequestDeleteBlobFile) {
1!
1711
                    return ((RequestDeleteBlobFile) other).blobReference.readers.get() - blobReference.readers.get();
1✔
1712
                } else {
1713
                    // This class has higher priority than other classes
1714
                    return 1;
×
1715
                }
1716
            }
1717

1718
            @Override
1719
            public boolean service() {
1720
                // we can only delete the blob file itelf if there are no references
1721
                if (blobReference.count.compareAndSet(0, DELETING)) {
1!
1722

1723
                    // make sure there are no readers still actively reading the blob file
1724
                    if (blobReference.readers.get() == 0) {
1!
1725

1726
                        // no more readers can be taken whilst count == DELETING, so we can delete
1727
                        try {
1728
                            deleteBlob(blobDir, blobId, true);
1✔
1729
                        } catch (final IOException ioe) {
1✔
1730
                            // non-critical error
1731
                            LOG.error("Unable to delete blob file: {}", bytesToHex(blobId.getId()), ioe);
×
1732
                        }
1733

1734
                        // remove from shared map
1735
                        references.remove(blobId);
1✔
1736

1737
                    } else {
1✔
1738
                        // reschedule the blob vacuum for later (when hopefully there are no active readers)
1739
                        return false;
×
1740
                    }
1741

1742
                    // NOTE: DELETING is the last state of a BlobReference#count -- there is no coming back from this!
1743

1744
                } else {
1745
                    /*
1746
                     * no-op: ignore this blob as it now again has active references,
1747
                     * so we don't need to delete it, instead it has been recycled :-)
1748
                     * Therefore we can just continue...
1749
                     */
1750
                }
1751

1752
                // we serviced this request!
1753
                return true;
1✔
1754
            }
1755
        }
1756

1757
        public static final class RequestDeleteStagedBlobFile implements Request {
1758
            private final Path stagingDir;
1759
            private final String stagedBlobUuid;
1760

1761
            public RequestDeleteStagedBlobFile(final Path stagingDir, final String stagedBlobUuid) {
1✔
1762
                this.stagingDir = stagingDir;
1✔
1763
                this.stagedBlobUuid = stagedBlobUuid;
1✔
1764
            }
1✔
1765

1766
            @Override
1767
            public String toString() {
1768
                return "RequestDeleteStagedBlobFile(" + stagedBlobUuid + ")";
×
1769
            }
1770

1771
            @Override
1772
            public int compareTo(final Request other) {
1773
                if (other instanceof RequestDeleteStagedBlobFile) {
×
1774
                    return stagedBlobUuid.compareTo(((RequestDeleteStagedBlobFile)other).stagedBlobUuid);
×
1775
                } else {
1776
                    // This class has lower priority than other classes
1777
                    return -1;
×
1778
                }
1779
            }
1780

1781
            @Override
1782
            public boolean service() {
1783
                final Path stagedBlobFile = stagingDir.resolve(stagedBlobUuid);
1✔
1784
                FileUtils.deleteQuietly(stagedBlobFile);
1✔
1785
                return true;
1✔
1786
            }
1787
        }
1788
    }
1789
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc