• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

evolvedbinary / elemental / 982

29 Apr 2025 08:34PM UTC coverage: 56.409% (+0.007%) from 56.402%
982

push

circleci

adamretter
[feature] Improve README.md badges

28451 of 55847 branches covered (50.94%)

Branch coverage included in aggregate %.

77468 of 131924 relevant lines covered (58.72%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.11
/exist-core/src/main/java/org/exist/storage/txn/TransactionManager.java
1
/*
2
 * Elemental
3
 * Copyright (C) 2024, Evolved Binary Ltd
4
 *
5
 * admin@evolvedbinary.com
6
 * https://www.evolvedbinary.com | https://www.elemental.xyz
7
 *
8
 * Use of this software is governed by the Business Source License 1.1
9
 * included in the LICENSE file and at www.mariadb.com/bsl11.
10
 *
11
 * Change Date: 2028-04-27
12
 *
13
 * On the date above, in accordance with the Business Source License, use
14
 * of this software will be governed by the Apache License, Version 2.0.
15
 *
16
 * Additional Use Grant: Production use of the Licensed Work for a permitted
17
 * purpose. A Permitted Purpose is any purpose other than a Competing Use.
18
 * A Competing Use means making the Software available to others in a commercial
19
 * product or service that: substitutes for the Software; substitutes for any
20
 * other product or service we offer using the Software that exists as of the
21
 * date we make the Software available; or offers the same or substantially
22
 * similar functionality as the Software.
23
 */
24
package org.exist.storage.txn;
25

26
import net.jcip.annotations.ThreadSafe;
27
import org.apache.logging.log4j.LogManager;
28
import org.apache.logging.log4j.Logger;
29
import org.exist.EXistException;
30
import org.exist.security.PermissionDeniedException;
31
import org.exist.storage.*;
32
import org.exist.storage.journal.JournalException;
33
import org.exist.storage.journal.JournalManager;
34
import org.exist.storage.sync.Sync;
35
import org.exist.util.LockException;
36
import org.exist.xmldb.XmldbURI;
37

38
import java.io.IOException;
39
import java.util.Objects;
40
import java.util.Optional;
41
import java.util.concurrent.ConcurrentHashMap;
42
import java.util.concurrent.atomic.AtomicInteger;
43
import java.util.concurrent.atomic.AtomicLong;
44

45
/**
46
 * The Transaction Manager provides methods to begin, commit, and abort
47
 * transactions.
48
 *
49
 * This implementation of the transaction manager is non-blocking lock-free.
50
 * It makes use of several CAS variables to ensure thread-safe concurrent
51
 * access. The most important of which is {@link #state} which indicates
52
 * either:
53
 *     1) the number of active transactions
54
 *     2) that the Transaction Manager is executing system
55
 *         tasks ({@link #STATE_SYSTEM}), during which time no
56
 *         other transactions are active.
57
 *     3) that the Transaction Manager has (or is)
58
 *         been shutdown ({@link #STATE_SHUTDOWN}).
59
 *
60
 * NOTE: the Transaction Manager may optimistically briefly enter
61
 *     the state {@link #STATE_SYSTEM} to block the initiation of
62
 *     new transactions and then NOT execute system tasks if it
63
 *     detects concurrent active transactions.
64
 *
65
 * System tasks are mutually exclusive with any other operation
66
 * including shutdown. When shutdown is requested, if system tasks
67
 * are executing, then the thread will spin until they are finished.
68
 * 
69
 * There's only one TransactionManager per database instance, it can be
70
 * accessed via {@link BrokerPool#getTransactionManager()}.
71
 *
72
 * @author <a href="mailto:adam@evolvedbinary.com">Adam Retter</a>
73
 */
74
@ThreadSafe
75
public class TransactionManager implements BrokerPoolService {
76

77
    private static final Logger LOG = LogManager.getLogger(TransactionManager.class);
1✔
78

79
    private final BrokerPool pool;
80
    private final Optional<JournalManager> journalManager;
81
    private final SystemTaskManager systemTaskManager;
82

83
    /**
84
     * The next transaction id
85
     */
86
    private final AtomicLong nextTxnId = new AtomicLong();
1✔
87

88
    /**
89
     * Currently active transactions and their operations journal write count.
90
     *  Key is the transaction id
91
     *  Value is the transaction's operations journal write count.
92
     */
93
    private final ConcurrentHashMap<Long, TxnCounter> transactions = new ConcurrentHashMap<>();
1✔
94

95
    /**
96
     * State for when the Transaction Manager has been shutdown.
97
     */
98
    private static final int STATE_SHUTDOWN = -2;
99

100
    /**
101
     * State for when the Transaction Manager has executing system tasks.
102
     */
103
    private static final int STATE_SYSTEM = -1;
104

105
    /**
106
     * State for when the Transaction Manager is idle, i.e. no active transactions.
107
     */
108
    private static final int STATE_IDLE = 0;
1✔
109

110
    /**
111
     * State of the transaction manager.
112
     *
113
     * Will be either {@link #STATE_SHUTDOWN}, {@link #STATE_SYSTEM},
114
     * {@link #STATE_IDLE} or a non-zero positive integer which
115
     * indicates the number of active transactions.
116
     */
117
    private final AtomicInteger state = new AtomicInteger(STATE_IDLE);
1✔
118

119
    /**
120
     * Id of the thread which is executing system tasks when
121
     * the {@link #state} == {@link #STATE_SYSTEM}. This
122
     * is used for reentrancy when system tasks need to
123
     * make transactional operations.
124
     */
125
    private final AtomicLong systemThreadId = new AtomicLong(-1);
1✔
126

127

128
    /**
129
     * Constructs a transaction manager for a Broker Pool.
130
     * 
131
     * @param pool the broker pool
132
     * @param journalManager the journal manager
133
     * @param systemTaskManager the system task manager
134
     */
135
    public TransactionManager(final BrokerPool pool, final Optional<JournalManager> journalManager,
1✔
136
            final SystemTaskManager systemTaskManager) {
137
        this.pool = pool;
1✔
138
        this.journalManager = journalManager;
1✔
139
        this.systemTaskManager = systemTaskManager;
1✔
140
    }
1✔
141

142
    private static void throwShutdownException() {
143
        //TODO(AR) API should be revised in future so that this is a TransactionException
144
        throw new RuntimeException("Transaction Manager is shutdown");
×
145
    }
146

147
    /**
148
     * Create a new transaction.
149
     *
150
     * @return the new transaction
151
     */
152
    public Txn beginTransaction() {
153
        try {
154
            // CAS loop
155
            while (true) {
156
                final int localState = state.get();
1✔
157

158
                // can NOT begin transaction when shutdown!
159
                if (localState == STATE_SHUTDOWN) {
1!
160
                    throwShutdownException();
×
161
                }
162

163
                // must NOT begin transaction when another thread is processing system tasks!
164
                if (localState == STATE_SYSTEM) {
1✔
165
                    final long thisThreadId = Thread.currentThread().getId();
1✔
166
                    if (systemThreadId.compareAndSet(thisThreadId, thisThreadId)) {
1✔
167
                        // our thread is executing system tasks, allow reentrancy from our thread!
168

169
                        // done... return from CAS loop!
170
                        return doBeginTransaction();
1✔
171

172
                    } else {
173
                        // spin whilst another thread executes the system tasks
174
                        // sleep a small time to save CPU
175
                        Thread.sleep(10);
1✔
176
                        continue;
1✔
177
                    }
178
                }
179

180
                // if we are operational and are not preempted by another thread, begin transaction
181
                if (localState >= STATE_IDLE && state.compareAndSet(localState, localState + 1)) {
1!
182
                    // done... return from CAS loop!
183
                    return doBeginTransaction();
1✔
184
                }
185
            }
186
        } catch (final InterruptedException e) {
×
187
            // thrown by Thread.sleep
188
            Thread.currentThread().interrupt();
×
189
            //TODO(AR) API should be revised in future so that this is a TransactionException
190
            throw new RuntimeException(e);
×
191
        }
192
    }
193

194
    private Txn doBeginTransaction() {
195
        final long txnId = nextTxnId.getAndIncrement();
1✔
196
        if (journalManager.isPresent()) {
1✔
197
            try {
198
                journalManager.get().journal(new TxnStart(txnId));
1✔
199
            } catch (final JournalException e) {
1✔
200
                LOG.error("Failed to create transaction. Error writing to Journal", e);
×
201
            }
202
        }
203

204
        /*
205
         * NOTE: we intentionally increment the txn counter here
206
         *     to set the counter to 1 to represent the TxnStart,
207
         *     as that will not be done
208
         *     by {@link JournalManager#journal(Loggable)} or
209
         *     {@link Journal#writeToLog(loggable)}.
210
         */
211
        transactions.put(txnId, new TxnCounter().increment());
1✔
212
        final Txn txn = new Txn(this, txnId);
1✔
213

214
        // TODO(AR) ultimately we should be doing away with DBBroker#addCurrentTransaction
215
        try(final DBBroker broker = pool.getBroker()) {
1✔
216
            broker.addCurrentTransaction(txn);
1✔
217
        } catch(final EXistException ee) {
×
218
            LOG.fatal(ee.getMessage(), ee);
×
219
            throw new RuntimeException(ee);
×
220
        }
221

222
        return txn;
1✔
223
    }
224
    
225
    /**
226
     * Commit a transaction.
227
     * 
228
     * @param txn the transaction to commit.
229
     *
230
     * @throws TransactionException if the transaction could not be committed.
231
     */
232
    public void commit(final Txn txn) throws TransactionException {
233
        Objects.requireNonNull(txn);
1✔
234

235
        if(txn instanceof Txn.ReusableTxn) {
1!
236
            txn.commit();
×
237
            return;
×
238
            //throw new IllegalStateException("Commit should be called on the transaction and not via the TransactionManager"); //TODO(AR) remove later when API is cleaned up?
239
        }
240

241
        //we can only commit something which is in the STARTED state
242
        if (txn.getState() != Txn.State.STARTED) {
1✔
243
            return;
1✔
244
        }
245

246
        boolean doneCommit = false;
1✔
247

248
        // CAS loop
249
        try {
250
            while (true) {
251
                final int localState = state.get();
1✔
252

253
                // can NOT commit transaction when shutdown!
254
                if (localState == STATE_SHUTDOWN) {
1!
255
                    throwShutdownException();
×
256
                }
257

258
                // must NOT commit transaction when another thread is processing system tasks!
259
                if (localState == STATE_SYSTEM) {
1✔
260
                    final long thisThreadId = Thread.currentThread().getId();
1✔
261
                    if (systemThreadId.compareAndSet(thisThreadId, thisThreadId)) {
1!
262
                        // our thread is executing system tasks, allow reentrancy from our thread!
263
                        doCommitTransaction(txn);
1✔
264

265
                        // done... exit CAS loop!
266
                        return;
1✔
267

268
                    } else {
269
                        // spin whilst another thread executes the system tasks
270
                        // sleep a small time to save CPU
271
                        Thread.sleep(10);
×
272
                        continue;
×
273
                    }
274
                }
275

276
                // if we are have active transactions and are not preempted by another thread, commit transaction
277
                if (localState > STATE_IDLE) {
1!
278
                    if (!doneCommit) {
1✔
279
                        doCommitTransaction(txn);
1✔
280
                        // NOTE: doCommitTransaction above might throw an exception before commit which will throw us out of the loop
281
                        doneCommit = txn.getState() == Txn.State.COMMITTED;
1!
282
                    }
283

284
                    // try and update our state to reflect that we have committed the transaction, if not then loop...
285
                    if (state.compareAndSet(localState, localState - 1)) {
1✔
286
                        // done... exit CAS loop!
287
                        return;
1✔
288
                    }
289
                }
290
            }
291
        } catch (final InterruptedException e) {
×
292
            // thrown by Thread.sleep
293
            Thread.currentThread().interrupt();
×
294
            //TODO(AR) API should be revised in future so that this is a TransactionException
295
            throw new RuntimeException(e);
×
296
        }
297
    }
298

299
    private void doCommitTransaction(final Txn txn) throws TransactionException {
300
        if (journalManager.isPresent()) {
1✔
301
            try {
302
                journalManager.get().journalGroup(new TxnCommit(txn.getId()));
1✔
303
            } catch (final JournalException e) {
1✔
304
                throw new TransactionException("Failed to write commit record to journal: " + e.getMessage(), e);
×
305
            }
306
        }
307

308
        txn.signalCommit();
1✔
309
        txn.releaseAll();
1✔
310

311
        transactions.remove(txn.getId());
1✔
312

313
        if (LOG.isDebugEnabled()) {
1!
314
            LOG.debug("Committed transaction: {}", txn.getId());
×
315
        }
316
    }
1✔
317

318
    /**
319
     * Abort a transaction.
320
     *
321
     * @param txn the transaction to abort.
322
     */
323
    public void abort(final Txn txn) {
324
        Objects.requireNonNull(txn);
1✔
325

326
        //we can only abort something which is in the STARTED state
327
        if (txn.getState() != Txn.State.STARTED) {
1✔
328
            return;
1✔
329
        }
330

331
        // CAS loop
332
        try {
333
            while (true) {
334
                final int localState = state.get();
1✔
335

336
                // can NOT abort transaction when shutdown!
337
                if (localState == STATE_SHUTDOWN) {
1!
338
                    throwShutdownException();
×
339
                }
340

341
                // must NOT abort transaction when another thread is processing system tasks!
342
                if (localState == STATE_SYSTEM) {
1!
343
                    final long thisThreadId = Thread.currentThread().getId();
×
344
                    if (systemThreadId.compareAndSet(thisThreadId, thisThreadId)) {
×
345
                        // our thread is executing system tasks, allow reentrancy from our thread!
346
                        doAbortTransaction(txn);
×
347

348
                        // done... exit CAS loop!
349
                        return;
×
350

351
                    } else {
352
                        // spin whilst another thread executes the system tasks
353
                        // sleep a small time to save CPU
354
                        Thread.sleep(10);
×
355
                        continue;
×
356
                    }
357
                }
358

359
                // if we are have active transactions and are not preempted by another thread, abort transaction
360
                if (localState > STATE_IDLE && state.compareAndSet(localState, localState - 1)) {
1!
361
                    doAbortTransaction(txn);
1✔
362

363
                    // done... exit CAS loop!
364
                    return;
1✔
365
                }
366
            }
367
        } catch (final InterruptedException e) {
×
368
            // thrown by Thread.sleep
369
            Thread.currentThread().interrupt();
×
370
            //TODO(AR) API should be revised in future so that this is a TransactionException
371
            throw new RuntimeException(e);
×
372
        }
373
    }
374

375
    private void doAbortTransaction(final Txn txn) {
376
        if (journalManager.isPresent()) {
1!
377
            try {
378
                journalManager.get().journalGroup(new TxnAbort(txn.getId()));
1✔
379
            } catch (final JournalException e) {
1✔
380
                //TODO(AR) should revise the API in future to throw TransactionException
381
                LOG.error("Failed to write abort record to journal: {}", e.getMessage(), e);
×
382
            }
383
        }
384

385
        txn.signalAbort();
1✔
386
        txn.releaseAll();
1✔
387

388
        transactions.remove(txn.getId());
1✔
389

390
        if (LOG.isDebugEnabled()) {
1!
391
            LOG.debug("Aborted transaction: {}", txn.getId());
×
392
        }
393
    }
1✔
394

395
    /**
396
     * Close the transaction.
397
     *
398
     * Ensures that the transaction has either been committed or aborted.
399
     *
400
     * @param txn the transaction to close
401
     */
402
    public void close(final Txn txn) {
403
        Objects.requireNonNull(txn);
1✔
404

405
        //if the transaction is already closed, do nothing
406
        if (txn.getState() == Txn.State.CLOSED) {
1✔
407
            return;
1✔
408
        }
409
        try {
410
            //if the transaction is started, then we should auto-abort the uncommitted transaction
411
            if (txn.getState() == Txn.State.STARTED) {
1✔
412
                LOG.warn("Transaction was not committed or aborted, auto aborting!");
1✔
413
                abort(txn);
1✔
414
            }
415

416
            // TODO(AR) ultimately we should be doing away with DBBroker#addCurrentTransaction
417
            try(final DBBroker broker = pool.getBroker()) {
1✔
418
                broker.removeCurrentTransaction(txn instanceof Txn.ReusableTxn ? ((Txn.ReusableTxn)txn).getUnderlyingTransaction() : txn);
1✔
419
            } catch(final EXistException ee) {
×
420
                LOG.fatal(ee.getMessage(), ee);
×
421
                throw new RuntimeException(ee);
×
422
            }
423

424
        } finally {
425
            txn.setState(Txn.State.CLOSED); //transaction is now closed!
1✔
426
        }
427

428
        processSystemTasks();
1✔
429
    }
1✔
430

431
    /**
432
     * Keep track of a new operation within the given transaction.
433
     *
434
     * @param txnId the transaction id.
435
     */
436
    public void trackOperation(final long txnId) {
437
        transactions.get(txnId).increment();
1✔
438
    }
1✔
439

440
    /**
441
     * Create a new checkpoint. A checkpoint fixes the current database state. All dirty pages
442
     * are written to disk and the journal file is cleaned.
443
     *
444
     * This method is called from
445
     * {@link org.exist.storage.BrokerPool#sync(DBBroker, Sync)} within pre-defined periods. It
446
     * should not be called from somewhere else. The database needs to
447
     * be in a stable state (all transactions completed, no operations running).
448
     *
449
     * @param switchFiles Indicates whether a new journal file should be started
450
     *
451
     * @throws TransactionException if an error occurs whilst writing the checkpoint.
452
     */
453
    public void checkpoint(final boolean switchFiles) throws TransactionException {
454
        if (state.get() == STATE_SHUTDOWN) {
1!
455
            throwShutdownException();
×
456
        }
457

458
        if(journalManager.isPresent()) {
1✔
459
            try {
460
                final long txnId = nextTxnId.getAndIncrement();
1✔
461
                journalManager.get().checkpoint(txnId, switchFiles);
1✔
462
            } catch(final JournalException e) {
1✔
463
                throw new TransactionException(e.getMessage(), e);
×
464
            }
465
        }
466
    }
1✔
467

468
    /**
469
     * @deprecated This mixes concerns and should not be here!
470
     * @param broker the DBBroker
471
     * @throws IOException in response to an I/O error
472
     */
473
    @Deprecated
474
    public void reindex(final DBBroker broker) throws IOException {
475
        broker.pushSubject(broker.getBrokerPool().getSecurityManager().getSystemSubject());
×
476
        try(final Txn transaction = beginTransaction()) {
×
477
            broker.reindexCollection(transaction, XmldbURI.ROOT_COLLECTION_URI);
×
478
            commit(transaction);
×
479
        } catch (final PermissionDeniedException | LockException | TransactionException e) {
×
480
            LOG.error("Exception during reindex: {}", e.getMessage(), e);
×
481
        } finally {
482
                broker.popSubject();
×
483
        }
484
    }
×
485

486
    @Override
487
    public void shutdown() {
488
        try {
489
            while (true) {
490
                final int localState = state.get();
1✔
491

492
                if (localState == STATE_SHUTDOWN) {
1!
493
                    // already shutdown!
494
                    return;
×
495
                }
496

497
                // can NOT shutdown whilst system tasks are executing
498
                if (localState == STATE_SYSTEM) {
1!
499
                    // spin whilst another thread executes the system tasks
500
                    // sleep a small time to save CPU
501
                    Thread.sleep(10);
×
502
                    continue;
×
503
                }
504

505
                if (state.compareAndSet(localState, STATE_SHUTDOWN)) {
1!
506
                    // CAS above guarantees that only a single thread will ever enter this block once!
507

508
                    final int uncommitted = uncommittedTransaction();
1✔
509
                    final boolean checkpoint = uncommitted == 0;
1✔
510

511
                    final long txnId = nextTxnId.getAndIncrement();
1✔
512
                    journalManager.ifPresent(manager -> manager.shutdown(txnId, checkpoint));
1✔
513

514
                    transactions.clear();
1✔
515

516
                    if (LOG.isDebugEnabled()) {
1!
517
                        LOG.debug("Shutting down transaction manager. Uncommitted transactions: {}", transactions.size());
×
518
                    }
519

520
                    // done... exit CAS loop!
521
                    return;
1✔
522
                }
523
            }
524
        } catch (final InterruptedException e) {
×
525
            // thrown by Thread.sleep
526
            Thread.currentThread().interrupt();
×
527
            throw new RuntimeException(e);
×
528
        }
529
    }
530

531
    private int uncommittedTransaction() {
532
        final Integer uncommittedCount = transactions.reduce(1000,
1✔
533
                (txnId, txnCounter) -> {
1✔
534
                    if (txnCounter.getCount() > 0) {
1!
535
                        LOG.warn("Found an uncommitted transaction with id {}. Pending operations: {}", txnId, txnCounter.getCount());
1✔
536
                        return 1;
1✔
537
                    } else {
538
                        return 0;
×
539
                    }
540
                },
541
                Integer::sum
1✔
542
        );
543

544
        if (uncommittedCount == null) {
1✔
545
           return 0;
1✔
546
        }
547

548
        if (uncommittedCount > 0) {
1!
549
            LOG.warn("There are uncommitted transactions. A recovery run may be triggered upon restart.");
1✔
550
        }
551

552
        return uncommittedCount;
1✔
553
    }
554

555
    public void triggerSystemTask(final SystemTask task) {
556
        systemTaskManager.addSystemTask(task);
1✔
557
        processSystemTasks();
1✔
558
    }
1✔
559

560
    private void processSystemTasks() {
561
        if (state.get() != STATE_IDLE) {
1✔
562
            // avoids taking a broker below if it is not needed
563
            return;
1✔
564
        }
565

566
        try (final DBBroker systemBroker = pool.get(Optional.of(pool.getSecurityManager().getSystemSubject()))) {
1✔
567

568
            // no new transactions can begin, commit, or abort whilst processing system tasks
569
            // only process system tasks if there are no active transactions, i.e. the state == IDLE
570
            if (state.compareAndSet(STATE_IDLE, STATE_SYSTEM)) {
1✔
571
                // CAS above guarantees that only a single thread will ever enter this block at once
572
                try {
573
                    this.systemThreadId.set(Thread.currentThread().getId());
1✔
574

575
                    // we have to check that `transactions` is empty
576
                    // otherwise we might be in SYSTEM state but `abort` or `commit`
577
                    // functions are still finishing
578
                    if (transactions.isEmpty()) {
1!
579
                        try (final Txn transaction = beginTransaction()) {
1✔
580
                            systemTaskManager.processTasks(systemBroker, transaction);
1✔
581
                            transaction.commit();
1✔
582
                        }
583
                    }
584

585
                } finally {
586
                    this.systemThreadId.set(-1);
1✔
587

588
                    // restore IDLE state
589
                    state.set(STATE_IDLE);
1✔
590
                }
591
            }
592
        } catch (final EXistException e) {
×
593
            LOG.error("Unable to process system tasks: {}", e.getMessage(), e);
×
594
        }
595
    }
1✔
596

597
    /**
598
     * Keep track of the number of operations processed within a transaction.
599
     * This is used to determine if there are any uncommitted transactions
600
     * during shutdown.
601
     */
602
    private static final class TxnCounter {
1✔
603
        /**
604
         * The counter variable is declared volatile as it is only ever
605
         * written from one thread (via {@link #increment()} which is
606
         * the `transaction` for which it is maintaining a count, whilst
607
         * it is read from (potentially) a different thread
608
         * (via {@link #getCount()} when {@link TransactionManager#shutdown()}
609
         * calls {@link TransactionManager#uncommittedTransaction()}.
610
         */
611
        private volatile long counter = 0;
1✔
612

613
        public TxnCounter increment() {
614
            counter++;
1✔
615
            return this;
1✔
616
        }
617

618
        public long getCount() {
619
            return counter;
1✔
620
        }
621
    }
622
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc