• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

evolvedbinary / elemental / 982

29 Apr 2025 08:34PM UTC coverage: 56.409% (+0.007%) from 56.402%
982

push

circleci

adamretter
[feature] Improve README.md badges

28451 of 55847 branches covered (50.94%)

Branch coverage included in aggregate %.

77468 of 131924 relevant lines covered (58.72%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.95
/exist-core/src/main/java/org/exist/storage/journal/JournalReader.java
1
/*
2
 * Elemental
3
 * Copyright (C) 2024, Evolved Binary Ltd
4
 *
5
 * admin@evolvedbinary.com
6
 * https://www.evolvedbinary.com | https://www.elemental.xyz
7
 *
8
 * Use of this software is governed by the Business Source License 1.1
9
 * included in the LICENSE file and at www.mariadb.com/bsl11.
10
 *
11
 * Change Date: 2028-04-27
12
 *
13
 * On the date above, in accordance with the Business Source License, use
14
 * of this software will be governed by the Apache License, Version 2.0.
15
 *
16
 * Additional Use Grant: Production use of the Licensed Work for a permitted
17
 * purpose. A Permitted Purpose is any purpose other than a Competing Use.
18
 * A Competing Use means making the Software available to others in a commercial
19
 * product or service that: substitutes for the Software; substitutes for any
20
 * other product or service we offer using the Software that exists as of the
21
 * date we make the Software available; or offers the same or substantially
22
 * similar functionality as the Software.
23
 */
24
package org.exist.storage.journal;
25

26
import net.jpountz.xxhash.StreamingXXHash64;
27
import net.jpountz.xxhash.XXHashFactory;
28
import org.apache.logging.log4j.LogManager;
29
import org.apache.logging.log4j.Logger;
30
import org.exist.storage.DBBroker;
31
import org.exist.util.ByteConversion;
32

33
import javax.annotation.Nullable;
34
import java.io.IOException;
35
import java.nio.ByteBuffer;
36
import java.nio.channels.SeekableByteChannel;
37
import java.nio.file.Files;
38
import java.nio.file.Path;
39

40
import static java.nio.file.StandardOpenOption.READ;
41
import static org.exist.storage.journal.Journal.*;
42

43
/**
44
 * Read log entries from the journal file. This class is used during recovery to scan the
45
 * last journal file. It uses a memory-mapped byte buffer on the file.
46
 * Journal entries can be read forward (during redo) or backward (during undo).
47
 *
48
 * @author wolf
49
 */
50
public class JournalReader implements AutoCloseable {
51

52
    private static final Logger LOG = LogManager.getLogger(JournalReader.class);
1✔
53

54
    private final DBBroker broker;
55
    private final short fileNumber;
56
    private final ByteBuffer header = ByteBuffer.allocateDirect(LOG_ENTRY_HEADER_LEN);
1✔
57
    private ByteBuffer payload = ByteBuffer.allocateDirect(8192);  // 8 KB
1✔
58
    @Nullable
59
    private SeekableByteChannel fc;
60

61
    private final StreamingXXHash64 xxHash64 = XXHashFactory.fastestInstance().newStreamingHash64(Journal.XXHASH64_SEED);
1✔
62

63
    /**
64
     * Opens the specified file for reading.
65
     *
66
     * @param broker     the database broker
67
     * @param file       the journal file
68
     * @param fileNumber the number of the journal file
69
     * @throws LogException if the journal cannot be opened
70
     */
71
    public JournalReader(final DBBroker broker, final Path file, final short fileNumber) throws LogException {
1✔
72
        this.broker = broker;
1✔
73
        this.fileNumber = fileNumber;
1✔
74
        try {
75
            this.fc = Files.newByteChannel(file, READ);
1✔
76
            validateJournalHeader(file, fc);
1✔
77
        } catch (final IOException e) {
1✔
78
            close();
×
79
            throw new LogException("Failed to read journal file " + file.toAbsolutePath().toString(), e);
×
80
        }
81
    }
1✔
82

83
    private void validateJournalHeader(final Path file, final SeekableByteChannel fc) throws IOException, LogException {
84
        // read the magic number
85
        final ByteBuffer buf = ByteBuffer.allocateDirect(JOURNAL_HEADER_LEN);
1✔
86
        fc.read(buf);
1✔
87
        buf.flip();
1✔
88

89
        // check the magic number
90
        final boolean validMagic =
1✔
91
                buf.get() == JOURNAL_MAGIC_NUMBER[0]
1!
92
                && buf.get() == JOURNAL_MAGIC_NUMBER[1]
1!
93
                && buf.get() == JOURNAL_MAGIC_NUMBER[2]
1!
94
                && buf.get() == JOURNAL_MAGIC_NUMBER[3];
1!
95

96
        if (!validMagic) {
1!
97
            throw new LogException("File was not recognised as a valid Elemental journal file: " + file.toAbsolutePath().toString());
×
98
        }
99

100
        // check the version of the journal format
101
        final short storedVersion = ByteConversion.byteToShortH(new byte[] {buf.get(), buf.get()}, 0);
1✔
102
        final boolean validVersion =
1✔
103
                storedVersion == JOURNAL_VERSION;
1!
104

105
        if (!validVersion) {
1!
106
            throw new LogException("Journal file was version " + storedVersion + ", but required version " + JOURNAL_VERSION + ": " + file.toAbsolutePath().toString());
×
107
        }
108
    }
1✔
109

110
    /**
111
     * Returns the next entry found from the current position.
112
     *
113
     * @return the next entry, or null if there are no more entries.
114
     * @throws LogException if an entry could not be read due to an inconsistency on disk.
115
     */
116
    public @Nullable
117
    Loggable nextEntry() throws LogException {
118
        try {
119
            checkOpen();
1✔
120

121
            // are we at the end of the journal?
122
            if (fc.position() + LOG_ENTRY_BASE_LEN > fc.size()) {
1✔
123
                return null;
1✔
124
            }
125
        } catch (final IOException e) {
×
126
            throw new LogException("Unable to check journal position and size: " + e.getMessage(), e);
×
127
        }
128

129
        return readEntry();
1✔
130
    }
131

132
    /**
133
     * Returns the previous entry found by scanning backwards from the current position.
134
     *
135
     * @return the previous entry, or null of there was no previous entry.
136
     * @throws LogException if an entry could not be read due to an inconsistency on disk.
137
     */
138
    public @Nullable
139
    Loggable previousEntry() throws LogException {
140
        try {
141
            checkOpen();
1✔
142

143
            // is there a previous entry to read?
144
            if (fc.position() < JOURNAL_HEADER_LEN + LOG_ENTRY_BASE_LEN) {
1!
145
                return null;
×
146
            }
147

148
            // go back 8 bytes (checksum length) + 2 bytes (backLink length) and read the backLink (2 bytes) of the last entry
149
            fc.position(fc.position() - LOG_ENTRY_CHECKSUM_LEN - LOG_ENTRY_BACK_LINK_LEN);
1✔
150
            header.clear().limit(LOG_ENTRY_BACK_LINK_LEN);
1✔
151
            final int read = fc.read(header);
1✔
152
            if (read != LOG_ENTRY_BACK_LINK_LEN) {
1!
153
                throw new LogException("Unable to read journal entry back-link!");
×
154
            }
155
            header.flip();
1✔
156
            final short backLink = header.getShort();
1✔
157

158
            // position the channel to the start of the previous entry and mark it
159
            final long prevStart = fc.position() - LOG_ENTRY_BACK_LINK_LEN - backLink;
1✔
160
            fc.position(prevStart);
1✔
161
            final Loggable loggable = readEntry();
1✔
162

163
            // reset to the mark
164
            fc.position(prevStart);
1✔
165
            return loggable;
1✔
166
        } catch (final IOException e) {
×
167
            throw new LogException("Fatal error while reading previous journal entry: " + e.getMessage(), e);
×
168
        }
169
    }
170

171
    /**
172
     * Returns the last entry in the journal.
173
     *
174
     * @return the last entry in the journal, or null if there are no entries in the journal.
175
     * @throws LogException if an entry could not be read due to an inconsistency on disk.
176
     */
177
    public @Nullable
178
    Loggable lastEntry() throws LogException {
179
        try {
180
            checkOpen();
1✔
181
            positionLast();
1✔
182
            return previousEntry();
1✔
183
        } catch (final IOException e) {
×
184
            throw new LogException("Fatal error while reading last journal entry: " + e.getMessage(), e);
×
185
        }
186
    }
187

188
    /**
189
     * Read the current entry from the journal.
190
     *
191
     * @return The entry, or null if there is no entry.
192
     * @throws LogException if an entry could not be read due to an inconsistency on disk.
193
     */
194
    private @Nullable
195
    Loggable readEntry() throws LogException {
196
        try {
197
            final Lsn lsn = new Lsn(fileNumber, fc.position() + 1);
1✔
198

199
            // read the entry header
200
            header.clear();
1✔
201
            int read = fc.read(header);
1✔
202
            if (read <= 0) {
1!
203
                return null;
×
204
            }
205
            if (read != LOG_ENTRY_HEADER_LEN) {
1!
206
                throw new LogException("Incomplete journal entry header found, expected  "
×
207
                        + LOG_ENTRY_HEADER_LEN + " bytes, but found " + read + " bytes");
×
208
            }
209
            header.flip();
1✔
210

211
            // prepare the checksum for the header
212
            xxHash64.reset();
1✔
213
            if (header.hasArray()) {
1!
214
                xxHash64.update(header.array(), 0, LOG_ENTRY_HEADER_LEN);
×
215
            } else {
×
216
                final int mark = header.position();
1✔
217
                header.position(0);
1✔
218
                final byte buf[] = new byte[LOG_ENTRY_HEADER_LEN];
1✔
219
                header.get(buf);
1✔
220
                xxHash64.update(buf, 0, LOG_ENTRY_HEADER_LEN);
1✔
221
                header.position(mark);
1✔
222
            }
223

224
            final byte entryType = header.get();
1✔
225
            final long transactId = header.getLong();
1✔
226
            final short size = header.getShort();
1✔
227
            if (fc.position() + size > fc.size()) {
1!
228
                throw new LogException("Invalid length");
×
229
            }
230

231
            final Loggable loggable = LogEntryTypes.create(entryType, broker, transactId);
1✔
232
            if (loggable == null) {
1!
233
                throw new LogException("Invalid log entry: " + entryType + "; size: " + size + "; id: "
×
234
                        + transactId + "; at: " + lsn);
×
235
            }
236
            loggable.setLsn(lsn);
1✔
237

238
            final int remainingEntryBytes = size + LOG_ENTRY_BACK_LINK_LEN + LOG_ENTRY_CHECKSUM_LEN;
1✔
239

240
            if (remainingEntryBytes > payload.capacity()) {
1!
241
                // resize the payload buffer
242
                payload = ByteBuffer.allocateDirect(remainingEntryBytes);
×
243
            }
244
            payload.clear().limit(remainingEntryBytes);
1✔
245
            read = fc.read(payload);
1✔
246
            if (read < remainingEntryBytes) {
1!
247
                throw new LogException("Incomplete log entry found!");
×
248
            }
249
            payload.flip();
1✔
250

251
            // read entry data
252
            loggable.read(payload);
1✔
253

254
            // read entry backLink
255
            final short backLink = payload.getShort();
1✔
256
            if (backLink != size + LOG_ENTRY_HEADER_LEN) {
1!
257
                LOG.error("Bad pointer to previous: backLink = {}; size = {}; transactId = {}", backLink, size, transactId);
×
258
                throw new LogException("Bad pointer to previous in entry: " + loggable.dump());
×
259
            }
260

261
            // update the checksum for the entry data and backLink
262
            if (payload.hasArray()) {
1!
263
                xxHash64.update(payload.array(), 0, size + LOG_ENTRY_BACK_LINK_LEN);
×
264
            } else {
×
265
                final int mark = payload.position();
1✔
266
                payload.position(0);
1✔
267
                final byte buf[] = new byte[size + LOG_ENTRY_BACK_LINK_LEN];
1✔
268
                payload.get(buf);
1✔
269
                xxHash64.update(buf, 0, size + LOG_ENTRY_BACK_LINK_LEN);
1✔
270
                payload.position(mark);
1✔
271
            }
272

273
            // read the entry checksum
274
            final long checksum = payload.getLong();
1✔
275

276
            // verify the checksum
277
            final long calculatedChecksum = xxHash64.getValue();
1✔
278
            if (checksum != calculatedChecksum) {
1!
279
                throw new LogException("Checksum mismatch whilst reading log entry. read=" + checksum + " calculated=" + calculatedChecksum);
×
280
            }
281

282
            return loggable;
1✔
283
        } catch (final IOException e) {
×
284
            throw new LogException(e.getMessage(), e);
×
285
        }
286
    }
287

288
    /**
289
     * Re-position the file position so it points to the start of the entry
290
     * with the given LSN.
291
     *
292
     * @param lsn the log sequence number
293
     * @throws LogException if the journal file cannot be re-positioned
294
     */
295
    public void position(final Lsn lsn) throws LogException {
296
        try {
297
            checkOpen();
1✔
298
            fc.position(lsn.getOffset() - 1);
1✔
299
        } catch (final IOException e) {
1✔
300
            throw new LogException("Fatal error while seeking journal: " + e.getMessage(), e);
×
301
        }
302
    }
1✔
303

304
    /**
305
     * Re-position the file position so it points to the first entry.
306
     *
307
     * @throws LogException if the journal file cannot be re-positioned
308
     */
309
    public void positionFirst() throws LogException {
310
        try {
311
            checkOpen();
1✔
312
            fc.position(JOURNAL_HEADER_LEN);
1✔
313
        } catch (final IOException e) {
1✔
314
            throw new LogException("Fatal error while seeking first journal entry: " + e.getMessage(), e);
×
315
        }
316
    }
1✔
317

318
    /**
319
     * Re-position the file position so it points to the last entry.
320
     *
321
     * @throws LogException if the journal file cannot be re-positioned
322
     */
323
    public void positionLast() throws LogException {
324
        try {
325
            checkOpen();
1✔
326
            fc.position(fc.size());
1✔
327
        } catch (final IOException e) {
1✔
328
            throw new LogException("Fatal error while seeking last journal entry: " + e.getMessage(), e);
×
329
        }
330
    }
1✔
331

332
    private void checkOpen() throws IOException {
333
        if (fc == null) {
1!
334
            throw new IOException("Journal file is closed");
×
335
        }
336
    }
1✔
337

338
    @Override
339
    public void close() {
340
        try {
341
            if (fc != null) {
1!
342
                fc.close();
1✔
343
            }
344
        } catch (final IOException e) {
1✔
345
            LOG.warn(e.getMessage(), e);
×
346
        }
347
        fc = null;
1✔
348
    }
1✔
349
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc