• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

evolvedbinary / elemental / 982

29 Apr 2025 08:34PM UTC coverage: 56.409% (+0.007%) from 56.402%
982

push

circleci

adamretter
[feature] Improve README.md badges

28451 of 55847 branches covered (50.94%)

Branch coverage included in aggregate %.

77468 of 131924 relevant lines covered (58.72%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.36
/exist-core/src/main/java/org/exist/storage/BrokerPool.java
1
/*
2
 * Elemental
3
 * Copyright (C) 2024, Evolved Binary Ltd
4
 *
5
 * admin@evolvedbinary.com
6
 * https://www.evolvedbinary.com | https://www.elemental.xyz
7
 *
8
 * Use of this software is governed by the Business Source License 1.1
9
 * included in the LICENSE file and at www.mariadb.com/bsl11.
10
 *
11
 * Change Date: 2028-04-27
12
 *
13
 * On the date above, in accordance with the Business Source License, use
14
 * of this software will be governed by the Apache License, Version 2.0.
15
 *
16
 * Additional Use Grant: Production use of the Licensed Work for a permitted
17
 * purpose. A Permitted Purpose is any purpose other than a Competing Use.
18
 * A Competing Use means making the Software available to others in a commercial
19
 * product or service that: substitutes for the Software; substitutes for any
20
 * other product or service we offer using the Software that exists as of the
21
 * date we make the Software available; or offers the same or substantially
22
 * similar functionality as the Software.
23
 *
24
 * NOTE: Parts of this file contain code from 'The eXist-db Authors'.
25
 *       The original license header is included below.
26
 *
27
 * =====================================================================
28
 *
29
 * eXist-db Open Source Native XML Database
30
 * Copyright (C) 2001 The eXist-db Authors
31
 *
32
 * info@exist-db.org
33
 * http://www.exist-db.org
34
 *
35
 * This library is free software; you can redistribute it and/or
36
 * modify it under the terms of the GNU Lesser General Public
37
 * License as published by the Free Software Foundation; either
38
 * version 2.1 of the License, or (at your option) any later version.
39
 *
40
 * This library is distributed in the hope that it will be useful,
41
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
42
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
43
 * Lesser General Public License for more details.
44
 *
45
 * You should have received a copy of the GNU Lesser General Public
46
 * License along with this library; if not, write to the Free Software
47
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
48
 */
49
package org.exist.storage;
50

51
import com.evolvedbinary.j8fu.fsm.AtomicFSM;
52
import com.evolvedbinary.j8fu.fsm.FSM;
53
import com.evolvedbinary.j8fu.lazy.AtomicLazyVal;
54
import net.jcip.annotations.ThreadSafe;
55
import org.apache.logging.log4j.LogManager;
56
import org.apache.logging.log4j.Logger;
57
import org.exist.Database;
58
import org.exist.EXistException;
59
import org.exist.collections.Collection;
60
import org.exist.collections.CollectionCache;
61
import org.exist.collections.CollectionConfiguration;
62
import org.exist.collections.CollectionConfigurationManager;
63
import org.exist.collections.triggers.*;
64
import org.exist.config.ConfigurationDocumentTrigger;
65
import org.exist.config.Configurator;
66
import org.exist.config.annotation.ConfigurationClass;
67
import org.exist.config.annotation.ConfigurationFieldAsAttribute;
68
import org.exist.debuggee.Debuggee;
69
import org.exist.debuggee.DebuggeeFactory;
70
import org.exist.dom.persistent.SymbolTable;
71
import org.exist.indexing.IndexManager;
72
import org.exist.management.AgentFactory;
73
import org.exist.numbering.DLNFactory;
74
import org.exist.numbering.NodeIdFactory;
75
import org.exist.repo.ClasspathHelper;
76
import org.exist.repo.ExistRepository;
77
import org.exist.scheduler.Scheduler;
78
import org.exist.scheduler.impl.QuartzSchedulerImpl;
79
import org.exist.scheduler.impl.SystemTaskJobImpl;
80
import org.exist.security.SecurityManager;
81
import org.exist.security.*;
82
import org.exist.security.internal.SecurityManagerImpl;
83
import org.exist.storage.blob.BlobStore;
84
import org.exist.storage.blob.BlobStoreImplService;
85
import org.exist.storage.blob.BlobStoreService;
86
import org.exist.storage.journal.JournalManager;
87
import org.exist.storage.lock.FileLockService;
88
import org.exist.storage.lock.LockManager;
89
import org.exist.storage.recovery.RecoveryManager;
90
import org.exist.storage.sync.Sync;
91
import org.exist.storage.sync.SyncTask;
92
import org.exist.storage.txn.TransactionException;
93
import org.exist.storage.txn.TransactionManager;
94
import org.exist.storage.txn.Txn;
95
import org.exist.util.*;
96
import org.exist.xmldb.ShutdownListener;
97
import org.exist.xmldb.XmldbURI;
98
import org.exist.xquery.PerformanceStats;
99
import org.exist.xquery.PerformanceStatsService;
100
import org.exist.xquery.XQuery;
101

102
import java.io.IOException;
103
import java.io.PrintWriter;
104
import java.io.StringWriter;
105
import java.lang.ref.Reference;
106
import java.lang.reflect.Array;
107
import java.lang.reflect.Field;
108
import java.nio.file.FileStore;
109
import java.nio.file.Path;
110
import java.text.NumberFormat;
111
import java.util.*;
112
import java.util.Map.Entry;
113
import java.util.concurrent.ConcurrentHashMap;
114
import java.util.concurrent.ConcurrentSkipListSet;
115
import java.util.concurrent.atomic.AtomicBoolean;
116
import java.util.concurrent.locks.Lock;
117
import java.util.concurrent.locks.ReentrantLock;
118
import java.util.function.Consumer;
119

120
import static com.evolvedbinary.j8fu.fsm.TransitionTable.transitionTable;
121
import static org.exist.util.ThreadUtils.nameInstanceThreadGroup;
122
import static org.exist.util.ThreadUtils.newInstanceThread;
123

124
/**
125
 * This class controls all available instances of the database.
126
 * Use it to configure, start and stop database instances.
127
 * You may have multiple instances defined, each using its own configuration.
128
 * To define multiple instances, pass an identification string to
129
 * {@link #configure(String, int, int, Configuration, Optional)}
130
 * and use {@link #getInstance(String)} to retrieve an instance.
131
 *
132
 * @author <a href="mailto:wolfgang@exist-db.org">Wolfgang Meier</a>
133
 * @author <a href="mailto:pierrick.brihaye@free.fr">Pierrick Brihaye</a>
134
 * @author <a href="mailto:adam@exist-db.org">Adam Retter</a>
135
 */
136
@ConfigurationClass("pool")
137
public class BrokerPool extends BrokerPools implements BrokerPoolConstants, Database {
138

139
    private final static Logger LOG = LogManager.getLogger(BrokerPool.class);
1✔
140

141
    private final BrokerPoolServicesManager servicesManager = new BrokerPoolServicesManager();
1✔
142

143
    private StatusReporter statusReporter = null;
1✔
144

145
    private final XQuery xqueryService = new XQuery();
1✔
146

147
    private AtomicLazyVal<SaxonConfiguration> saxonConfiguration = new AtomicLazyVal<>(() -> SaxonConfiguration.loadConfiguration(this));
1✔
148

149
    //TODO : make it non-static since every database instance may have its own policy.
150
    //TODO : make a default value that could be overwritten by the configuration
151
    // WM: this is only used by junit tests to test the recovery process.
152
    /**
153
     * For testing only: triggers a database corruption by disabling the page caches. The effect is
154
     * similar to a sudden power loss or the jvm being killed. The flag is used by some
155
     * junit tests to test the recovery process.
156
     */
157
    public static boolean FORCE_CORRUPTION = false;
1✔
158

159
    /**
160
     * <code>true</code> if the database instance is able to perform recovery.
161
     */
162
    private final boolean recoveryEnabled;
163

164
    /**
165
     * The name of the database instance
166
     */
167
    private final String instanceName;
168

169
    private final int concurrencyLevel;
170
    private LockManager lockManager;
171

172
    /**
173
     * Root thread group for all threads related
174
     * to this instance.
175
     */
176
    private final ThreadGroup instanceThreadGroup;
177

178
    /**
179
     * State of the BrokerPool instance
180
     */
181
    private enum State {
1✔
182
        SHUTTING_DOWN_MULTI_USER_MODE,  // entered from OPERATIONAL on Event.START_SHUTDOWN_MULTI_USER_MODE
1✔
183
        SHUTTING_DOWN_SYSTEM_MODE,  // entered from SHUTTING_DOWN_MULTI_USER_MODE on Event.START_SHUTDOWN_SYSTEM_MODE
1✔
184
        SHUTDOWN,  // initial state of the status FSM; re-entered from SHUTTING_DOWN_SYSTEM_MODE on Event.FINISHED_SHUTDOWN
1✔
185
        INITIALIZING,  // entered from SHUTDOWN on Event.INITIALIZE
1✔
186
        INITIALIZING_SYSTEM_MODE,  // entered from INITIALIZING on Event.INITIALIZE_SYSTEM_MODE
1✔
187
        INITIALIZING_MULTI_USER_MODE,  // entered from INITIALIZING_SYSTEM_MODE on Event.INITIALIZE_MULTI_USER_MODE
1✔
188
        OPERATIONAL  // entered from INITIALIZING_MULTI_USER_MODE on Event.READY
1✔
189
    }
190

191
    /**
     * Events that drive the transitions of the {@code status} state machine
     * of this BrokerPool instance.
     */
    private enum Event {
1✔
192
        INITIALIZE,  // SHUTDOWN -> INITIALIZING
1✔
193
        INITIALIZE_SYSTEM_MODE,  // INITIALIZING -> INITIALIZING_SYSTEM_MODE
1✔
194
        INITIALIZE_MULTI_USER_MODE,  // INITIALIZING_SYSTEM_MODE -> INITIALIZING_MULTI_USER_MODE
1✔
195
        READY,  // INITIALIZING_MULTI_USER_MODE -> OPERATIONAL
1✔
196

197
        START_SHUTDOWN_MULTI_USER_MODE,  // OPERATIONAL -> SHUTTING_DOWN_MULTI_USER_MODE
1✔
198
        START_SHUTDOWN_SYSTEM_MODE,  // SHUTTING_DOWN_MULTI_USER_MODE -> SHUTTING_DOWN_SYSTEM_MODE
1✔
199
        FINISHED_SHUTDOWN,  // SHUTTING_DOWN_SYSTEM_MODE -> SHUTDOWN
1✔
200
    }
201

202
    /**
     * Lifecycle state machine of this database instance.
     *
     * The machine starts in {@link State#SHUTDOWN}. The transition table below defines
     * exactly one outgoing transition per state, forming a single cycle:
     * SHUTDOWN, INITIALIZING, INITIALIZING_SYSTEM_MODE, INITIALIZING_MULTI_USER_MODE,
     * OPERATIONAL, SHUTTING_DOWN_MULTI_USER_MODE, SHUTTING_DOWN_SYSTEM_MODE and back to SHUTDOWN.
     */
    @SuppressWarnings("unchecked")
203
    private final FSM<State, Event> status = new AtomicFSM<>(State.SHUTDOWN,
1✔
204
            transitionTable(State.class, Event.class)
1✔
205
                    .when(State.SHUTDOWN).on(Event.INITIALIZE).switchTo(State.INITIALIZING)
1✔
206
                    .when(State.INITIALIZING).on(Event.INITIALIZE_SYSTEM_MODE).switchTo(State.INITIALIZING_SYSTEM_MODE)
1✔
207
                    .when(State.INITIALIZING_SYSTEM_MODE).on(Event.INITIALIZE_MULTI_USER_MODE).switchTo(State.INITIALIZING_MULTI_USER_MODE)
1✔
208
                    .when(State.INITIALIZING_MULTI_USER_MODE).on(Event.READY).switchTo(State.OPERATIONAL)
1✔
209
                    .when(State.OPERATIONAL).on(Event.START_SHUTDOWN_MULTI_USER_MODE).switchTo(State.SHUTTING_DOWN_MULTI_USER_MODE)
1✔
210
                    .when(State.SHUTTING_DOWN_MULTI_USER_MODE).on(Event.START_SHUTDOWN_SYSTEM_MODE).switchTo(State.SHUTTING_DOWN_SYSTEM_MODE)
1✔
211
                    .when(State.SHUTTING_DOWN_SYSTEM_MODE).on(Event.FINISHED_SHUTDOWN).switchTo(State.SHUTDOWN)
1✔
212
            .build()
1✔
213
    );
214

215

216
    /**
     * Gets the current lifecycle state of this database instance.
     *
     * @return the name of the {@code State} enum constant the status state machine is currently in
     */
    public String getStatus() {
217
        return status.getCurrentState().name();
1✔
218
    }
219

220
    /**
221
     * The number of brokers for the database instance
222
     */
223
    private int brokersCount = 0;
1✔
224

225
    /**
226
     * The minimal number of brokers for the database instance
227
     */
228
    @ConfigurationFieldAsAttribute("min")
229
    private final int minBrokers;
230

231
    /**
232
     * The maximal number of brokers for the database instance
233
     */
234
    @ConfigurationFieldAsAttribute("max")
235
    private final int maxBrokers;
236

237
    /**
238
     * The brokers of the database instance that are currently inactive
239
     */
240
    private final Deque<DBBroker> inactiveBrokers = new ArrayDeque<>();
1✔
241

242
    /**
243
     * The brokers of the database instance that are currently active, keyed by {@link Thread}
244
     */
245
    private final Map<Thread, DBBroker> activeBrokers = new ConcurrentHashMap<>();
1✔
246

247

248
    /**
249
     * Used when TRACE level logging is enabled
250
     * to provide a history of broker leases
251
     */
252
    private final Map<String, TraceableStateChanges<TraceableBrokerLeaseChange.BrokerInfo, TraceableBrokerLeaseChange.Change>> brokerLeaseChangeTrace = LOG.isTraceEnabled() ? new HashMap<>() : null;
1!
253
    private final Map<String, List<TraceableStateChanges<TraceableBrokerLeaseChange.BrokerInfo, TraceableBrokerLeaseChange.Change>>> brokerLeaseChangeTraceHistory = LOG.isTraceEnabled() ? new HashMap<>() : null;
1!
254

255
    /**
256
     * The configuration object for the database instance
257
     */
258
    private final Configuration conf;
259

260
    private final ConcurrentSkipListSet<Observer> statusObservers = new ConcurrentSkipListSet<>();
1✔
261

262
    /**
263
     * <code>true</code> if a cache synchronization event is scheduled
264
     */
265
    //TODO : rename as syncScheduled ?
266
    //TODO : alternatively, delete this member and create a Sync.NOSYNC event
267
    private boolean syncRequired = false;
1✔
268

269
    /**
270
     * The kind of scheduled cache synchronization event.
271
     * One of {@link org.exist.storage.sync.Sync}
272
     */
273
    private Sync syncEvent = Sync.MINOR;
1✔
274

275
    private boolean checkpoint = false;
1✔
276

277
    /**
278
     * Indicates whether the database is operating in read-only mode
279
     */
280
    private final AtomicBoolean readOnly = new AtomicBoolean();
1✔
281

282
    @ConfigurationFieldAsAttribute("pageSize")
283
    private final int pageSize;
284

285
    private FileLockService dataLock;
286

287
    /**
288
     * The journal manager of the database instance.
289
     */
290
    private Optional<JournalManager> journalManager = Optional.empty();
1✔
291

292
    /**
293
     * The transaction manager of the database instance.
294
     */
295
    private TransactionManager transactionManager = null;
1✔
296

297
    /**
298
     * The Blob Store of the database instance.
299
     */
300
    private BlobStoreService blobStoreService;
301

302
    /**
303
     * Delay (in ms) for running jobs to return when the database instance shuts down.
304
     */
305
    @ConfigurationFieldAsAttribute("wait-before-shutdown")
306
    private final long maxShutdownWait;
307

308
    /**
309
     * The scheduler for the database instance.
310
     */
311
    @ConfigurationFieldAsAttribute("scheduler")
312
    private Scheduler scheduler;
313

314
    /**
315
     * Manages pluggable index structures.
316
     */
317
    private IndexManager indexManager;
318

319
    /**
320
     * Global symbol table used to encode element and attribute qnames.
321
     */
322
    private SymbolTable symbols;
323

324
    /**
325
     * Cache synchronization on the database instance.
326
     */
327
    @ConfigurationFieldAsAttribute("sync-period")
328
    private final long majorSyncPeriod;        //the period after which a major sync should occur
329
    private long lastMajorSync = System.currentTimeMillis();    //time the last major sync occurred
1✔
330

331
    private final long diskSpaceMin;
332

333
    /**
334
     * The listener that is notified when the database instance shuts down.
335
     */
336
    private ShutdownListener shutdownListener = null;
1✔
337

338
    /**
339
     * The security manager of the database instance.
340
     */
341
    private SecurityManager securityManager = null;
1✔
342

343
    /**
344
     * The global notification service used to subscribe
345
     * to document updates.
346
     */
347
    private NotificationService notificationService = null;
1✔
348

349
    /**
350
     * The cache in which the database instance may store items.
351
     */
352

353
    private DefaultCacheManager cacheManager;
354

355
    private long reservedMem;
356

357
    /**
358
     * The pool in which the database instance's <strong>compiled</strong> XQueries are stored.
359
     */
360
    private XQueryPool xQueryPool;
361

362
    /**
363
     * The monitor in which the database instance's <strong>running</strong> XQueries are managed.
364
     */
365
    private ProcessMonitor processMonitor;
366

367
    /**
368
     * Global performance stats to gather function execution statistics
369
     * from all queries running on this database instance.
370
     */
371
    private PerformanceStats xqueryStats;
372

373
    /**
374
     * The global manager for accessing collection configuration files from the database instance.
375
     */
376
    private CollectionConfigurationManager collectionConfigurationManager = null;
1✔
377

378
    /**
379
     * The cache in which the database instance's collections are stored.
380
     */
381
    //TODO : rename as collectionsCache ?
382
    private CollectionCache collectionCache;
383

384
    /**
385
     * The pool in which the database instance's readers are stored.
386
     */
387
    private XMLReaderPool xmlReaderPool;
388

389
    private final NodeIdFactory nodeFactory = new DLNFactory();
1✔
390

391
    private final Lock globalXUpdateLock = new ReentrantLock();
1✔
392

393
    private Subject serviceModeUser = null;
1✔
394
    private boolean inServiceMode = false;
1✔
395

396
    //the time that the database was started
397
    private final Calendar startupTime = Calendar.getInstance();
1✔
398

399
    private final Optional<BrokerWatchdog> watchdog;
400

401
    private final ClassLoader classLoader;
402

403
    private Optional<ExistRepository> expathRepo = Optional.empty();
1✔
404

405
    private StartupTriggersManager startupTriggersManager;
406

407
    /**
408
     * Creates and configures the database instance.
409
     *
410
     * @param instanceName A name for the database instance.
411
     * @param minBrokers   The minimum number of concurrent brokers for handling requests on the database instance.
412
     * @param maxBrokers   The maximum number of concurrent brokers for handling requests on the database instance.
413
     * @param conf         The configuration object for the database instance
414
     * @param statusObserver    Observes the status of this database instance
415
     *
416
     * @throws EXistException If the initialization fails.
417
     */
418
    //TODO : Then write a configure(int minBrokers, int maxBrokers, Configuration conf) method
419
    BrokerPool(final String instanceName, final int minBrokers, final int maxBrokers, final Configuration conf,
1✔
420
            final Optional<Observer> statusObserver) {
421

422
        // used only to format the numeric values written to the log below
        final NumberFormat nf = NumberFormat.getNumberInstance();
1✔
423

424
        // capture the context class loader of the thread that constructs this instance
        this.classLoader = Thread.currentThread().getContextClassLoader();
1✔
425
        this.instanceName = instanceName;
1✔
426
        this.instanceThreadGroup = new ThreadGroup(nameInstanceThreadGroup(instanceName));
1✔
427

428
        this.maxShutdownWait = conf.getProperty(BrokerPool.PROPERTY_SHUTDOWN_DELAY, DEFAULT_MAX_SHUTDOWN_WAIT);
1✔
429
        LOG.info("database instance '{}' will wait  {} ms during shutdown", instanceName, nf.format(this.maxShutdownWait));
1✔
430

431
        this.recoveryEnabled = conf.getProperty(PROPERTY_RECOVERY_ENABLED, true);
1✔
432
        LOG.info("database instance '{}' is enabled for recovery : {}", instanceName, this.recoveryEnabled);
1✔
433

434
        // the minBrokers/maxBrokers arguments are passed as the second argument of conf.getProperty
        // (presumably the fallback used when the property is not configured - confirm against Configuration)
        this.minBrokers = conf.getProperty(PROPERTY_MIN_CONNECTIONS, minBrokers);
1✔
435
        this.maxBrokers = conf.getProperty(PROPERTY_MAX_CONNECTIONS, maxBrokers);
1✔
436
        LOG.info("database instance '{}' will have between {} and {} brokers", instanceName, nf.format(this.minBrokers), nf.format(this.maxBrokers));
1✔
437

438
        this.majorSyncPeriod = conf.getProperty(PROPERTY_SYNC_PERIOD, DEFAULT_SYNCH_PERIOD);
1✔
439
        LOG.info("database instance '{}' will be synchronized every {} ms", instanceName, nf.format(/*this.*/majorSyncPeriod));
1✔
440

441
        // scale the configured value by 1024 * 1024, i.e. megabytes to bytes (assumes the property is given in MB - TODO confirm)
442
        this.diskSpaceMin = 1024L * 1024L * conf.getProperty(BrokerPool.DISK_SPACE_MIN_PROPERTY, DEFAULT_DISK_SPACE_MIN);
1✔
443

444
        this.pageSize = conf.getProperty(PROPERTY_PAGE_SIZE, DEFAULT_PAGE_SIZE);
1✔
445

446
        //Configuration is valid, save it
447
        this.conf = conf;
1✔
448

449
        // NOTE(review): this uses the maxBrokers constructor argument, not the configured this.maxBrokers
        // assigned above - confirm that this is intended
        this.concurrencyLevel = Math.max(maxBrokers, 2 * Runtime.getRuntime().availableProcessors());
1✔
450

451
        statusObserver.ifPresent(this.statusObservers::add);
1✔
452

453
        // a BrokerWatchdog is only created when the system property is set to exactly "yes"
        this.watchdog = Optional.ofNullable(System.getProperty(BrokerWatchdog.TRACE_BROKERS_PROPERTY_NAME))
1✔
454
                .filter(value -> value.equals("yes"))
1✔
455
                .map(value -> new BrokerWatchdog());
1✔
456
    }
1✔
457

458
    /**
459
     * Initializes the database instance.
460
     *
461
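     * @throws EXistException if the initialization fails; any failure that is not already an
     *         EXistException or a DatabaseConfigurationException is wrapped in an EXistException
462
     * @throws DatabaseConfigurationException if one is raised during the initialization (rethrown unchanged)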
463
     */
464
    void initialize() throws EXistException, DatabaseConfigurationException {
465
        try {
466
            _initialize();
1✔
467
        // Throwable is caught so that the cleanup below also runs for unchecked exceptions and Errors
        } catch(final Throwable e) {
1✔
468
            // remove that file lock we may have acquired in canReadDataDir
469

470
            // dataLock is null if the failure happened before it was assigned in _initialize();
            // the lock is only released here when the instance is not flagged as read-only
            if (dataLock != null && !readOnly.get()) {
×
471
                dataLock.release();
×
472
            }
473

474
            // rethrow the two declared exception types unchanged...
            if (e instanceof EXistException existException) {
×
475
                throw existException;
×
476
            } else if(e instanceof DatabaseConfigurationException databaseConfigurationException) {
×
477
                throw databaseConfigurationException;
×
478
            } else {
479
                // ...and wrap anything else, keeping the original as the cause
                throw new EXistException(e);
×
480
            }
481
        }
482
    }
1✔
483

484
    private void _initialize() throws EXistException, DatabaseConfigurationException {
485
        this.lockManager = new LockManager(conf, concurrencyLevel);
1✔
486

487
        //Flag to indicate that we are initializing
488
        status.process(Event.INITIALIZE);
1✔
489

490
        if(LOG.isDebugEnabled()) {
1!
491
            LOG.debug("initializing database instance '{}'...", instanceName);
×
492
        }
493

494
        // register core broker pool services
495
        this.scheduler = servicesManager.register(new QuartzSchedulerImpl(this));
1✔
496

497
        // NOTE: this must occur after the scheduler, and before any other service which requires access to the data directory
498
        this.dataLock = servicesManager.register(new FileLockService("dbx_dir.lck", BrokerPool.PROPERTY_DATA_DIR, NativeBroker.DEFAULT_DATA_DIR));
1✔
499

500
        this.securityManager = servicesManager.register(new SecurityManagerImpl(this));
1✔
501

502
        this.cacheManager = servicesManager.register(new DefaultCacheManager(this));
1✔
503
        this.xQueryPool = servicesManager.register(new XQueryPool());
1✔
504
        this.processMonitor = servicesManager.register(new ProcessMonitor());
1✔
505
        this.xqueryStats = servicesManager.register(new PerformanceStatsService());
1✔
506
        final XMLReaderObjectFactory xmlReaderObjectFactory = servicesManager.register(new XMLReaderObjectFactory());
1✔
507
        this.xmlReaderPool = servicesManager.register(new XMLReaderPool(xmlReaderObjectFactory, maxBrokers, 0));
1✔
508
        final int bufferSize = Optional.of(conf.getInteger(PROPERTY_COLLECTION_CACHE_SIZE))
1✔
509
                .filter(size -> size != -1)
1!
510
                .orElse(DEFAULT_COLLECTION_BUFFER_SIZE);
1✔
511
        this.collectionCache = servicesManager.register(new CollectionCache());
1✔
512
        this.notificationService = servicesManager.register(new NotificationService());
1✔
513

514
        this.journalManager = recoveryEnabled ? Optional.of(new JournalManager()) : Optional.empty();
1✔
515
        journalManager.ifPresent(servicesManager::register);
1✔
516

517
        final SystemTaskManager systemTaskManager = servicesManager.register(new SystemTaskManager(this));
1✔
518
        this.transactionManager = servicesManager.register(new TransactionManager(this, journalManager, systemTaskManager));
1✔
519

520
        this.blobStoreService = servicesManager.register(new BlobStoreImplService());
1✔
521

522
        this.symbols = servicesManager.register(new SymbolTable());
1✔
523

524
        this.expathRepo = Optional.of(new ExistRepository());
1✔
525
        expathRepo.ifPresent(servicesManager::register);
1✔
526
        servicesManager.register(new ClasspathHelper());
1✔
527

528
        this.indexManager = servicesManager.register(new IndexManager(this));
1✔
529

530
        //Get a manager to handle further collections configuration
531
        this.collectionConfigurationManager = servicesManager.register(new CollectionConfigurationManager(this));
1✔
532

533
        this.startupTriggersManager = servicesManager.register(new StartupTriggersManager());
1✔
534

535
        // this is just used for unit tests
536
        final BrokerPoolService testBrokerPoolService = (BrokerPoolService) conf.getProperty("exist.testBrokerPoolService");
1✔
537
        if (testBrokerPoolService != null) {
1✔
538
            servicesManager.register(testBrokerPoolService);
1✔
539
        }
540

541
        //configure the registered services
542
        try {
543
            servicesManager.configureServices(conf);
1✔
544
        } catch(final BrokerPoolServiceException e) {
1✔
545
            throw new EXistException(e);
×
546
        }
547

548
        // calculate how much memory is reserved for caches to grow
549
        final Runtime rt = Runtime.getRuntime();
1✔
550
        final long maxMem = rt.maxMemory();
1✔
551
        final long minFree = maxMem / 5;
1✔
552
        reservedMem = cacheManager.getTotalMem() + collectionCache.getMaxCacheSize() + minFree;
1✔
553
        LOG.debug("Reserved memory: {}; max: {}; min: {}", reservedMem, maxMem, minFree);
1✔
554

555
        //prepare the registered services, before entering system (single-user) mode
556
        try {
557
            servicesManager.prepareServices(this);
1✔
558
        } catch(final BrokerPoolServiceException e) {
1✔
559
            throw new EXistException(e);
×
560
        }
561

562
        //setup database synchronization job
563
        if(majorSyncPeriod > 0) {
1!
564
            final SyncTask syncTask = new SyncTask();
1✔
565
            syncTask.configure(conf, null);
1✔
566
            scheduler.createPeriodicJob(2500, new SystemTaskJobImpl(SyncTask.getJobName(), syncTask), 2500);
1✔
567
        }
568

569
        try {
570
            statusReporter = new StatusReporter(SIGNAL_STARTUP);
1✔
571
            statusObservers.forEach(statusReporter::addObserver);
1✔
572

573
            final Thread statusThread = newInstanceThread(this, "startup-status-reporter", statusReporter);
1✔
574
            statusThread.start();
1✔
575

576
            // statusReporter may have to be terminated or the thread can/will hang.
577
            try {
578
                final boolean exportOnly = conf.getProperty(PROPERTY_EXPORT_ONLY, false);
1✔
579

580
                // If the initialization fails after transactionManager has been created this method better cleans up
581
                // or the FileSyncThread for the journal can/will hang.
582
                try {
583

584
                    // Enter System Mode
585
                    try(final DBBroker systemBroker = get(Optional.of(securityManager.getSystemSubject()))) {
1✔
586

587
                        status.process(Event.INITIALIZE_SYSTEM_MODE);
1✔
588

589
                        if(isReadOnly()) {
1!
590
                            journalManager.ifPresent(JournalManager::disableJournalling);
×
591
                        }
592

593
                        try(final Txn transaction = transactionManager.beginTransaction()) {
1✔
594
                            servicesManager.startPreSystemServices(systemBroker, transaction);
1✔
595
                            transaction.commit();
1✔
596
                        } catch(final BrokerPoolServiceException e) {
×
597
                            throw new EXistException(e);
×
598
                        }
599

600
                        //Run the recovery process
601
                        boolean recovered = false;
1✔
602
                        if(isRecoveryEnabled()) {
1✔
603
                            recovered = runRecovery(systemBroker);
1✔
604
                            //TODO : extract the following from this block ? What if we are not transactional ? -pb
605
                            if(!recovered) {
1✔
606
                                try {
607
                                    if(systemBroker.getCollection(XmldbURI.ROOT_COLLECTION_URI) == null) {
1✔
608
                                        final Txn txn = transactionManager.beginTransaction();
1✔
609
                                        try {
610
                                            systemBroker.getOrCreateCollection(txn, XmldbURI.ROOT_COLLECTION_URI);
1✔
611
                                            transactionManager.commit(txn);
1✔
612
                                        } catch(final IOException | TriggerException | PermissionDeniedException e) {
1✔
613
                                            transactionManager.abort(txn);
×
614
                                        } finally {
615
                                            transactionManager.close(txn);
1✔
616
                                        }
617
                                    }
618
                                } catch(final PermissionDeniedException pde) {
×
619
                                    LOG.fatal(pde.getMessage(), pde);
×
620
                                }
621
                            }
622
                        }
623

624
                        /* initialise required collections if they don't exist yet */
625
                        if(!exportOnly) {
1!
626
                            try {
627
                                initialiseSystemCollections(systemBroker);
1✔
628
                            } catch(final PermissionDeniedException pde) {
1✔
629
                                LOG.error(pde.getMessage(), pde);
×
630
                                throw new EXistException(pde.getMessage(), pde);
×
631
                            }
632
                        }
633

634
                        statusReporter.setStatus(SIGNAL_READINESS);
1✔
635

636
                        try(final Txn transaction = transactionManager.beginTransaction()) {
1✔
637
                            servicesManager.startSystemServices(systemBroker, transaction);
1✔
638
                            transaction.commit();
1✔
639
                        } catch(final BrokerPoolServiceException e) {
×
640
                            throw new EXistException(e);
×
641
                        }
642

643
                        //If necessary, launch a task to repair the DB
644
                        //TODO : merge this with the recovery process ?
645
                        if(isRecoveryEnabled() && recovered) {
1✔
646
                            if(!exportOnly) {
1!
647
                                reportStatus("Reindexing database files...");
1✔
648
                                try {
649
                                    systemBroker.repair();
1✔
650
                                } catch(final PermissionDeniedException e) {
1✔
651
                                    LOG.warn("Error during recovery: {}", e.getMessage(), e);
×
652
                                }
653
                            }
654

655
                            if((Boolean) conf.getProperty(PROPERTY_RECOVERY_CHECK)) {
1!
656
                                final ConsistencyCheckTask task = new ConsistencyCheckTask();
1✔
657
                                final Properties props = new Properties();
1✔
658
                                props.setProperty("backup", "no");
1✔
659
                                props.setProperty("output", "sanity");
1✔
660
                                task.configure(conf, props);
1✔
661
                                try (final Txn transaction = transactionManager.beginTransaction()) {
1✔
662
                                    task.execute(systemBroker, transaction);
1✔
663
                                    transaction.commit();
1✔
664
                                }
665
                            }
666
                        }
667

668
                        //OK : the DB is repaired; let's make a few RW operations
669
                        statusReporter.setStatus(SIGNAL_WRITABLE);
1✔
670

671
                        //initialize configurations watcher trigger
672
                        if(!exportOnly) {
1!
673
                            try {
674
                                initialiseTriggersForCollections(systemBroker, XmldbURI.SYSTEM_COLLECTION_URI);
1✔
675
                            } catch(final PermissionDeniedException pde) {
1✔
676
                                //XXX: do not catch exception!
677
                                LOG.error(pde.getMessage(), pde);
×
678
                            }
679
                        }
680

681
                        // remove temporary docs
682
                        try {
683
                            systemBroker.cleanUpTempResources(true);
1✔
684
                        } catch(final PermissionDeniedException pde) {
1✔
685
                            LOG.error(pde.getMessage(), pde);
×
686
                        }
687

688
                        sync(systemBroker, Sync.MAJOR);
1✔
689

690
                        // we have completed all system mode operations
691
                        // we can now prepare those services which need
692
                        // system mode before entering multi-user mode
693
                        try(final Txn transaction = transactionManager.beginTransaction()) {
1✔
694
                            servicesManager.startPreMultiUserSystemServices(systemBroker, transaction);
1✔
695
                                                        transaction.commit();
1✔
696
                        } catch(final BrokerPoolServiceException e) {
×
697
                            throw new EXistException(e);
×
698
                        }
699
                    }
700

701
                    //Create a default configuration file for the root collection
702
                    //TODO : why can't we call this from within CollectionConfigurationManager ?
703
                    //TODO : understand why we get a test suite failure
704
                    //collectionConfigurationManager.checkRootCollectionConfigCollection(broker);
705
                    //collectionConfigurationManager.checkRootCollectionConfig(broker);
706

707
                    //Create the minimal number of brokers required by the configuration
708
                    for(int i = 1; i < minBrokers; i++) {
1!
709
                        createBroker();
×
710
                    }
711

712
                    status.process(Event.INITIALIZE_MULTI_USER_MODE);
1✔
713

714
                    // register some MBeans to provide access to this instance
715
                    AgentFactory.getInstance().initDBInstance(this);
1✔
716

717
                    if(LOG.isDebugEnabled()) {
1!
718
                        LOG.debug("database instance '{}' initialized", instanceName);
×
719
                    }
720

721
                    servicesManager.startMultiUserServices(this);
1✔
722

723
                    status.process(Event.READY);
1✔
724

725
                    statusReporter.setStatus(SIGNAL_STARTED);
1✔
726
                } catch(final Throwable t) {
1✔
727
                    transactionManager.shutdown();
×
728
                    throw t;
×
729
                }
730
            } catch(final EXistException e) {
×
731
                throw e;
×
732
            } catch(final Throwable t) {
×
733
                throw new EXistException(t.getMessage(), t);
×
734
            }
735
        } finally {
736
            if(statusReporter != null) {
1!
737
                statusReporter.terminate();
1✔
738
                statusReporter = null;
1✔
739
            }
740
        }
741
    }
1✔
742

743
    /**
744
     * Initialise system collections, if it doesn't exist yet
745
     *
746
     * @param sysBroker        The system broker from before the brokerpool is populated
747
     * @param sysCollectionUri XmldbURI of the collection to create
748
     * @param permissions      The permissions to set on the created collection
749
     */
750
    private void initialiseSystemCollection(final DBBroker sysBroker, final XmldbURI sysCollectionUri, final int permissions) throws EXistException, PermissionDeniedException {
751
        Collection collection = sysBroker.getCollection(sysCollectionUri);
1✔
752
        if(collection == null) {
1✔
753
            final TransactionManager transact = getTransactionManager();
1✔
754
            try(final Txn txn = transact.beginTransaction()) {
1✔
755
                collection = sysBroker.getOrCreateCollection(txn, sysCollectionUri);
1✔
756
                if(collection == null) {
1!
757
                    throw new IOException("Could not create system collection: " + sysCollectionUri);
×
758
                }
759
                collection.setPermissions(sysBroker, permissions);
1✔
760
                sysBroker.saveCollection(txn, collection);
1✔
761

762
                transact.commit(txn);
1✔
763
            } catch(final Exception e) {
×
764
                e.printStackTrace();
×
765
                final String msg = "Initialisation of system collections failed: " + e.getMessage();
×
766
                LOG.error(msg, e);
×
767
                throw new EXistException(msg, e);
×
768
            }
769
        }
770
    }
1✔
771

772
    /**
773
     * Initialize required system collections, if they don't exist yet
774
     *
775
     * @param broker - The system broker from before the brokerpool is populated
776
     * @throws EXistException If a system collection cannot be created
777
     */
778
    private void initialiseSystemCollections(final DBBroker broker) throws EXistException, PermissionDeniedException {
779
        //create /db/system
780
        initialiseSystemCollection(broker, XmldbURI.SYSTEM_COLLECTION_URI, Permission.DEFAULT_SYSTEM_COLLECTION_PERM);
1✔
781
    }
1✔
782

783
    private void initialiseTriggersForCollections(final DBBroker broker, final XmldbURI uri) throws EXistException, PermissionDeniedException {
784
        final Collection collection = broker.getCollection(uri);
1✔
785

786
        //initialize configurations watcher trigger
787
        if(collection != null) {
1!
788
            final CollectionConfigurationManager manager = getConfigurationManager();
1✔
789
            final CollectionConfiguration collConf = manager.getOrCreateCollectionConfiguration(broker, collection);
1✔
790

791
            final DocumentTriggerProxy triggerProxy = new DocumentTriggerProxy(ConfigurationDocumentTrigger.class); //, collection.getURI());
1✔
792
            collConf.documentTriggers().add(triggerProxy);
1✔
793
        }
794
    }
1✔
795

796
    /**
797
     * Get the LockManager for this database instance
798
     *
799
     * @return The lock manager
800
     */
801
    public LockManager getLockManager() {
802
        return lockManager;
1✔
803
    }
804

805
    /**
806
     * Run a database recovery if required. This method is called once during
807
     * startup from {@link org.exist.storage.BrokerPool}.
808
     *
809
     * @param broker the database broker
810
     * @return true if recovery was run, false otherwise
811
     * @throws EXistException if a database error occurs
812
     */
813
    public boolean runRecovery(final DBBroker broker) throws EXistException {
814
        final boolean forceRestart = conf.getProperty(PROPERTY_RECOVERY_FORCE_RESTART, false);
1✔
815
        if(LOG.isDebugEnabled()) {
1!
816
            LOG.debug("ForceRestart = {}", forceRestart);
×
817
        }
818
        if(journalManager.isPresent()) {
1!
819
            final RecoveryManager recovery = new RecoveryManager(broker, journalManager.get(), forceRestart);
1✔
820
            return recovery.recover();
1✔
821
        } else {
822
            throw new IllegalStateException("Cannot run recovery without a JournalManager");
×
823
        }
824
    }
825

826
    public long getReservedMem() {
827
        return reservedMem - cacheManager.getCurrentSize();
1✔
828
    }
829

830
    public int getPageSize() {
831
        return pageSize;
1✔
832
    }
833

834
    /**
835
     * Returns the class loader used when this BrokerPool was configured.
836
     *
837
     * @return the classloader
838
     */
839
    public ClassLoader getClassLoader() {
840
        return this.classLoader;
1✔
841
    }
842

843
    /**
844
     * Whether or not the database instance is operational, i.e. initialization
845
     * has completed
846
     *
847
     * @return <code>true</code> if the database instance is operational
848
     */
849
    public boolean isOperational() {
850
        return status.getCurrentState() == State.OPERATIONAL;
×
851
    }
852

853
    /**
854
     * Returns the database instance's name.
855
     *
856
     * @return The id
857
     */
858
    //TODO : rename getInstanceName
859
    public String getId() {
860
        return instanceName;
1✔
861
    }
862

863
    @Override
864
    public ThreadGroup getThreadGroup() {
865
        return instanceThreadGroup;
1✔
866
    }
867

868
    /**
869
     * Returns the number of brokers currently serving requests for the database instance.
870
     *
871
     * @return The brokers count
872
     * @deprecated use countActiveBrokers
873
     */
874
    //TODO : rename as getActiveBrokers ?
875
    @Deprecated
876
    public int active() {
877
        return activeBrokers.size();
×
878
    }
879

880
    /**
881
     * Returns the number of brokers currently serving requests for the database instance.
882
     *
883
     * @return The active brokers count.
884
     */
885
    @Override
886
    public int countActiveBrokers() {
887
        return activeBrokers.size();
1✔
888
    }
889

890
    public Map<Thread, DBBroker> getActiveBrokers() {
891
        return new HashMap<>(activeBrokers);
1✔
892
    }
893

894
    /**
895
     * Returns the number of inactive brokers for the database instance.
896
     *
897
     * @return The brokers count
898
     */
899
    //TODO : rename as getInactiveBrokers ?
900
    public int available() {
901
        return inactiveBrokers.size();
1✔
902
    }
903

904
    //TODO : getMin() method ?
905

906
    /**
907
     * Returns the maximal number of brokers for the database instance.
908
     *
909
     * @return The brokers count
910
     */
911
    //TODO : rename as getMaxBrokers ?
912
    public int getMax() {
913
        return maxBrokers;
1✔
914
    }
915

916
    public int total() {
917
        return brokersCount;
1✔
918
    }
919

920
    /**
921
     * Returns whether the database instance has been configured.
922
     *
923
     * @return <code>true</code> if the database instance is configured
924
     */
925
    public final boolean isInstanceConfigured() {
926
        return conf != null;
1!
927
    }
928

929
    /**
930
     * Returns the configuration object for the database instance.
931
     *
932
     * @return The configuration
933
     */
934
    public Configuration getConfiguration() {
935
        return conf;
1✔
936
    }
937

938
    public Optional<ExistRepository> getExpathRepo() {
939
        return expathRepo;
1✔
940
    }
941

942
    //TODO : rename as setShutdownListener ?
943
    public void registerShutdownListener(final ShutdownListener listener) {
944
        //TODO : check that we are not shutting down
945
        shutdownListener = listener;
1✔
946
    }
1✔
947

948
    public NodeIdFactory getNodeFactory() {
949
        return nodeFactory;
1✔
950
    }
951

952
    /**
953
     * Returns the database instance's security manager
954
     *
955
     * @return The security manager
956
     */
957
    public SecurityManager getSecurityManager() {
958
        return securityManager;
1✔
959
    }
960

961

962
    /**
963
     * Returns the Scheduler
964
     *
965
     * @return The scheduler
966
     */
967
    public Scheduler getScheduler() {
968
        return scheduler;
1✔
969
    }
970

971
    @Override
972
    public BlobStore getBlobStore() {
973
        return blobStoreService.getBlobStore();
1✔
974
    }
975

976
    public SymbolTable getSymbols() {
977
        return symbols;
1✔
978
    }
979

980
    public NotificationService getNotificationService() {
981
        return notificationService;
1✔
982
    }
983

984
    /**
985
     * Returns whether transactions can be handled by the database instance.
986
     *
987
     * @return <code>true</code> if transactions can be handled
988
     */
989
    public boolean isRecoveryEnabled() {
990
        return !readOnly.get() && recoveryEnabled;
1!
991
    }
992

993
    @Override
994
    public boolean isReadOnly() {
995
        final boolean isReadOnly = readOnly.get();
1✔
996
        if (!isReadOnly) {
1!
997
            final long freeSpace = FileUtils.measureFileStore(dataLock.getFile(), FileStore::getUsableSpace);
1✔
998
            if (freeSpace != -1 && freeSpace < diskSpaceMin) {
1!
999
                LOG.fatal("Partition containing DATA_DIR: {} is running out of disk space [minimum: {} free: {}]. Switching Elemental into read-only mode to prevent data loss!", dataLock.getFile().toAbsolutePath().toString(), diskSpaceMin, freeSpace);
×
1000
                setReadOnly();
×
1001
            }
1002
        }
1003
        return readOnly.get();
1✔
1004
    }
1005

1006
    public void setReadOnly() {
1007
        if (readOnly.compareAndSet(false, true)) {
×
1008
            LOG.warn("Switched database into read-only mode!");
×
1009
        }
1010
    }
×
1011

1012
    public boolean isInServiceMode() {
1013
        return inServiceMode;
×
1014
    }
1015

1016
    @Override
1017
    public Optional<JournalManager> getJournalManager() {
1018
        return journalManager;
1✔
1019
    }
1020

1021
    public TransactionManager getTransactionManager() {
1022
        return transactionManager;
1✔
1023
    }
1024

1025
    /**
1026
     * Returns a manager for accessing the database instance's collection configuration files.
1027
     *
1028
     * @return The manager
1029
     */
1030
    @Override
1031
    public CollectionConfigurationManager getConfigurationManager() {
1032
        return collectionConfigurationManager;
1✔
1033
    }
1034

1035
    /**
1036
     * Returns a cache in which the database instance's collections are stored.
1037
     *
1038
     * @return The cache
1039
     */
1040
    public CollectionCache getCollectionsCache() {
1041
        return collectionCache;
1✔
1042
    }
1043

1044
    /**
1045
     * Returns a cache in which the database instance may store items.
1046
     *
1047
     * @return The cache
1048
     */
1049
    @Override
1050
    public DefaultCacheManager getCacheManager() {
1051
        return cacheManager;
1✔
1052
    }
1053

1054
    /**
1055
     * Returns the index manager which handles all additional indexes not
1056
     * being part of the database core.
1057
     *
1058
     * @return The IndexManager
1059
     */
1060
    @Override
1061
    public IndexManager getIndexManager() {
1062
        return indexManager;
1✔
1063
    }
1064

1065
    /**
1066
     * Returns a pool in which the database instance's <strong>compiled</strong> XQueries are stored.
1067
     *
1068
     * @return The pool
1069
     */
1070
    public XQueryPool getXQueryPool() {
1071
        return xQueryPool;
1✔
1072
    }
1073

1074
    /**
1075
     * Returns the XQuery Service
1076
     *
1077
     * @return The XQuery service
1078
     */
1079
    public XQuery getXQueryService() {
1080
        return xqueryService;
1✔
1081
    }
1082

1083
    /**
1084
     * Returns a monitor in which the database instance's <strong>running</strong> XQueries are managed.
1085
     *
1086
     * @return The monitor
1087
     */
1088
    public ProcessMonitor getProcessMonitor() {
1089
        return processMonitor;
1✔
1090
    }
1091

1092
    /**
1093
     * Returns the global profiler used to gather execution statistics
1094
     * from all XQueries running on this db instance.
1095
     *
1096
     * @return the profiler
1097
     */
1098
    public PerformanceStats getPerformanceStats() {
1099
        return xqueryStats;
1✔
1100
    }
1101

1102
    /**
1103
     * Returns a pool in which the database instance's readers are stored.
1104
     *
1105
     * @return The pool
1106
     *
1107
     * @deprecated Use {@link #getXmlReaderPool()} instead
1108
     */
1109
    @Deprecated
1110
    public XMLReaderPool getParserPool() {
1111
        return xmlReaderPool;
1✔
1112
    }
1113

1114
    /**
1115
     * Returns a pool in which the database instance's readers are stored.
1116
     *
1117
     * @return The pool
1118
     */
1119
    public XMLReaderPool getXmlReaderPool() {
1120
        return xmlReaderPool;
1✔
1121
    }
1122

1123
    /**
1124
     * Returns the global update lock for the database instance.
1125
     * This lock is used by XUpdate operations to avoid that
1126
     * concurrent XUpdate requests modify the database until all
1127
     * document locks have been correctly set.
1128
     *
1129
     * @return The global lock
1130
     */
1131
    //TODO : rename as getUpdateLock ?
1132
    public Lock getGlobalUpdateLock() {
1133
        return globalXUpdateLock;
1✔
1134
    }
1135

1136
    /**
1137
     * Creates an inactive broker for the database instance.
1138
     *
1139
     * @return The broker
1140
     * @throws EXistException if the broker cannot be created
1141
     */
1142
    protected DBBroker createBroker() throws EXistException {
1143
        //TODO : in the future, don't pass the whole configuration, just the part relevant to brokers
1144
        final DBBroker broker = BrokerFactory.getInstance(this, this.getConfiguration());
1✔
1145
        inactiveBrokers.push(broker);
1✔
1146
        brokersCount++;
1✔
1147
        broker.setId(broker.getClass().getName() + '_' + instanceName + "_" + brokersCount);
1✔
1148
        if (LOG.isDebugEnabled()) {
1!
1149
            LOG.debug("Created broker '{} for database instance '{}'", broker.getId(), instanceName);
×
1150
        }
1151
        return broker;
1✔
1152
    }
1153

1154
    /**
1155
     * Get active broker for current thread
1156
     *
1157
     * Note - If you call getActiveBroker() you must not call
1158
     * release on both the returned active broker and the original
1159
     * lease from {@link BrokerPool#getBroker()} or {@link BrokerPool#get(Optional)}
1160
     * otherwise release will have been called more than get!
1161
     *
1162
     * @return Database broker
1163
     * @throws RuntimeException NO broker available for current thread.
1164
     */
1165
    public DBBroker getActiveBroker() { //throws EXistException {
1166
        //synchronized(this) {
1167
        //Try to get an active broker
1168
        final DBBroker broker = activeBrokers.get(Thread.currentThread());
1✔
1169
        if(broker == null) {
1!
1170
            final StringBuilder sb = new StringBuilder();
×
1171
            sb.append("Broker was not obtained for thread '");
×
1172
            sb.append(Thread.currentThread());
×
1173
            sb.append("'.");
×
1174
            sb.append(System.getProperty("line.separator"));
×
1175

1176
            for(final Entry<Thread, DBBroker> entry : activeBrokers.entrySet()) {
×
1177
                sb.append(entry.getKey());
×
1178
                sb.append(" = ");
×
1179
                sb.append(entry.getValue());
×
1180
                sb.append(System.getProperty("line.separator"));
×
1181
            }
1182

1183
            LOG.debug(sb.toString());
×
1184
            throw new RuntimeException(sb.toString());
×
1185
        }
1186
        return broker;
1✔
1187
        //}
1188
    }
1189

1190
    public DBBroker authenticate(final String username, final Object credentials) throws AuthenticationException {
1191
        final Subject subject = getSecurityManager().authenticate(username, credentials);
1✔
1192

1193
        try {
1194
            return get(Optional.ofNullable(subject));
1✔
1195
        } catch(final Exception e) {
×
1196
            throw new AuthenticationException(AuthenticationException.UNNOWN_EXCEPTION, e);
×
1197
        }
1198
    }
1199

1200
    /**
1201
     * Returns an active broker for the database instance.
1202
     *
1203
     * The current user will be inherited by this broker
1204
     *
1205
     * @return The broker
1206
     */
1207
    public DBBroker getBroker() throws EXistException {
1208
        return get(Optional.empty());
1✔
1209
    }
1210

1211
    /**
1212
     * Returns an active broker for the database instance.
1213
     *
1214
     * @param subject Optionally a subject to set on the broker, if a user is not provided then the
1215
     *                current user assigned to the broker will be re-used
1216
     * @return The broker
1217
     * @throws EXistException If the instance is not available (stopped or not configured)
1218
     */
1219
    //TODO : rename as getBroker ? getInstance (when refactored) ?
1220
    public DBBroker get(final Optional<Subject> subject) throws EXistException {
1221
        Objects.requireNonNull(subject, "Subject cannot be null, use BrokerPool#getBroker() instead");
1✔
1222

1223
        if(!isInstanceConfigured()) {
1!
1224
            throw new EXistException("database instance '" + instanceName + "' is not available");
×
1225
        }
1226

1227
        //Try to get an active broker
1228
        DBBroker broker = activeBrokers.get(Thread.currentThread());
1✔
1229
        //Use it...
1230
        //TOUNDERSTAND (pb) : why not pop a broker from the inactive ones rather than maintaining reference counters ?
1231
        // WM: a thread may call this more than once in the sequence of operations, i.e. calls to get/release can
1232
        // be nested. Returning a new broker every time would lead to a deadlock condition if two threads have
1233
        // to wait for a broker to become available. We thus use reference counts and return
1234
        // the same broker instance for each thread.
1235
        if(broker != null) {
1✔
1236
            //increase its number of uses
1237
            broker.incReferenceCount();
1✔
1238
            broker.pushSubject(subject.orElseGet(broker::getCurrentSubject));
1✔
1239

1240
            if(LOG.isTraceEnabled()) {
1!
1241
                if(!brokerLeaseChangeTrace.containsKey(broker.getId())) {
×
1242
                    brokerLeaseChangeTrace.put(broker.getId(), new TraceableStateChanges<>());
×
1243
                }
1244
                brokerLeaseChangeTrace.get(broker.getId()).add(TraceableBrokerLeaseChange.get(new TraceableBrokerLeaseChange.BrokerInfo(broker.getId(), broker.getReferenceCount())));
×
1245
            }
1246

1247
            return broker;
1✔
1248
            //TODO : share the code with what is below (including notifyAll) ?
1249
            // WM: notifyAll is not necessary if we don't have to wait for a broker.
1250
        }
1251

1252
        //No active broker : get one ASAP
1253

1254
        while(serviceModeUser != null && subject.isPresent() && !subject.equals(Optional.ofNullable(serviceModeUser))) {
1!
1255
            try {
1256
                LOG.debug("Db instance is in service mode. Waiting for db to become available again ...");
×
1257
                wait();
×
1258
            } catch(final InterruptedException e) {
×
1259
                Thread.currentThread().interrupt();
×
1260
                LOG.error("Interrupt detected");
×
1261
            }
1262
        }
1263

1264
        synchronized(this) {
1✔
1265
            //Are there any available brokers ?
1266
            if(inactiveBrokers.isEmpty()) {
1✔
1267
                //There are no available brokers. If allowed...
1268
                if(brokersCount < maxBrokers)
1✔
1269
                //... create one
1270
                {
1271
                    createBroker();
1✔
1272
                } else
1✔
1273
                    //... or wait until there is one available
1274
                    while(inactiveBrokers.isEmpty()) {
1✔
1275
                        LOG.debug("waiting for a broker to become available");
1✔
1276
                        try {
1277
                            this.wait();
1✔
1278
                        } catch(final InterruptedException e) {
1✔
1279
                            //nothing to be done!
1280
                        }
1281
                    }
1282
            }
1283
            broker = inactiveBrokers.pop();
1✔
1284
            broker.prepare();
1✔
1285

1286
            //activate the broker
1287
            activeBrokers.put(Thread.currentThread(), broker);
1✔
1288

1289
            if(LOG.isTraceEnabled()) {
1!
1290
                LOG.trace("+++ {}{}", Thread.currentThread(), Stacktrace.top(Thread.currentThread().getStackTrace(), Stacktrace.DEFAULT_STACK_TOP));
×
1291
            }
1292

1293
            if(watchdog.isPresent()) {
1!
1294
                watchdog.get().add(broker);
×
1295
            }
1296

1297
            broker.incReferenceCount();
1✔
1298

1299
            broker.pushSubject(subject.orElseGet(securityManager::getGuestSubject));
1✔
1300

1301
            if(LOG.isTraceEnabled()) {
1!
1302
                if(!brokerLeaseChangeTrace.containsKey(broker.getId())) {
×
1303
                    brokerLeaseChangeTrace.put(broker.getId(), new TraceableStateChanges<>());
×
1304
                }
1305
                brokerLeaseChangeTrace.get(broker.getId()).add(TraceableBrokerLeaseChange.get(new TraceableBrokerLeaseChange.BrokerInfo(broker.getId(), broker.getReferenceCount())));
×
1306
            }
1307

1308
            //Inform the other threads that we have a new-comer
1309
            // TODO: do they really need to be informed here???????
1310
            this.notifyAll();
1✔
1311
            return broker;
1✔
1312
        }
1313
    }
1314

1315
    /**
1316
     * Releases a broker for the database instance. If it is no more used, make if invactive.
1317
     * If there are pending system maintenance tasks,
1318
     * the method will block until these tasks have finished.
1319
     *
1320
     * NOTE - this is intentionally package-private, it is only meant to be
1321
     * called internally and from {@link DBBroker#close()}
1322
     *
1323
     * @param broker The broker to be released
1324
     */
1325
    //TODO : rename as releaseBroker ? releaseInstance (when refactored) ?
1326
    void release(final DBBroker broker) {
1327
        Objects.requireNonNull(broker, "Cannot release nothing");
1✔
1328

1329
        if(LOG.isTraceEnabled()) {
1!
1330
            if(!brokerLeaseChangeTrace.containsKey(broker.getId())) {
×
1331
                brokerLeaseChangeTrace.put(broker.getId(), new TraceableStateChanges<>());
×
1332
            }
1333
            brokerLeaseChangeTrace.get(broker.getId()).add(TraceableBrokerLeaseChange.release(new TraceableBrokerLeaseChange.BrokerInfo(broker.getId(), broker.getReferenceCount())));
×
1334
        }
1335

1336
        //first check that the broker is active ! If not, return immediately.
1337
        broker.decReferenceCount();
1✔
1338
        if(broker.getReferenceCount() > 0) {
1✔
1339
            broker.popSubject();
1✔
1340
            //it is still in use and thus can't be marked as inactive
1341
            return;
1✔
1342
        }
1343

1344
        synchronized(this) {
1✔
1345
            //Broker is no more used : inactivate it
1346
            for(final DBBroker inactiveBroker : inactiveBrokers) {
1✔
1347
                if(broker == inactiveBroker) {
1!
1348
                    LOG.error("Broker {} is already in the inactive list!!!", broker.getId());
×
1349
                    return;
×
1350
                }
1351
            }
1352

1353
            if(activeBrokers.remove(Thread.currentThread()) == null) {
1!
1354
                LOG.error("release() has been called from the wrong thread for broker {}", broker.getId());
×
1355
                // Cleanup the state of activeBrokers
1356
                for(final Entry<Thread, DBBroker> activeBroker : activeBrokers.entrySet()) {
×
1357
                    if(activeBroker.getValue() == broker) {
×
1358
                        final String msg = "release() has been called from '" + Thread.currentThread() + "', but occupied at '" + activeBroker.getKey() + "'.";
×
1359
                        final EXistException ex = new EXistException(msg);
×
1360
                        LOG.error(msg, ex);
×
1361
                        activeBrokers.remove(activeBroker.getKey());
×
1362
                        break;
×
1363
                    }
1364
                }
1365
            } else {
×
1366
                if(LOG.isTraceEnabled()) {
1!
1367
                    LOG.trace("--- {}{}", Thread.currentThread(), Stacktrace.top(Thread.currentThread().getStackTrace(), Stacktrace.DEFAULT_STACK_TOP));
×
1368
                }
1369
            }
1370
            
1371
            Subject lastUser = broker.popSubject();
1✔
1372

1373
            //guard to ensure that the broker has popped all its subjects
1374
            if(lastUser == null || broker.getCurrentSubject() != null) {
1!
1375
                LOG.warn("Broker {} was returned with extraneous Subjects, cleaning...", broker.getId(), new IllegalStateException("DBBroker pushSubject/popSubject mismatch").fillInStackTrace());
×
1376
                if(LOG.isTraceEnabled()) {
×
1377
                    broker.traceSubjectChanges();
×
1378
                }
1379

1380
                //cleanup any remaining erroneous subjects
1381
                while(broker.getCurrentSubject() != null) {
×
1382
                    lastUser = broker.popSubject();
×
1383
                }
1384
            }
1385

1386
            inactiveBrokers.push(broker);
1✔
1387
            watchdog.ifPresent(wd -> wd.remove(broker));
1✔
1388

1389
            if(LOG.isTraceEnabled()) {
1!
1390
                if(!brokerLeaseChangeTraceHistory.containsKey(broker.getId())) {
×
1391
                    brokerLeaseChangeTraceHistory.put(broker.getId(), new ArrayList<>());
×
1392
                }
1393
                try {
1394
                    brokerLeaseChangeTraceHistory.get(broker.getId()).add((TraceableStateChanges<TraceableBrokerLeaseChange.BrokerInfo, TraceableBrokerLeaseChange.Change>) brokerLeaseChangeTrace.get(broker.getId()).clone());
×
1395
                    brokerLeaseChangeTrace.get(broker.getId()).clear();
×
1396
                } catch(final CloneNotSupportedException e) {
×
1397
                    LOG.error(e);
×
1398
                }
1399

1400
                broker.clearSubjectChangesTrace();
×
1401
            }
1402

1403
            //If the database is now idle, do some useful stuff
1404
            if(activeBrokers.size() == 0) {
1✔
1405
                //TODO : use a "clean" dedicated method (we have some below) ?
1406
                if(syncRequired) {
1✔
1407
                    //Note that the broker is not yet really inactive ;-)
1408
                    sync(broker, syncEvent);
1✔
1409
                    this.syncRequired = false;
1✔
1410
                    this.checkpoint = false;
1✔
1411
                }
1412
                if(serviceModeUser != null && !lastUser.equals(serviceModeUser)) {
1!
1413
                    inServiceMode = true;
×
1414
                }
1415
            }
1416
            //Inform the other threads that someone is gone
1417
            this.notifyAll();
1✔
1418
        }
1419
    }
1✔
1420

1421
    public DBBroker enterServiceMode(final Subject user) throws PermissionDeniedException {
1422
        if(!user.hasDbaRole()) {
×
1423
            throw new PermissionDeniedException("Only users of group dba can switch the db to service mode");
×
1424
        }
1425

1426
        serviceModeUser = user;
×
1427
        synchronized(this) {
×
1428
            if(activeBrokers.size() != 0) {
×
1429
                while(!inServiceMode) {
×
1430
                    try {
1431
                        wait();
×
1432
                    } catch(final InterruptedException e) {
×
1433
                        //nothing to be done
1434
                    }
1435
                }
1436
            }
1437
        }
1438

1439
        inServiceMode = true;
×
1440
        final DBBroker broker = inactiveBrokers.peek();
×
1441
        broker.prepare();
×
1442
        checkpoint = true;
×
1443
        sync(broker, Sync.MAJOR);
×
1444
        checkpoint = false;
×
1445
        // Return a broker that can be used to perform system tasks
1446
        return broker;
×
1447
    }
1448

1449
    public void exitServiceMode(final Subject user) throws PermissionDeniedException {
1450
        if(!user.equals(serviceModeUser)) {
×
1451
            throw new PermissionDeniedException("The db has been locked by a different user");
×
1452
        }
1453
        serviceModeUser = null;
×
1454
        inServiceMode = false;
×
1455
        synchronized(this) {
×
1456
            this.notifyAll();
×
1457
        }
1458
    }
×
1459

1460
    public void reportStatus(final String message) {
1461
        if(statusReporter != null) {
1✔
1462
            statusReporter.setStatus(message);
1✔
1463
        }
1464
    }
1✔
1465

1466
    public long getMajorSyncPeriod() {
1467
        return majorSyncPeriod;
1✔
1468
    }
1469

1470
    public long getLastMajorSync() {
1471
        return lastMajorSync;
1✔
1472
    }
1473

1474
    /**
1475
     * Executes a waiting cache synchronization for the database instance.
1476
     *
1477
     * NOTE: This method should not be called concurrently from multiple threads.
1478
     *
1479
     * @param broker    A broker responsible for executing the job
1480
     * @param syncEvent One of {@link org.exist.storage.sync.Sync}
1481
     */
1482
    public void sync(final DBBroker broker, final Sync syncEvent) {
1483

1484
        /**
1485
         * Database Systems - The Complete Book (Second edition)
1486
         * § 17.4.1 The Undo/Redo Rules
1487
         *
1488
         * The constraints that an undo/redo logging system must follow are summarized by the following rule:
1489
         *     * UR1  Before modifying any database element X on disk because of changes
1490
         *            made by some transaction T, it is necessary that the update record
1491
         *            <T,X,v,w> appear on disk.
1492
         */
1493
        journalManager.ifPresent(manager -> manager.flush(true, true));
1✔
1494

1495
        // sync various DBX files
1496
        broker.sync(syncEvent);
1✔
1497

1498
        //TODO : strange that it is set *after* the sync method has been called.
1499
        try {
1500
            broker.pushSubject(securityManager.getSystemSubject());
1✔
1501

1502
            if (syncEvent == Sync.MAJOR) {
1✔
1503
                LOG.debug("Major sync");
1✔
1504
                try {
1505
                    if (!FORCE_CORRUPTION) {
1✔
1506
                        transactionManager.checkpoint(checkpoint);
1✔
1507
                    }
1508
                } catch (final TransactionException e) {
1✔
1509
                    LOG.warn(e.getMessage(), e);
×
1510
                }
1511
                cacheManager.checkCaches();
1✔
1512

1513
                lastMajorSync = System.currentTimeMillis();
1✔
1514
                if (LOG.isDebugEnabled()) {
1!
1515
                    notificationService.debug();
×
1516
                }
1517
            } else {
×
1518
                cacheManager.checkDistribution();
1✔
1519
//            LOG.debug("Minor sync");
1520
            }
1521
            //TODO : touch this.syncEvent and syncRequired ?
1522
        } finally {
1✔
1523
            broker.popSubject();
1✔
1524
        }
1525
    }
1✔
1526

1527
    /**
1528
     * Schedules a system maintenance task for the database instance. If the database is idle,
1529
     * the task will be run immediately. Otherwise, the task will be deferred
1530
     * until all running threads have returned.
1531
     *
1532
     * @param task The task
1533
     */
1534
    //TOUNDERSTAND (pb) : synchronized, so... "schedules" or, rather, "executes" ?
1535
    public void triggerSystemTask(final SystemTask task) {
1536
        final State s = status.getCurrentState();
1✔
1537
        if(s == State.SHUTTING_DOWN_MULTI_USER_MODE || s == State.SHUTTING_DOWN_SYSTEM_MODE) {
1!
1538
            LOG.info("Skipping SystemTask: '{}' as database is shutting down...", task.getName());
1✔
1539
            return;
1✔
1540
        } else if(s == State.SHUTDOWN) {
1!
1541
            LOG.warn("Unable to execute SystemTask: '{}' as database is shut down!", task.getName());
×
1542
            return;
×
1543
        }
1544

1545
        transactionManager.triggerSystemTask(task);
1✔
1546
    }
1✔
1547

1548
    /**
1549
     * Shuts downs the database instance
1550
     */
1551
    public void shutdown() {
1552
        shutdown(false);
1✔
1553
    }
1✔
1554

1555
    /**
1556
     * Returns true if the BrokerPool is in the
1557
     * process of shutting down.
1558
     *
1559
     * @return true if the BrokerPool is shutting down.
1560
     */
1561
    public boolean isShuttingDown() {
1562
        final State s = status.getCurrentState();
1✔
1563
        return s == State.SHUTTING_DOWN_MULTI_USER_MODE
1!
1564
                || s == State.SHUTTING_DOWN_SYSTEM_MODE;
1!
1565
    }
1566

1567
    /**
1568
     * Returns true if the BrokerPool is either in the
1569
     * process of shutting down, or has already shutdown.
1570
     *
1571
     * @return true if the BrokerPool is shutting down or
1572
     *     has shutdown.
1573
     */
1574
    public boolean isShuttingDownOrDown() {
1575
        final State s = status.getCurrentState();
1✔
1576
        return s == State.SHUTTING_DOWN_MULTI_USER_MODE
1!
1577
                || s == State.SHUTTING_DOWN_SYSTEM_MODE
1!
1578
                || s == State.SHUTDOWN;
1!
1579
    }
1580

1581
    /**
1582
     * Returns true of the BrokerPool is shutdown.
1583
     *
1584
     * @return true if the BrokerPool is shutdown.
1585
     */
1586
    public boolean isShutDown() {
1587
        return status.getCurrentState() == State.SHUTDOWN;
1!
1588
    }
1589

1590
    /**
1591
     * Shuts downs the database instance
1592
     *
1593
     * @param killed <code>true</code> when the JVM is (cleanly) exiting
1594
     */
1595
    public void shutdown(final boolean killed) {
1596
        shutdown(killed, BrokerPools::removeInstance);
1✔
1597
    }
1✔
1598

1599
    void shutdown(final boolean killed, final Consumer<String> shutdownInstanceConsumer) {
1600

1601
        try {
1602
            status.process(Event.START_SHUTDOWN_MULTI_USER_MODE);
1✔
1603
        } catch(final IllegalStateException e) {
1✔
1604
            // we are not operational!
1605
            LOG.warn(e);
1✔
1606
            return;
1✔
1607
        }
1608

1609
        // notify any BrokerPoolServices that we are about to shutdown
1610
        try {
1611
            // instruct database services that we are about to stop multi-user mode
1612
            servicesManager.stopMultiUserServices(this);
1✔
1613
        } catch(final BrokerPoolServicesManagerException e) {
1✔
1614
            for(final BrokerPoolServiceException bpse : e.getServiceExceptions()) {
×
1615
                LOG.error(bpse.getMessage(), bpse);
×
1616
            }
1617
        }
1618

1619
        try {
1620
            status.process(Event.START_SHUTDOWN_SYSTEM_MODE);
1✔
1621
        } catch(final IllegalStateException e) {
1✔
1622
            // we are not in SHUTTING_DOWN_MULTI_USER_MODE!
1623
            LOG.warn(e);
×
1624
            return;
×
1625
        }
1626

1627
        try {
1628
            LOG.info("Database is shutting down ...");
1✔
1629

1630
            processMonitor.stopRunningJobs();
1✔
1631

1632
            //Shutdown the scheduler
1633
            scheduler.shutdown(true);
1✔
1634

1635
            try {
1636
                statusReporter = new StatusReporter(SIGNAL_SHUTDOWN);
1✔
1637
                statusObservers.forEach(statusReporter::addObserver);
1✔
1638

1639
                synchronized (this) {
1✔
1640
                    final Thread statusThread = newInstanceThread(this, "shutdown-status-reporter", statusReporter);
1✔
1641
                    statusThread.start();
1✔
1642

1643
                    // DW: only in debug mode
1644
                    if (LOG.isDebugEnabled()) {
1!
1645
                        notificationService.debug();
×
1646
                    }
1647

1648
                    //Notify all running tasks that we are shutting down
1649

1650
                    //Notify all running XQueries that we are shutting down
1651
                    processMonitor.killAll(500);
1✔
1652

1653
                    if (isRecoveryEnabled()) {
1✔
1654
                        journalManager.ifPresent(jm -> jm.flush(true, true));
1✔
1655
                    }
1656

1657
                    final long waitStart = System.currentTimeMillis();
1✔
1658
                    //Are there active brokers ?
1659
                    if (activeBrokers.size() > 0) {
1!
1660
                        printSystemInfo();
×
1661
                        LOG.info("Waiting {}ms for remaining threads to shut down...", maxShutdownWait);
×
1662
                        while (activeBrokers.size() > 0) {
×
1663
                            try {
1664
                                //Wait until they become inactive...
1665
                                this.wait(1000);
×
1666
                            } catch (final InterruptedException e) {
×
1667
                                //nothing to be done
1668
                            }
1669

1670
                            //...or force the shutdown
1671
                            if (maxShutdownWait > -1 && System.currentTimeMillis() - waitStart > maxShutdownWait) {
×
1672
                                LOG.warn("Not all threads returned. Forcing shutdown ...");
×
1673
                                break;
×
1674
                            }
1675
                        }
1676
                    }
1677
                    LOG.debug("Calling shutdown ...");
1✔
1678

1679
                    //TODO : replace the following code by get()/release() statements ?
1680
                    // WM: deadlock risk if not all brokers returned properly.
1681
                    DBBroker broker = null;
1✔
1682
                    if (inactiveBrokers.isEmpty())
1!
1683
                        try {
1684
                            broker = createBroker();
×
1685
                        } catch (final EXistException e) {
×
1686
                            LOG.warn("could not create instance for shutdown. Giving up.");
×
1687
                        }
×
1688
                    else
1689
                    //TODO : this broker is *not* marked as active and may be reused by another process !
1690
                    //TODO : use get() then release the broker ?
1691
                    // WM: deadlock risk if not all brokers returned properly.
1692
                    //TODO: always createBroker? -dmitriy
1693
                    {
1694
                        broker = inactiveBrokers.peek();
1✔
1695
                    }
1696

1697
                    try {
1698
                        if (broker != null) {
1!
1699
                            broker.prepare();
1✔
1700
                            broker.pushSubject(securityManager.getSystemSubject());
1✔
1701
                        }
1702

1703
                        try {
1704
                            // instruct all database services to stop
1705
                            servicesManager.stopSystemServices(broker);
1✔
1706
                        } catch(final BrokerPoolServicesManagerException e) {
1✔
1707
                           for(final BrokerPoolServiceException bpse : e.getServiceExceptions()) {
×
1708
                               LOG.error(bpse.getMessage(), bpse);
×
1709
                           }
1710
                        }
1711

1712
                        //TOUNDERSTAND (pb) : shutdown() is called on only *one* broker ?
1713
                        // WM: yes, the database files are shared, so only one broker is needed to close them for all
1714
                        broker.shutdown();
1✔
1715

1716
                    } finally {
1✔
1717
                        if(broker != null) {
1!
1718
                            broker.popSubject();
1✔
1719
                        }
1720
                    }
1721

1722
                    collectionCache.invalidateAll();
1✔
1723

1724
                    // final notification to database services to shutdown
1725
                    servicesManager.shutdown();
1✔
1726

1727
                    // remove all remaining inactive brokers as we have shutdown now and no longer need those
1728
                    inactiveBrokers.clear();
1✔
1729

1730
                    // deregister JMX MBeans
1731
                    AgentFactory.getInstance().closeDBInstance(this);
1✔
1732

1733
                    //Clear the living instances container
1734
                    shutdownInstanceConsumer.accept(instanceName);
1✔
1735
                    if (!readOnly.get()) {
1!
1736
                        // release the lock on the data directory
1737
                        dataLock.release();
1✔
1738
                    }
1739

1740
                    //clearing additional resources, like ThreadLocal
1741
                    clearThreadLocals();
1✔
1742

1743
                    LOG.info("shutdown complete !");
1✔
1744

1745
                    if (shutdownListener != null) {
1✔
1746
                        shutdownListener.shutdown(instanceName, instancesCount());
1✔
1747
                    }
1748
                }
1749
            } finally {
1750
                // clear instance variables, just to be sure they will be garbage collected
1751
                // the test suite restarts the db a few hundred times
1752
                Configurator.clear(this);
1✔
1753
                transactionManager = null;
1✔
1754
                collectionCache = null;
1✔
1755
                xQueryPool = null;
1✔
1756
                processMonitor = null;
1✔
1757
                collectionConfigurationManager = null;
1✔
1758
                notificationService = null;
1✔
1759
                indexManager = null;
1✔
1760
                xmlReaderPool = null;
1✔
1761
                shutdownListener = null;
1✔
1762
                securityManager = null;
1✔
1763

1764
                if (lockManager != null) {
1!
1765
                    lockManager.getLockTable().shutdown();
1✔
1766
                    lockManager = null;
1✔
1767
                }
1768

1769
                notificationService = null;
1✔
1770
                statusObservers.clear();
1✔
1771
                startupTriggersManager = null;
1✔
1772
                statusReporter.terminate();
1✔
1773
                statusReporter = null;
1✔
1774

1775
//                instanceThreadGroup.destroy();
1776
            }
1777
        } finally {
1778
            status.process(Event.FINISHED_SHUTDOWN);
1✔
1779
        }
1780
    }
1✔
1781

1782
    public void addStatusObserver(final Observer statusObserver) {
1783
        this.statusObservers.add(statusObserver);
×
1784
    }
×
1785

1786
    public boolean removeStatusObserver(final Observer statusObserver) {
1787
        return this.statusObservers.remove(statusObserver);
×
1788
    }
1789

1790
    private void clearThreadLocals() {
1791
        for (final Thread thread : Thread.getAllStackTraces().keySet()) {
1✔
1792
            try {
1793
                cleanThreadLocalsForThread(thread);
×
1794
            } catch (final EXistException ex) {
1✔
1795
                if (LOG.isDebugEnabled()) {
1!
1796
                    LOG.warn("Could not clear ThreadLocals for thread: {}", thread.getName());
×
1797
                }
1798
            }
1799
        }
1800
    }
1✔
1801

1802
    private void cleanThreadLocalsForThread(final Thread thread) throws EXistException {
1803
        try {
1804
            // Get a reference to the thread locals table of the current thread
1805
            final Field threadLocalsField = Thread.class.getDeclaredField("threadLocals");
1✔
1806
            threadLocalsField.setAccessible(true);
×
1807
            final Object threadLocalTable = threadLocalsField.get(thread);
×
1808

1809
            // Get a reference to the array holding the thread local variables inside the
1810
            // ThreadLocalMap of the current thread
1811
            final Class threadLocalMapClass = Class.forName("java.lang.ThreadLocal$ThreadLocalMap");
×
1812
            final Field tableField = threadLocalMapClass.getDeclaredField("table");
×
1813
            tableField.setAccessible(true);
×
1814
            final Object table = tableField.get(threadLocalTable);
×
1815

1816
            // The key to the ThreadLocalMap is a WeakReference object. The referent field of this object
1817
            // is a reference to the actual ThreadLocal variable
1818
            final Field referentField = Reference.class.getDeclaredField("referent");
×
1819
            referentField.setAccessible(true);
×
1820

1821
            for (int i = 0; i < Array.getLength(table); i++) {
×
1822
                // Each entry in the table array of ThreadLocalMap is an Entry object
1823
                // representing the thread local reference and its value
1824
                final Object entry = Array.get(table, i);
×
1825
                if (entry != null) {
×
1826
                    // Get a reference to the thread local object and remove it from the table
1827
                    final ThreadLocal threadLocal = (ThreadLocal)referentField.get(entry);
×
1828
                    threadLocal.remove();
×
1829
                }
1830
            }
1831
        } catch(final Exception e) {
1✔
1832
            // We will tolerate an exception here and just log it
1833
            throw new EXistException(e);
1✔
1834
        }
1835
    }
×
1836

1837
    public Optional<BrokerWatchdog> getWatchdog() {
1838
        return watchdog;
×
1839
    }
1840

1841
    //TODO : move this elsewhere
1842
    public void triggerCheckpoint() {
1843
        if(syncRequired) {
1✔
1844
            return;
1✔
1845
        }
1846
        synchronized(this) {
1✔
1847
            syncEvent = Sync.MAJOR;
1✔
1848
            syncRequired = true;
1✔
1849
            checkpoint = true;
1✔
1850
        }
1851
    }
1✔
1852

1853
    // Starts out null; getDebuggee() assigns it from DebuggeeFactory.getInstance() while it is still null
    private Debuggee debuggee = null;
1✔
1854

1855
    public Debuggee getDebuggee() {
1856
        synchronized(this) {
×
1857
            if(debuggee == null) {
×
1858
                debuggee = DebuggeeFactory.getInstance();
×
1859
            }
1860
        }
1861

1862
        return debuggee;
×
1863
    }
1864

1865
    public Calendar getStartupTime() {
1866
        return startupTime;
1✔
1867
    }
1868

1869
    public void printSystemInfo() {
1870
        try(final StringWriter sout = new StringWriter();
×
1871
            final PrintWriter writer = new PrintWriter(sout)) {
×
1872

1873
            writer.println("SYSTEM INFO");
×
1874
            writer.format("Database instance: %s\n", getId());
×
1875
            writer.println("-------------------------------------------------------------------");
×
1876
            watchdog.ifPresent(wd -> wd.dump(writer));
×
1877

1878
            final String s = sout.toString();
×
1879
            LOG.info(s);
×
1880
            System.err.println(s);
×
1881
        } catch(final IOException e) {
×
1882
            LOG.error(e);
×
1883
        }
1884
    }
×
1885

1886
    @ThreadSafe
1887
    private static class StatusReporter extends Observable implements Runnable {
1888
        private String status;
1889
        private volatile boolean terminate = false;
1✔
1890

1891
        public StatusReporter(final String status) {
1✔
1892
            this.status = status;
1✔
1893
        }
1✔
1894

1895
        public synchronized void setStatus(final String status) {
1896
            this.status = status;
1✔
1897
            this.setChanged();
1✔
1898
            this.notifyObservers(status);
1✔
1899
        }
1✔
1900

1901
        public void terminate() {
1902
            this.terminate = true;
1✔
1903
            synchronized(this) {
1✔
1904
                this.notifyAll();
1✔
1905
            }
1906
        }
1✔
1907

1908
        @Override
1909
        public void run() {
1910
            while(!terminate) {
1✔
1911
                synchronized(this) {
1✔
1912
                    try {
1913
                        wait(500);
1✔
1914
                    } catch(final InterruptedException e) {
1✔
1915
                        // nothing to do
1916
                    }
1917
                    this.setChanged();
1✔
1918
                    this.notifyObservers(status);
1✔
1919
                }
1920
            }
1921
        }
1✔
1922
    }
1923

1924
    @Override
1925
    public Path getStoragePlace() {
1926
        return (Path)conf.getProperty(BrokerPool.PROPERTY_DATA_DIR);
×
1927
    }
1928

1929
    // Document trigger proxies; appended to by registerDocumentTrigger and returned as-is by getDocumentTriggers
    private final List<TriggerProxy<? extends DocumentTrigger>> documentTriggers = new ArrayList<>();
1✔
1930
    // Collection trigger proxies; appended to by registerCollectionTrigger and returned as-is by getCollectionTriggers
    private final List<TriggerProxy<? extends CollectionTrigger>> collectionTriggers = new ArrayList<>();
1✔
1931

1932
    @Override
1933
    public List<TriggerProxy<? extends DocumentTrigger>> getDocumentTriggers() {
1934
        return documentTriggers;
1✔
1935
    }
1936

1937
    @Override
1938
    public List<TriggerProxy<? extends CollectionTrigger>> getCollectionTriggers() {
1939
        return collectionTriggers;
1✔
1940
    }
1941

1942
    @Override
1943
    public void registerDocumentTrigger(final Class<? extends DocumentTrigger> clazz) {
1944
        documentTriggers.add(new DocumentTriggerProxy(clazz));
1✔
1945
    }
1✔
1946

1947
    @Override
1948
    public void registerCollectionTrigger(final Class<? extends CollectionTrigger> clazz) {
1949
        collectionTriggers.add(new CollectionTriggerProxy(clazz));
×
1950
    }
×
1951

1952
    public net.sf.saxon.Configuration getSaxonConfiguration() {
1953
        return saxonConfiguration.get().getConfiguration();
1✔
1954
    }
1955

1956
    public net.sf.saxon.s9api.Processor getSaxonProcessor() {
1957
        return saxonConfiguration.get().getProcessor();
1✔
1958
    }
1959

1960
    /**
1961
     * Represents a change involving {@link BrokerPool#inactiveBrokers}
1962
     * or {@link BrokerPool#activeBrokers} or {@link DBBroker#getReferenceCount}
1963
     *
1964
     * Used for tracing broker leases
1965
     */
1966
    private static class TraceableBrokerLeaseChange extends TraceableStateChange<TraceableBrokerLeaseChange.BrokerInfo, TraceableBrokerLeaseChange.Change> {
1967
        public enum Change {
×
1968
            GET,
×
1969
            RELEASE
×
1970
        }
1971

1972
        public static class BrokerInfo {
1973
            final String id;
1974
            final int referenceCount;
1975

1976
            public BrokerInfo(final String id, final int referenceCount) {
×
1977
                this.id = id;
×
1978
                this.referenceCount = referenceCount;
×
1979
            }
×
1980
        }
1981

1982
        private TraceableBrokerLeaseChange(final Change change, final BrokerInfo brokerInfo) {
1983
            super(change, brokerInfo);
×
1984
        }
×
1985

1986
        @Override
1987
        public String getId() {
1988
            return getState().id;
×
1989
        }
1990

1991
        @Override
1992
        public String describeState() {
1993
            return Integer.toString(getState().referenceCount);
×
1994
        }
1995

1996
        static TraceableBrokerLeaseChange get(final BrokerInfo brokerInfo) {
1997
            return new TraceableBrokerLeaseChange(Change.GET, brokerInfo);
×
1998
        }
1999

2000
        static TraceableBrokerLeaseChange release(final BrokerInfo brokerInfo) {
2001
            return new TraceableBrokerLeaseChange(Change.RELEASE, brokerInfo);
×
2002
        }
2003
    }
2004
}
2005

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc