• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

evolvedbinary / elemental / 982

29 Apr 2025 08:34PM UTC coverage: 56.409% (+0.007%) from 56.402%
982

push

circleci

adamretter
[feature] Improve README.md badges

28451 of 55847 branches covered (50.94%)

Branch coverage included in aggregate %.

77468 of 131924 relevant lines covered (58.72%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.26
/exist-core/src/main/java/org/exist/storage/recovery/RecoveryManager.java
1
/*
2
 * Elemental
3
 * Copyright (C) 2024, Evolved Binary Ltd
4
 *
5
 * admin@evolvedbinary.com
6
 * https://www.evolvedbinary.com | https://www.elemental.xyz
7
 *
8
 * Use of this software is governed by the Business Source License 1.1
9
 * included in the LICENSE file and at www.mariadb.com/bsl11.
10
 *
11
 * Change Date: 2028-04-27
12
 *
13
 * On the date above, in accordance with the Business Source License, use
14
 * of this software will be governed by the Apache License, Version 2.0.
15
 *
16
 * Additional Use Grant: Production use of the Licensed Work for a permitted
17
 * purpose. A Permitted Purpose is any purpose other than a Competing Use.
18
 * A Competing Use means making the Software available to others in a commercial
19
 * product or service that: substitutes for the Software; substitutes for any
20
 * other product or service we offer using the Software that exists as of the
21
 * date we make the Software available; or offers the same or substantially
22
 * similar functionality as the Software.
23
 *
24
 * NOTE: Parts of this file contain code from 'The eXist-db Authors'.
25
 *       The original license header is included below.
26
 *
27
 * =====================================================================
28
 *
29
 * eXist-db Open Source Native XML Database
30
 * Copyright (C) 2001 The eXist-db Authors
31
 *
32
 * info@exist-db.org
33
 * http://www.exist-db.org
34
 *
35
 * This library is free software; you can redistribute it and/or
36
 * modify it under the terms of the GNU Lesser General Public
37
 * License as published by the Free Software Foundation; either
38
 * version 2.1 of the License, or (at your option) any later version.
39
 *
40
 * This library is distributed in the hope that it will be useful,
41
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
42
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
43
 * Lesser General Public License for more details.
44
 *
45
 * You should have received a copy of the GNU Lesser General Public
46
 * License along with this library; if not, write to the Free Software
47
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
48
 */
49
package org.exist.storage.recovery;
50

51
import java.io.FileNotFoundException;
52
import java.io.IOException;
53
import java.nio.file.Path;
54
import java.util.List;
55
import java.util.function.Consumer;
56
import java.util.function.Function;
57
import java.util.stream.Collectors;
58
import java.util.stream.Stream;
59

60
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
61
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
62
import org.apache.logging.log4j.LogManager;
63
import org.apache.logging.log4j.Logger;
64
import org.exist.storage.DBBroker;
65
import org.exist.storage.BrokerPool;
66
import org.exist.storage.blob.BlobStore;
67
import org.exist.storage.journal.*;
68
import org.exist.storage.sync.Sync;
69
import org.exist.storage.txn.Checkpoint;
70
import org.exist.util.FileUtils;
71
import org.exist.util.ProgressBar;
72
import com.evolvedbinary.j8fu.function.SupplierE;
73
import org.exist.util.sanity.SanityCheck;
74

75
import javax.annotation.Nullable;
76

77
/**
78
 * Database recovery. This class is used once during startup to check
79
 * if the database is in a consistent state. If not, the class attempts to recover
80
 * the database from the journalling log.
81
 * 
82
 * @author wolf
83
 */
84
public class RecoveryManager {
85
        
86
        private final static Logger LOG = LogManager.getLogger(RecoveryManager.class);
1✔
87

88
    private final DBBroker broker;
89
    private final JournalRecoveryAccessor journalRecovery;
90
    private final boolean restartOnError;
91
    private final boolean hideProgressBar;
92

93
    public RecoveryManager(final DBBroker broker, final JournalManager journalManager, final boolean restartOnError) {
1✔
94
        this.broker = broker;
1✔
95
        this.journalRecovery = journalManager.getRecoveryAccessor(this);
1✔
96
        this.restartOnError = restartOnError;
1✔
97
        this.hideProgressBar = Boolean.getBoolean("exist.recovery.progressbar.hide");
1✔
98
        }
1✔
99

100
        /**
101
         * Checks if the database is in a consistent state. If not, start a recovery run.
102
         * 
103
         * The method scans the last log file and tries to find the last checkpoint
104
         * record. If the checkpoint record is the last record in the file,
105
         * the database was closed cleanly and is in a consistent state. If not, a
106
         * recovery run is started beginning at the last checkpoint found.
107
         *  
108
         * @throws LogException Reading of journal failed.
109
     * @return if recover was successful
110
         */
111
        public boolean recover() throws LogException {
112
        boolean recoveryRun = false;
1✔
113
                final List<Path> files;
114
        try(final Stream<Path> fileStream = journalRecovery.getFiles.get()) {
1✔
115
            files = fileStream.collect(Collectors.toList());
1✔
116
        } catch(final IOException ioe) {
×
117
            throw new LogException("Unable to find journal files in data dir", ioe);
×
118
        }
119
        // find the last log file in the data directory
120
                final short lastNum = Journal.findLastFile(files.stream());
1✔
121
                if (-1 < lastNum) {
1✔
122
            // load the last log file
123
                        final Path last = journalRecovery.getFile.apply(lastNum);
1✔
124
                        // scan the last log file and record the last checkpoint found
125
            try (JournalReader reader = new JournalReader(broker, last, lastNum)) {
1✔
126
                // try to read the last log record to see if it is a checkpoint
127
                boolean checkpointFound = false;
1✔
128
                try {
129
                    final Loggable lastLog = reader.lastEntry();
1✔
130
                    if (lastLog != null && lastLog.getLogType() == LogEntryTypes.CHECKPOINT) {
1!
131
                        final Checkpoint checkpoint = (Checkpoint) lastLog;
1✔
132
                        // Found a checkpoint. To be sure it is indeed a valid checkpoint
133
                        // record, we compare the LSN stored in it with the current LSN.
134
                        if (checkpoint.getStoredLsn().equals(checkpoint.getLsn())) {
1!
135
                            checkpointFound = true;
1✔
136
                            LOG.debug("Database is in clean state. Last checkpoint: {}", checkpoint.getDateString());
1✔
137
                        }
138
                    }
139
                } catch (final LogException e) {
1✔
140
                    LOG.info("Reading last journal log entry failed: {}. Will scan the log...", e.getMessage());
×
141
                    // if an exception occurs at this point, the journal file is probably incomplete,
142
                    // which indicates a db crash
143
                    checkpointFound = false;
×
144
                }
145
                if (!checkpointFound) {
1✔
146
                    LOG.info("Unclean shutdown detected. Scanning journal...");
1✔
147
                    broker.getBrokerPool().reportStatus("Unclean shutdown detected. Scanning log...");
1✔
148
                    reader.positionFirst();
1✔
149
                    final Long2ObjectMap<Loggable> txnsStarted = new Long2ObjectOpenHashMap<>();
1✔
150
                    Checkpoint lastCheckpoint = null;
1✔
151
                    Lsn lastLsn = Lsn.LSN_INVALID;
1✔
152
                    Loggable next;
153
                    try {
154
                        final long lastSize = FileUtils.sizeQuietly(last);
1✔
155
                        @Nullable final ProgressBar scanProgressBar = hideProgressBar ? null : new ProgressBar("Scanning journal ", lastSize);
1!
156
                        while ((next = reader.nextEntry()) != null) {
1✔
157
//                                LOG.debug(next.dump());
158
                            if (next.getLogType() == LogEntryTypes.TXN_START) {
1✔
159
                                // new transaction starts: add it to the transactions table
160
                                txnsStarted.put(next.getTransactionId(), next);
1✔
161
                            } else if (next.getLogType() == LogEntryTypes.TXN_ABORT) {
1!
162
                                // transaction aborted: remove it from the transactions table
163
                                txnsStarted.remove(next.getTransactionId());
×
164
                            } else if (next.getLogType() == LogEntryTypes.CHECKPOINT) {
1✔
165
                                txnsStarted.clear();
1✔
166
                                lastCheckpoint = (Checkpoint) next;
1✔
167
                            }
168
                            lastLsn = next.getLsn();
1✔
169

170
                            if (scanProgressBar != null) {
1!
171
                                scanProgressBar.set(next.getLsn().getOffset());
×
172
                            }
173
                        }
174

175
                        if (scanProgressBar != null) {
1!
176
                            scanProgressBar.set(lastSize);  // 100%
×
177
                        }
178
                    } catch (final LogException e) {
×
179
                        if (LOG.isDebugEnabled()) {
×
180
                            LOG.debug("Caught exception while reading log", e);
×
181
                        }
182
                        LOG.warn("Last readable journal log entry lsn: {}", lastLsn);
×
183
                    }
184

185
                    // if the last checkpoint record is not the last record in the file
186
                    // we need a recovery.
187
                    if ((lastCheckpoint == null || !lastCheckpoint.getLsn().equals(lastLsn)) &&
1!
188
                        !txnsStarted.isEmpty()) {
1!
189
                        LOG.info("Dirty transactions: {}", txnsStarted.size());
1✔
190
                        // starting recovery: reposition the log reader to the last checkpoint
191
                        if (lastCheckpoint == null) {
1✔
192
                            reader.positionFirst();
1✔
193
                        } else {
1✔
194
                            reader.position(lastCheckpoint.getLsn());
1✔
195
                            next = reader.nextEntry();
1✔
196
                        }
197
                        recoveryRun = true;
1✔
198
                        try {
199
                            LOG.info("Running recovery...");
1✔
200
                            broker.getBrokerPool().reportStatus("Running recovery...");
1✔
201

202

203
                            try (final BlobStore blobStore = broker.getBrokerPool().getBlobStore()) {
1✔
204
                                try {
205
                                    blobStore.openForRecovery();
1✔
206
                                } catch (final FileNotFoundException e) {
1✔
207
                                    LOG.warn(e.getMessage(), e);
×
208
                                } catch (final IOException e) {
×
209
                                    throw new LogException("Unable to Open the Blob Store for Recovery: " + e.getMessage(), e);
×
210
                                }
211

212
                                doRecovery(txnsStarted.size(), last, reader, lastLsn);
1✔
213

214
                            } catch (final IOException e) {
×
215
                                LOG.error("Error whilst closing the Blob Store after recovery: {}", e.getMessage(), e);
×
216
                            }
217
                        } catch (final LogException e) {
×
218
                            // if restartOnError == true, we try to bring up the database even if there
219
                            // are errors. Otherwise, an exception is thrown, which will stop the db initialization
220
                            broker.getBrokerPool().reportStatus(BrokerPool.SIGNAL_ABORTED);
×
221
                            if (restartOnError) {
×
222
                                LOG.error("Aborting recovery. Elemental detected an error during recovery. This may not be fatal. Database will start up, but corruptions are likely.");
×
223
                            } else {
×
224
                                LOG.error("Aborting recovery. Elemental detected an error during recovery. This may not be fatal. Please consider running a consistency check via the export tool and create a backup if problems are reported. The db should come up again if you restart it.");
×
225
                                throw e;
×
226
                            }
227
                        }
228
                    } else {
229
                        LOG.info("Database is in clean state. Nothing to recover from the journal.");
×
230
                    }
231
                }
232
            } finally {
233
                // remove .log files from directory even if recovery failed.
234
                // Re-applying them on a second start up attempt would definitely damage the db, so we better
235
                // delete them before user tries to launch again.
236
                cleanDirectory(files.stream());
1✔
237
                if (recoveryRun) {
1✔
238
                    broker.repairPrimary();
1✔
239
                    broker.sync(Sync.MAJOR);
1✔
240
                }
241
            }
242
                }
243
        journalRecovery.setCurrentFileNum.accept(lastNum);
1✔
244
        journalRecovery.switchFiles.get();
1✔
245

246
        return recoveryRun;
1✔
247
        }
248

249
    public class JournalRecoveryAccessor {
250
        final Consumer<Boolean> setInRecovery;
251
        final SupplierE<Stream<Path>, IOException> getFiles;
252
        final Function<Short, Path> getFile;
253
        final Consumer<Short> setCurrentFileNum;
254
        final SupplierE<Void, LogException> switchFiles;
255

256

257
        public JournalRecoveryAccessor(final Consumer<Boolean> setInRecovery,
1✔
258
                final SupplierE<Stream<Path>, IOException> getFiles, final Function<Short, Path> getFile,
259
                final Consumer<Short> setCurrentFileNum, final SupplierE<Void, LogException> switchFiles) {
1✔
260
            this.setInRecovery = setInRecovery;
1✔
261
            this.getFiles = getFiles;
1✔
262
            this.getFile = getFile;
1✔
263
            this.setCurrentFileNum = setCurrentFileNum;
1✔
264
            this.switchFiles = switchFiles;
1✔
265
        }
1✔
266
    }
267

268
    /**
269
     * Called by {@link #recover()} to do the actual recovery.
270
     *
271
     * @param txnCount
272
     * @param last
273
     * @param reader
274
     * @param lastLsn
275
     *
276
     * @throws LogException
277
     */
278
    private void doRecovery(final int txnCount, final Path last, final JournalReader reader, final Lsn lastLsn) throws LogException {
279
        if (LOG.isInfoEnabled()) {
1!
280
            LOG.info("Running recovery ...");
1✔
281
        }
282
        journalRecovery.setInRecovery.accept(true);
1✔
283

284
        try {
285
            // map to track running transactions
286
            final Long2ObjectMap<Loggable> runningTxns = new Long2ObjectOpenHashMap<>();
1✔
287

288
            // ------- REDO ---------
289
            if (LOG.isInfoEnabled())
1!
290
                {
291
                    LOG.info("First pass: redoing {} transactions...", txnCount);}
1✔
292
            Loggable next = null;
1✔
293
            int redoCnt = 0;
1✔
294
            try {
295
                final long lastSize = FileUtils.sizeQuietly(last);
1✔
296
                @Nullable final ProgressBar redoProgressBar = hideProgressBar ? null : new ProgressBar("Redo ", lastSize);
1!
297
                while ((next = reader.nextEntry()) != null) {
1!
298
                    SanityCheck.ASSERT(next.getLogType() != LogEntryTypes.CHECKPOINT,
1!
299
                            "Found a checkpoint during recovery run! This should not ever happen.");
1✔
300
                    if (next.getLogType() == LogEntryTypes.TXN_START) {
1✔
301
                        // new transaction starts: add it to the transactions table
302
                        runningTxns.put(next.getTransactionId(), next);
1✔
303
                    } else if (next.getLogType() == LogEntryTypes.TXN_COMMIT) {
1✔
304
                        // transaction committed: remove it from the transactions table
305
                        runningTxns.remove(next.getTransactionId());
1✔
306
                        redoCnt++;
1✔
307
                    } else if (next.getLogType() == LogEntryTypes.TXN_ABORT) {
1!
308
                        // transaction aborted: remove it from the transactions table
309
                        runningTxns.remove(next.getTransactionId());
×
310
                    }
311
        //            LOG.debug("Redo: " + next.dump());
312
                    // redo the log entry
313
                    next.redo();
1✔
314

315
                    if (redoProgressBar != null) {
1!
316
                        redoProgressBar.set(next.getLsn().getOffset());
×
317
                    }
318

319
                    if (next.getLsn().equals(lastLsn)) {
1✔
320
                        // last readable entry reached. Stop here.
321
                        break;
1✔
322
                    }
323
                }
324

325
                if (redoProgressBar != null) {
1!
326
                    redoProgressBar.set(lastSize);  // 100% done
×
327
                }
328
            } catch (final Exception e) {
×
329
                LOG.error("Exception caught while redoing transactions. Aborting recovery to avoid possible damage. " +
×
330
                    "Before starting again, make sure to run a check via the emergency export tool.", e);
×
331
                if (next != null)
×
332
                    {
333
                        LOG.info("Log entry that caused the exception: {}", next.dump());}
×
334
                throw new LogException("Recovery aborted. ");
×
335
            } finally {
336
                LOG.info("Redo processed {} out of {} transactions.", redoCnt, txnCount);
1✔
337
            }
338

339
            // ------- UNDO ---------
340
            if (LOG.isInfoEnabled())
1!
341
                {
342
                    LOG.info("Second pass: undoing dirty transactions. Uncommitted transactions: {}", runningTxns.size());}
1✔
343
            // see if there are uncommitted transactions pending
344
            if (!runningTxns.isEmpty()) {
1✔
345
                // do a reverse scan of the log, undoing all uncommitted transactions
346
                try {
347
                    final long lastSize = FileUtils.sizeQuietly(last);
1✔
348
                    final ProgressBar undoProgressBar = hideProgressBar ? null : new ProgressBar("Undo ", lastSize);
1!
349
                    while ((next = reader.previousEntry()) != null) {
1!
350
                        if (next.getLogType() == LogEntryTypes.TXN_START) {
1✔
351
                            if (runningTxns.get(next.getTransactionId()) != null) {
1✔
352
                                runningTxns.remove(next.getTransactionId());
1✔
353
                                if (runningTxns.isEmpty()) {
1✔
354
                                    // all dirty transactions undone
355
                                    break;
1✔
356
                                }
357
                            }
358
                        } else if (next.getLogType() == LogEntryTypes.TXN_COMMIT) {
1✔
359
                            // ignore already committed transaction
360
                        } else if (next.getLogType() == LogEntryTypes.CHECKPOINT) {
1!
361
                            // found last checkpoint: undo is completed
362
                            break;
×
363
                        }
364

365
                        // undo the log entry if it belongs to an uncommitted transaction
366
                        if (runningTxns.get(next.getTransactionId()) != null) {
1✔
367
    //                                        LOG.debug("Undo: " + next.dump());
368
                            next.undo();
1✔
369
                        }
370

371
                        if (undoProgressBar != null) {
1!
372
                            undoProgressBar.set(lastSize - next.getLsn().getOffset());
×
373
                        }
374
                    }
375

376
                    if (undoProgressBar != null) {
1!
377
                        undoProgressBar.set(lastSize);   // 100% done
×
378
                    }
379
                } catch (final Exception e) {
×
380
                    LOG.warn("Exception caught while undoing dirty transactions. Remaining transactions to be undone: {}. Aborting recovery to avoid possible damage. Before starting again, make sure to run a check via the emergency export tool.", runningTxns.size(), e);
×
381
                    if (next != null)
×
382
                        {
383
                            LOG.warn("Log entry that caused the exception: {}", next.dump());}
×
384
                    throw new LogException("Recovery aborted", e);
×
385
                }
386
            }
387
        } finally {
388
            broker.sync(Sync.MAJOR);
1✔
389
            journalRecovery.setInRecovery.accept(false);
1✔
390
        }
391
    }
1✔
392
    
393
        private void cleanDirectory(final Stream<Path> files) {
394
        files.forEach(FileUtils::deleteQuietly);
1✔
395
        }
1✔
396
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc