• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

evolvedbinary / elemental / 982

29 Apr 2025 08:34PM UTC coverage: 56.409% (+0.007%) from 56.402%
982

push

circleci

adamretter
[feature] Improve README.md badges

28451 of 55847 branches covered (50.94%)

Branch coverage included in aggregate %.

77468 of 131924 relevant lines covered (58.72%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.6
/exist-core/src/main/java/org/exist/storage/journal/JournalManager.java
1
/*
2
 * Elemental
3
 * Copyright (C) 2024, Evolved Binary Ltd
4
 *
5
 * admin@evolvedbinary.com
6
 * https://www.evolvedbinary.com | https://www.elemental.xyz
7
 *
8
 * Use of this software is governed by the Business Source License 1.1
9
 * included in the LICENSE file and at www.mariadb.com/bsl11.
10
 *
11
 * Change Date: 2028-04-27
12
 *
13
 * On the date above, in accordance with the Business Source License, use
14
 * of this software will be governed by the Apache License, Version 2.0.
15
 *
16
 * Additional Use Grant: Production use of the Licensed Work for a permitted
17
 * purpose. A Permitted Purpose is any purpose other than a Competing Use.
18
 * A Competing Use means making the Software available to others in a commercial
19
 * product or service that: substitutes for the Software; substitutes for any
20
 * other product or service we offer using the Software that exists as of the
21
 * date we make the Software available; or offers the same or substantially
22
 * similar functionality as the Software.
23
 */
24
package org.exist.storage.journal;
25

26
import net.jcip.annotations.GuardedBy;
27
import net.jcip.annotations.ThreadSafe;
28
import org.apache.logging.log4j.LogManager;
29
import org.apache.logging.log4j.Logger;
30
import org.exist.EXistException;
31
import org.exist.storage.BrokerPool;
32
import org.exist.storage.BrokerPoolService;
33
import org.exist.storage.BrokerPoolServiceException;
34
import org.exist.storage.recovery.RecoveryManager;
35
import org.exist.util.Configuration;
36
import org.exist.util.ReadOnlyException;
37

38
import java.nio.file.Path;
39
import java.util.List;
40
import java.util.Optional;
41
import java.util.concurrent.CopyOnWriteArrayList;
42

43
/**
44
 * Journal Manager just adds some light-weight
45
 * wrapping around {@link Journal}.
46
 *
47
 * @author <a href="mailto:adam@evolvedbinary.com">Adam Retter</a>
48
 */
49
@ThreadSafe
50
public class JournalManager implements BrokerPoolService {
1✔
51
    private static final Logger LOG = LogManager.getLogger(JournalManager.class);
1✔
52

53
    @GuardedBy("this") private Path journalDir;
54
    @GuardedBy("this") private boolean groupCommits;
55
    // package-private accessibility for testing
56
    @GuardedBy("this") Journal journal;
57
    @GuardedBy("this") private boolean journallingDisabled = false;
1✔
58
    @GuardedBy("this") private boolean initialized = false;
1✔
59

60
    private final List<JournalListener> journalListeners = new CopyOnWriteArrayList<>();
1✔
61

62
    @Override
63
    public synchronized void configure(final Configuration configuration) {
64
        this.journalDir = (Path) Optional.ofNullable(configuration.getProperty(Journal.PROPERTY_RECOVERY_JOURNAL_DIR))
1✔
65
                .orElse(configuration.getProperty(BrokerPool.PROPERTY_DATA_DIR));
1✔
66
        this.groupCommits = configuration.getProperty(BrokerPool.PROPERTY_RECOVERY_GROUP_COMMIT, false);
1✔
67
        if (LOG.isDebugEnabled()) {
1!
68
            LOG.debug("GroupCommits = {}", groupCommits);
×
69
        }
70
    }
1✔
71

72
    @Override
73
    public synchronized void prepare(final BrokerPool pool) throws BrokerPoolServiceException {
74
        if (!journallingDisabled) {
1!
75
            try {
76
                this.journal = new Journal(pool, journalDir);
1✔
77
                this.journal.initialize();
1✔
78
                this.initialized = true;
1✔
79
            } catch(final EXistException | ReadOnlyException e) {
1✔
80
                throw new BrokerPoolServiceException(e);
×
81
            }
82
        }
83
    }
1✔
84

85
    public synchronized void disableJournalling() {
86
        this.journallingDisabled = true;
×
87
    }
×
88

89
    /**
90
     * Write a single entry to the journal
91
     *
92
     * @see Journal#writeToLog(Loggable)
93
     *
94
     * @param loggable The entry to write in the journal
95
     *
96
     * @throws JournalException of the journal entry cannot be written
97
     */
98
    public synchronized void journal(final Loggable loggable) throws JournalException {
99
        if (!journallingDisabled) {
1!
100
            journal.writeToLog(loggable);
1✔
101
        }
102
    }
1✔
103

104
    /**
105
     * Write a group of entrys to the journal
106
     *
107
     * @see Journal#writeToLog(Loggable)
108
     * @see Journal#flushToLog(boolean)
109
     *
110
     * @param loggable The entry to write in the journalGroup
111
     *
112
     * @throws JournalException of the journal group cannot be written
113
     */
114
    public synchronized void journalGroup(final Loggable loggable) throws JournalException {
115
        if (!journallingDisabled) {
1!
116
            journal.writeToLog(loggable);
1✔
117
            if (!groupCommits) {
1!
118
                journal.flushToLog(true);
1✔
119
            }
120
        }
121
    }
1✔
122

123
    /**
124
     * @see Journal#checkpoint(long, boolean)
125
     *
126
     * Create a new checkpoint. A checkpoint fixes the current database state. All dirty pages
127
     * are written to disk and the journal file is cleaned.
128
     *
129
     * This method is called from
130
     * {@link org.exist.storage.BrokerPool} within pre-defined periods. It
131
     * should not be called from somewhere else. The database needs to
132
     * be in a stable state (all transactions completed, no operations running).
133
     *
134
     * @param transactionId The id of the transaction for the checkpoint
135
     * @param switchFiles Whether a new journal file should be started
136
     *
137
     * @throws JournalException of the journal checkpoint cannot be written
138
     */
139
    public synchronized void checkpoint(final long transactionId, final boolean switchFiles) throws JournalException {
140
        if (!journallingDisabled) {
1!
141
            journal.checkpoint(transactionId, switchFiles);
1✔
142

143
            // notify each listener, de-registering those who want no further events
144
            journalListeners.forEach(listener -> {
1✔
145
                if(!listener.afterCheckpoint(transactionId)) {
×
146
                    journalListeners.remove(listener);
×
147
                }
148
            });
×
149
        }
150
    }
1✔
151

152
    /**
153
     * Flush the Journal.
154
     *
155
     * @param fsync true to use fsync
156
     * @param forceSync true to force an fsync
157
     *
158
     * See {@link Journal#flushToLog(boolean, boolean)}.
159
     */
160
    public synchronized void flush(final boolean fsync, final boolean forceSync) {
161
        journal.flushToLog(fsync, forceSync);
1✔
162
    }
1✔
163

164

165

166
    /**
167
     * Shut down the journal. This will write a checkpoint record
168
     * to the log, so recovery manager knows the file has been
169
     * closed in a clean way.
170
     *
171
     * @param transactionId The id of the transaction for the shutdown
172
     * @param checkpoint Whether to write a checkpoint before shutdown
173
     */
174
    public synchronized void shutdown(final long transactionId, final boolean checkpoint) {
175
        if(initialized) {
1!
176
            journal.shutdown(transactionId, checkpoint);
1✔
177
            initialized = false;
1✔
178
        }
179
    }
1✔
180

181
    /**
182
     * @see Journal#lastWrittenLsn()
183
     *
184
     * @return the last written LSN
185
     */
186
    public synchronized Lsn lastWrittenLsn() {
187
        return journal.lastWrittenLsn();
1✔
188
    }
189

190

191

192
    public RecoveryManager.JournalRecoveryAccessor getRecoveryAccessor(final RecoveryManager recoveryManager) {
193
        return recoveryManager.new JournalRecoveryAccessor(
1✔
194
                journal::setInRecovery, journal::getFiles, journal::getFile, journal::setCurrentJournalFileNumber,
1✔
195
                () -> { journal.switchFiles(); return null; });
1✔
196
    }
197

198
    /**
199
     * Add a callback which can listen for Journal events.
200
     *
201
     * @param listener the journal listener
202
     */
203
    public void listen(final JournalListener listener) {
204
        this.journalListeners.add(listener);
1✔
205
    }
1✔
206

207
    /**
208
     * Callback for Journal events
209
     */
210
    public interface JournalListener {
211

212
        /**
213
         * Called after the journal has written a checkpoint
214
         *
215
         * @param txnId The id of the transaction written in the checkpoint
216
         *
217
         * @return true if the listener should continue to receive events, false
218
         *    if the listener should be de-registered and receive no further events.
219
         */
220
        boolean afterCheckpoint(final long txnId);
221
    }
222

223
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc