• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

evolvedbinary / elemental / 982

29 Apr 2025 08:34PM UTC coverage: 56.409% (+0.007%) from 56.402%
982

push

circleci

adamretter
[feature] Improve README.md badges

28451 of 55847 branches covered (50.94%)

Branch coverage included in aggregate %.

77468 of 131924 relevant lines covered (58.72%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.34
/exist-core/src/main/java/org/exist/storage/ProcessMonitor.java
1
/*
1✔
2
 * Elemental
3
 * Copyright (C) 2024, Evolved Binary Ltd
4
 *
5
 * admin@evolvedbinary.com
6
 * https://www.evolvedbinary.com | https://www.elemental.xyz
7
 *
8
 * Use of this software is governed by the Business Source License 1.1
9
 * included in the LICENSE file and at www.mariadb.com/bsl11.
10
 *
11
 * Change Date: 2028-04-27
12
 *
13
 * On the date above, in accordance with the Business Source License, use
14
 * of this software will be governed by the Apache License, Version 2.0.
15
 *
16
 * Additional Use Grant: Production use of the Licensed Work for a permitted
17
 * purpose. A Permitted Purpose is any purpose other than a Competing Use.
18
 * A Competing Use means making the Software available to others in a commercial
19
 * product or service that: substitutes for the Software; substitutes for any
20
 * other product or service we offer using the Software that exists as of the
21
 * date we make the Software available; or offers the same or substantially
22
 * similar functionality as the Software.
23
 *
24
 * NOTE: Parts of this file contain code from 'The eXist-db Authors'.
25
 *       The original license header is included below.
26
 *
27
 * =====================================================================
28
 *
29
 * eXist-db Open Source Native XML Database
30
 * Copyright (C) 2001 The eXist-db Authors
31
 *
32
 * info@exist-db.org
33
 * http://www.exist-db.org
34
 *
35
 * This library is free software; you can redistribute it and/or
36
 * modify it under the terms of the GNU Lesser General Public
37
 * License as published by the Free Software Foundation; either
38
 * version 2.1 of the License, or (at your option) any later version.
39
 *
40
 * This library is distributed in the hope that it will be useful,
41
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
42
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
43
 * Lesser General Public License for more details.
44
 *
45
 * You should have received a copy of the GNU Lesser General Public
46
 * License along with this library; if not, write to the Free Software
47
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
48
 */
49
package org.exist.storage;
50

51
import org.apache.logging.log4j.LogManager;
52
import org.apache.logging.log4j.Logger;
53
import org.exist.http.servlets.RequestWrapper;
54
import org.exist.http.urlrewrite.XQueryURLRewrite;
55
import org.exist.source.Source;
56
import org.exist.util.Configuration;
57
import org.exist.xquery.Module;
58
import org.exist.xquery.XQueryContext;
59
import org.exist.xquery.XQueryWatchDog;
60

61
import org.exist.xquery.functions.request.RequestModule;
62
import org.exist.xquery.util.ExpressionDumper;
63

64
import javax.annotation.Nullable;
65
import java.util.*;
66
import java.util.concurrent.DelayQueue;
67
import java.util.concurrent.Delayed;
68
import java.util.concurrent.TimeUnit;
69

70
/**
71
 * Class to keep track of all running queries in a database instance. The main
72
 * purpose of this class is to signal running queries that the database is going to
73
 * shut down. This is done through the {@link org.exist.xquery.XQueryWatchDog}
74
 * registered by each query. It is up to the query to check the watchdog's state.
75
 * If it simply ignores the terminate signal, it will be killed after the shutdown
76
 * timeout is reached.
77
 *
78
 * @author wolf
79
 */
80
public class ProcessMonitor implements BrokerPoolService {
1✔
81

82
    public static final String ACTION_UNSPECIFIED = "unspecified";
83
    public static final String ACTION_VALIDATE_DOC = "validating document";
84
    public static final String ACTION_STORE_DOC = "storing document";
85
    public static final String ACTION_STORE_BINARY = "storing binary resource";
86
    public static final String ACTION_REMOVE_XML = "remove XML resource";
87
    public static final String ACTION_REMOVE_BINARY = "remove binary resource";
88
    public static final String ACTION_REMOVE_COLLECTION = "remove collection";
89
    public static final String ACTION_REINDEX_COLLECTION = "reindex collection";
90
    public static final String ACTION_COPY_COLLECTION = "copy collection";
91
    public static final String ACTION_MOVE_COLLECTION = "move collection";
92
    public static final String ACTION_BACKUP = "backup";
93

94
    private static final Logger LOG = LogManager.getLogger(ProcessMonitor.class);
1✔
95
    private static final long QUERY_HISTORY_TIMEOUT = 2 * 60 * 1000; // 2 minutes
96
    private static final long MIN_TIME = 100;
1✔
97

98
    private final Set<XQueryWatchDog> runningQueries = new HashSet<>();
1✔
99
    private final DelayQueue<QueryHistory> history = new DelayQueue<>();
1✔
100
    private final Map<Thread, JobInfo> processes = new HashMap<>();
1✔
101
    private long maxShutdownWait;
102
    private long historyTimespan = QUERY_HISTORY_TIMEOUT;
1✔
103
    private long minTime = MIN_TIME;
1✔
104
    private boolean trackRequests = false;
1✔
105

106
    @Override
107
    public void configure(final Configuration configuration) {
108
        this.maxShutdownWait = configuration.getProperty(BrokerPool.PROPERTY_SHUTDOWN_DELAY, BrokerPool.DEFAULT_MAX_SHUTDOWN_WAIT);
1✔
109
    }
1✔
110

111
    public void startJob(final String action) {
112
        startJob(action, null);
×
113
    }
×
114

115
    public void startJob(final String action, final Object addInfo) {
116
        startJob(action, addInfo, null);
1✔
117
    }
1✔
118

119
    //TODO: addInfo = XmldbURI ? -shabanovd
120
    public void startJob(final String action, final Object addInfo, final Monitor monitor) {
121
        final JobInfo info = new JobInfo(action, monitor);
1✔
122
        info.setAddInfo(addInfo);
1✔
123
        synchronized (this) {
1✔
124
            processes.put(info.getThread(), info);
1✔
125
        }
126
    }
1✔
127

128
    public synchronized void endJob() {
129
        processes.remove(Thread.currentThread());
1✔
130
        notifyAll();
1✔
131
    }
1✔
132

133
    public JobInfo[] runningJobs() {
134
        synchronized (this) {
1✔
135
            final JobInfo[] jobs = new JobInfo[processes.size()];
1✔
136
            int j = 0;
1✔
137
            for (final Iterator<JobInfo> i = processes.values().iterator(); i.hasNext(); j++) {
1!
138
                //BUG: addInfo = XmldbURI ? -shabanovd
139
                jobs[j] = i.next();
×
140
            }
141
            return jobs;
1✔
142
        }
143
    }
144

145
    public void stopRunningJobs() {
146
        final long waitStart = System.currentTimeMillis();
1✔
147
        synchronized (this) {
1✔
148
            if (maxShutdownWait > -1) {
1!
149
                while (processes.size() > 0) {
1!
150
                    try {
151
                        //Wait until they become inactive...
152
                        this.wait(1000);
×
153
                    } catch (final InterruptedException e) {
×
154
                        //no op
155
                        Thread.currentThread().interrupt(); // pass on interrupted status
×
156
                    }
157
                    //...or force the shutdown
158
                    if (maxShutdownWait > -1 && System.currentTimeMillis() - waitStart > maxShutdownWait) {
×
159
                        break;
×
160
                    }
161
                }
162
            }
163
            for (final JobInfo job : processes.values()) {
1!
164
                job.stop();
×
165
            }
166
        }
167
    }
1✔
168

169
    public void queryStarted(final XQueryWatchDog watchdog) {
170
        synchronized (runningQueries) {
1✔
171
            watchdog.setRunningThread(Thread.currentThread().getName());
1✔
172
            runningQueries.add(watchdog);
1✔
173
        }
174
    }
1✔
175

176
    public void queryCompleted(final XQueryWatchDog watchdog) {
177
        boolean found;
178
        synchronized (runningQueries) {
1✔
179
            found = runningQueries.remove(watchdog);
1✔
180
        }
181

182
        // add to query history if elapsed time > minTime
183
        final long elapsed = System.currentTimeMillis() - watchdog.getStartTime();
1✔
184
        if (found && elapsed > minTime) {
1✔
185
            synchronized (history) {
1✔
186
                final Source source = watchdog.getContext().getSource();
1✔
187
                final String sourceKey = source == null ? "unknown" : source.pathOrShortIdentifier();
1!
188
                QueryHistory qh = new QueryHistory(sourceKey, historyTimespan);
1✔
189
                qh.setMostRecentExecutionTime(watchdog.getStartTime());
1✔
190
                qh.setMostRecentExecutionDuration(elapsed);
1✔
191
                qh.incrementInvocationCount();
1✔
192
                if (trackRequests) {
1!
193
                    qh.setRequestURI(getRequestURI(watchdog));
×
194
                }
195
                history.add(qh);
1✔
196
                cleanHistory();
1✔
197
            }
198
        }
199
    }
1✔
200

201
    private void cleanHistory() {
202
        // remove timed out entries
203
        while (history.poll() != null) ;
1!
204
    }
1✔
205

206
    /**
207
     * The max duration (in milliseconds) for which queries are tracked in the query history. Older queries
208
     * will be removed (default is {@link #QUERY_HISTORY_TIMEOUT}).
209
     *
210
     * @param time max duration in ms
211
     */
212
    public void setHistoryTimespan(final long time) {
213
        historyTimespan = time;
×
214
    }
×
215

216
    public long getHistoryTimespan() {
217
        return historyTimespan;
1✔
218
    }
219

220
    /**
221
     * The minimum duration of a query (in milliseconds) to be added to the query history. Use this to filter out
222
     * very short-running queries (default is {@link #MIN_TIME}).
223
     *
224
     * @param time min duration in ms
225
     */
226
    public void setMinTime(final long time) {
227
        this.minTime = time;
×
228
    }
×
229

230
    public long getMinTime() {
231
        return minTime;
1✔
232
    }
233

234
    /**
235
     * Set to true if the class should attempt to determine the HTTP URI through which the query was triggered.
236
     * This is an important piece of information for diagnosis, but gathering it might be expensive, so request
237
     * URI tracking is disabled by default.
238
     *
239
     * @param track attempt to track URIs if true
240
     */
241
    public void setTrackRequestURI(final boolean track) {
242
        trackRequests = track;
×
243
    }
×
244

245
    public boolean getTrackRequestURI() {
246
        return trackRequests;
1✔
247
    }
248

249
    public static class QueryHistory implements Delayed {
250

251
        private final String source;
252
        private String requestURI = null;
1✔
253
        private long mostRecentExecutionTime;
254
        private long mostRecentExecutionDuration;
255
        private int invocationCount = 0;
1✔
256
        private long expires;
257

258
        public QueryHistory(final String source, final long delay) {
1✔
259
            this.source = source;
1✔
260
            this.expires = System.currentTimeMillis() + delay;
1✔
261
        }
1✔
262

263
        public String getSource() {
264
            return source;
×
265
        }
266

267
        public void incrementInvocationCount() {
268
            invocationCount++;
1✔
269
        }
1✔
270

271
        public int getInvocationCount() {
272
            return invocationCount;
×
273
        }
274

275
        public long getMostRecentExecutionTime() {
276
            return mostRecentExecutionTime;
×
277
        }
278

279
        public void setMostRecentExecutionTime(final long mostRecentExecutionTime) {
280
            this.mostRecentExecutionTime = mostRecentExecutionTime;
1✔
281
        }
1✔
282

283
        public long getMostRecentExecutionDuration() {
284
            return mostRecentExecutionDuration;
×
285
        }
286

287
        public void setMostRecentExecutionDuration(final long mostRecentExecutionDuration) {
288
            this.mostRecentExecutionDuration = mostRecentExecutionDuration;
1✔
289
        }
1✔
290

291
        public String getRequestURI() {
292
            return requestURI;
×
293
        }
294

295
        public void setRequestURI(final String uri) {
296
            requestURI = uri;
×
297
        }
×
298

299
        @Override
300
        public long getDelay(final TimeUnit unit) {
301
            return unit.convert(expires - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
1✔
302
        }
303

304
        @Override
305
        public int compareTo(final Delayed o) {
306
            return Long.compare(expires, ((QueryHistory) o).expires);
1✔
307
        }
308
    }
309

310
    public QueryHistory[] getRecentQueryHistory() {
311
        synchronized (history) {
1✔
312
            cleanHistory();
1✔
313
            return
1✔
314
                    history.stream()
1✔
315
                            .sorted((o1, o2) -> Long.compare(o2.expires, o1.expires))
1✔
316
                            .toArray(QueryHistory[]::new);
1✔
317
        }
318
    }
319

320

321
    public void killAll(final long waitTime) {
322
        synchronized(runningQueries) {
1✔
323
            for (final XQueryWatchDog watchdog : runningQueries) {
1!
324
                LOG.debug("Killing query: {}", ExpressionDumper.dump(watchdog.getContext().getRootExpression()));
×
325
                watchdog.kill(waitTime);
×
326
            }
327
        }
328
    }
1✔
329

330
    public XQueryWatchDog[] getRunningXQueries() {
331
        synchronized (runningQueries) {
1✔
332
            return runningQueries.toArray(new XQueryWatchDog[0]);
1✔
333
        }
334
    }
335

336
    public final static class Monitor {
1✔
337
        boolean stop = false;
1✔
338

339
        public boolean proceed() {
340
            return !stop;
1!
341
        }
342

343
        public void stop() {
344
            LOG.debug("Terminating job");
×
345
            this.stop = true;
×
346
        }
×
347
    }
348

349
    public final static class JobInfo {
350
        private final Thread thread;
351
        private final String action;
352
        private final long startTime;
353
        private final Monitor monitor;
354

355
        private Object addInfo = null;
1✔
356

357
        public JobInfo(final String action, final Monitor monitor) {
1✔
358
            this.thread = Thread.currentThread();
1✔
359
            this.action = action;
1✔
360
            this.monitor = monitor;
1✔
361
            this.startTime = System.currentTimeMillis();
1✔
362
        }
1✔
363

364
        public String getAction() {
365
            return action;
×
366
        }
367

368
        public Thread getThread() {
369
            return thread;
1✔
370
        }
371

372
        public long getStartTime() {
373
            return startTime;
×
374
        }
375

376
        public void setAddInfo(final Object info) {
377
            this.addInfo = info;
1✔
378
        }
1✔
379

380
        public Object getAddInfo() {
381
            return addInfo;
×
382
        }
383

384
        public void stop() {
385
            monitor.stop();
×
386
        }
×
387
    }
388

389
    /**
390
     * Try to figure out the HTTP request URI by which a query was called.
391
     * Request tracking is not enabled unless {@link #setTrackRequestURI(boolean)}
392
     * is called.
393
     *
394
     * @param watchdog XQuery WatchDog
395
     * @return HTTP request URI by which a query was called
396
     */
397
    public static String getRequestURI(final XQueryWatchDog watchdog) {
398
        @Nullable final Module[] modules = watchdog.getContext().getModules(RequestModule.NAMESPACE_URI);
×
399
        if (modules == null || modules.length == 0) {
×
400
            return null;
×
401
        }
402

403
        final Optional<RequestWrapper> maybeRequest = Optional.ofNullable(watchdog.getContext())
×
404
                .map(XQueryContext::getHttpContext)
×
405
                .map(XQueryContext.HttpContext::getRequest);
×
406

407
        if (!maybeRequest.isPresent()) {
×
408
            return null;
×
409
        }
410

411
        final RequestWrapper request = maybeRequest.get();
×
412
        final Object attr = request.getAttribute(XQueryURLRewrite.RQ_ATTR_REQUEST_URI);
×
413
        String uri;
414
        if (attr == null) {
×
415
            uri = request.getRequestURI();
×
416
        } else {
×
417
            uri = attr.toString();
×
418
        }
419
        String queryString = request.getQueryString();
×
420
        if (queryString != null) {
×
421
            uri += "?" + queryString;
×
422
        }
423
        return uri;
×
424
    }
425
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc