• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 24127353986

08 Apr 2026 09:07AM UTC coverage: 66.922% (-0.08%) from 66.999%
24127353986

push

github

tkuhn
fix: prevent event loop blocking from network I/O and lock contention

- Cache nanopub lookups in GrlcSpec to avoid blocking GetNanopub.get()
  calls on the Vert.x event loop for every API request
- Wrap /, /pubkeys, /types handlers in executeBlocking to prevent
  getRepositoryNames() HTTP calls from blocking the event loop
- Make StatusController.getState() lock-free with volatile fields
- Increase HTTP fetch timeouts from 1s to 10s

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

227 of 378 branches covered (60.05%)

Branch coverage included in aggregate %.

645 of 925 relevant lines covered (69.73%)

10.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.46
src/main/java/com/knowledgepixels/query/StatusController.java
1
package com.knowledgepixels.query;
2

3
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
4
import org.eclipse.rdf4j.model.IRI;
5
import org.eclipse.rdf4j.model.Literal;
6
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
7
import org.eclipse.rdf4j.repository.RepositoryConnection;
8
import org.nanopub.vocabulary.NPA;
9

10
import java.util.Objects;
11

12
/**
13
 * Class to control the load status of the database.
14
 */
15
public class StatusController {
6✔
16

17
    /**
     * Load states the database can be in. The constant names are what gets written to and
     * parsed back from the admin repository.
     */
    public enum State {
        /** The service is launching. */
        LAUNCHING,
        /** The service is loading. */
        LOADING_INITIAL,
        /** The service is loading updates. */
        LOADING_UPDATES,
        /** The service is ready to serve requests. */
        READY,
        /** The service detected a registry reset and is about to resync. */
        RESETTING
    }
42

43
    /**
44
     * Get the singleton instance of the StatusController.
45
     *
46
     * @return the StatusController instance
47
     */
48
    public static StatusController get() {
49
        return instance;
6✔
50
    }
51

52
    private final static StatusController instance = new StatusController();
12✔
53

54
    static final IRI HAS_REGISTRY_SETUP_ID =
3✔
55
            SimpleValueFactory.getInstance().createIRI(NPA.NAMESPACE, "hasRegistrySetupId");
15✔
56

57
    private boolean initialized = false;
9✔
58
    private volatile State state = null;
9✔
59
    private volatile long lastCommittedCounter = -1;
9✔
60
    private Long registrySetupId = null;
12✔
61
    private RepositoryConnection adminRepoConn;
62

63
    /**
64
     * Represents the current status of the service, including the load counter.
65
     */
66
    public static class LoadingStatus {
67

68
        /**
69
         * The current state of the service.
70
         */
71
        public final State state;
72

73
        /**
74
         * The current load counter.
75
         */
76
        public final long loadCounter;
77

78
        private LoadingStatus(State state, long loadCounter) {
6✔
79
            this.state = state;
9✔
80
            this.loadCounter = loadCounter;
9✔
81
        }
3✔
82

83
        /**
84
         * Create a new LoadingStatus instance.
85
         *
86
         * @param state       the current state of the service
87
         * @param loadCounter the current load counter
88
         * @return a new LoadingStatus instance
89
         */
90
        public static LoadingStatus of(State state, long loadCounter) {
91
            return new LoadingStatus(state, loadCounter);
18✔
92
        }
93

94
        @Override
95
        public boolean equals(Object o) {
96
            if (o == null || getClass() != o.getClass()) {
21✔
97
                return false;
6✔
98
            }
99
            LoadingStatus that = (LoadingStatus) o;
9✔
100
            return loadCounter == that.loadCounter && state == that.state;
45✔
101
        }
102

103
        @Override
104
        public int hashCode() {
105
            return Objects.hash(state, loadCounter);
45✔
106
        }
107

108
    }
109

110
    /**
111
     * Initialize the StatusController, fetching the last known state from the DB.
112
     * This must be called right after service startup, before loading any nanopubs.
113
     *
114
     * @return the current state and the last committed counter
115
     */
116
    public LoadingStatus initialize() {
117
        synchronized (this) {
12✔
118
            if (initialized) {
9✔
119
                throw new IllegalStateException("Already initialized");
15✔
120
            }
121
            state = State.LAUNCHING;
9✔
122
            adminRepoConn = TripleStore.get().getAdminRepoConnection();
12✔
123
            // Serializable, as the service state needs to be strictly consistent
124
            adminRepoConn.begin(IsolationLevels.SERIALIZABLE);
12✔
125
            // Fetch the state from the DB
126
            try (var statements = adminRepoConn.getStatements(NPA.THIS_REPO, NPA.HAS_STATUS, null, NPA.GRAPH)) {
39✔
127
                if (!statements.hasNext()) {
9!
128
                    adminRepoConn.add(NPA.THIS_REPO, NPA.HAS_STATUS, stateAsLiteral(state), NPA.GRAPH);
48✔
129
                } else {
130
                    var stateStatement = statements.next();
×
131
                    state = State.valueOf(stateStatement.getObject().stringValue());
×
132
                }
133
            }
134
            // Fetch the load counter from the DB
135
            try (var statements = adminRepoConn.getStatements(NPA.THIS_REPO, NPA.HAS_REGISTRY_LOAD_COUNTER, null, NPA.GRAPH)) {
39✔
136
                if (!statements.hasNext()) {
9!
137
                    adminRepoConn.add(NPA.THIS_REPO, NPA.HAS_REGISTRY_LOAD_COUNTER, adminRepoConn.getValueFactory().createLiteral(-1L), NPA.GRAPH);
51✔
138
                } else {
139
                    var counterStatement = statements.next();
×
140
                    var stringVal = counterStatement.getObject().stringValue();
×
141
                    lastCommittedCounter = Long.parseLong(stringVal);
×
142
                }
143
            }
144
            // Fetch the registry setup ID from the DB
145
            try (var statements = adminRepoConn.getStatements(NPA.THIS_REPO, HAS_REGISTRY_SETUP_ID, null, NPA.GRAPH)) {
39✔
146
                if (statements.hasNext()) {
9!
147
                    var setupIdStatement = statements.next();
×
148
                    registrySetupId = Long.parseLong(setupIdStatement.getObject().stringValue());
×
149
                }
150
                adminRepoConn.commit();
9✔
151
            } catch (Exception e) {
×
152
                if (adminRepoConn.isActive()) {
×
153
                    try {
154
                        adminRepoConn.rollback();
×
155
                    } catch (Exception rollbackException) {
×
156
                        // Transaction may not be registered on server (e.g., already committed, timed out, or connection reset)
157
                        // Log the rollback failure but don't mask the original exception
158
                    }
×
159
                }
160
                throw new RuntimeException(e);
×
161
            }
3✔
162
            initialized = true;
9✔
163
            return getState();
15✔
164
        }
165
    }
166

167
    /**
168
     * Get the current state of the service.
169
     *
170
     * @return the current state and the last committed counter
171
     */
172
    public LoadingStatus getState() {
173
        return LoadingStatus.of(state, lastCommittedCounter);
18✔
174
    }
175

176
    /**
177
     * Transition the service to the LOADING_INITIAL state and update the load counter.
178
     * This should be called in two situations:
179
     * - By the main loading thread (after calling initialize()) to start loading the initial nanopubs.
180
     * - By the initial nanopub loader, as it processes the initial nanopubs.
181
     *
182
     * @param loadCounter the new load counter
183
     */
184
    public void setLoadingInitial(long loadCounter) {
185
        synchronized (this) {
12✔
186
            if (state != State.LAUNCHING && state != State.LOADING_INITIAL && state != State.RESETTING) {
36✔
187
                throw new IllegalStateException("Cannot transition to LOADING_INITIAL, as the " + "current state is " + state);
24✔
188
            }
189
            if (state != State.RESETTING && lastCommittedCounter > loadCounter) {
27✔
190
                throw new IllegalStateException("Cannot update the load counter from " + lastCommittedCounter + " to " + loadCounter);
24✔
191
            }
192
            updateState(State.LOADING_INITIAL, loadCounter);
12✔
193
        }
9✔
194
    }
3✔
195

196
    /**
197
     * Transition the service to the LOADING_UPDATES state and update the load counter.
198
     * This should be called by the updates loader, when it starts processing new nanopubs, or
199
     * when it finishes processing a batch of nanopubs.
200
     *
201
     * @param loadCounter the new load counter
202
     */
203
    public void setLoadingUpdates(long loadCounter) {
204
        synchronized (this) {
12✔
205
            if (state != State.LAUNCHING && state != State.LOADING_UPDATES && state != State.READY) {
36✔
206
                throw new IllegalStateException("Cannot transition to LOADING_UPDATES, as the " + "current state is " + state);
24✔
207
            }
208
            if (lastCommittedCounter > loadCounter) {
15✔
209
                throw new IllegalStateException("Cannot update the load counter from " + lastCommittedCounter + " to " + loadCounter);
24✔
210
            }
211
            updateState(State.LOADING_UPDATES, loadCounter);
12✔
212
        }
9✔
213
    }
3✔
214

215
    /**
216
     * Transition the service to the READY state.
217
     * This should be called by the loaders, after they finish their work.
218
     */
219
    public void setReady() {
220
        synchronized (this) {
12✔
221
            if (state != State.READY) {
12✔
222
                updateState(State.READY, lastCommittedCounter);
15✔
223
            }
224
        }
9✔
225
    }
3✔
226

227
    /**
228
     * Transition the service to the RESETTING state, resetting the load counter to -1.
229
     * This is triggered when a registry reset is detected (setupId changed or counter decreased).
230
     */
231
    public void setResetting() {
232
        synchronized (this) {
12✔
233
            if (state != State.READY && state != State.LOADING_UPDATES) {
24✔
234
                throw new IllegalStateException("Cannot transition to RESETTING, as the " + "current state is " + state);
24✔
235
            }
236
            updateState(State.RESETTING, -1);
12✔
237
        }
9✔
238
    }
3✔
239

240
    /**
241
     * Get the persisted registry setup ID.
242
     *
243
     * @return the registry setup ID, or null if not yet known
244
     */
245
    public Long getRegistrySetupId() {
246
        synchronized (this) {
12✔
247
            return registrySetupId;
15✔
248
        }
249
    }
250

251
    /**
252
     * Persist a new registry setup ID to the admin repo.
253
     *
254
     * @param setupId the new setup ID
255
     */
256
    public void setRegistrySetupId(long setupId) {
257
        synchronized (this) {
12✔
258
            try {
259
                adminRepoConn.begin(IsolationLevels.SERIALIZABLE);
12✔
260
                adminRepoConn.remove(NPA.THIS_REPO, HAS_REGISTRY_SETUP_ID, null, NPA.GRAPH);
36✔
261
                adminRepoConn.add(NPA.THIS_REPO, HAS_REGISTRY_SETUP_ID, adminRepoConn.getValueFactory().createLiteral(setupId), NPA.GRAPH);
48✔
262
                adminRepoConn.commit();
9✔
263
                registrySetupId = setupId;
12✔
264
            } catch (Exception e) {
×
265
                if (adminRepoConn.isActive()) {
×
266
                    try {
267
                        adminRepoConn.rollback();
×
268
                    } catch (Exception rollbackException) {
×
269
                        // Log the rollback failure but don't mask the original exception
270
                    }
×
271
                }
272
                throw new RuntimeException(e);
×
273
            }
3✔
274
        }
9✔
275
    }
3✔
276

277
    void updateState(State newState, long loadCounter) {
278
        synchronized (this) {
12✔
279
            // Update in-memory state first so getState() (called from the event loop) never blocks
280
            state = newState;
9✔
281
            lastCommittedCounter = loadCounter;
9✔
282
            try {
283
                // Serializable, as the service state needs to be strictly consistent
284
                adminRepoConn.begin(IsolationLevels.SERIALIZABLE);
12✔
285
                adminRepoConn.remove(NPA.THIS_REPO, NPA.HAS_STATUS, null, NPA.GRAPH);
36✔
286
                adminRepoConn.add(NPA.THIS_REPO, NPA.HAS_STATUS, stateAsLiteral(newState), NPA.GRAPH);
42✔
287
                adminRepoConn.remove(NPA.THIS_REPO, NPA.HAS_REGISTRY_LOAD_COUNTER, null, NPA.GRAPH);
36✔
288
                adminRepoConn.add(NPA.THIS_REPO, NPA.HAS_REGISTRY_LOAD_COUNTER, adminRepoConn.getValueFactory().createLiteral(loadCounter), NPA.GRAPH);
48✔
289
                adminRepoConn.commit();
9✔
290
            } catch (Exception e) {
×
291
                if (adminRepoConn.isActive()) {
×
292
                    try {
293
                        adminRepoConn.rollback();
×
294
                    } catch (Exception rollbackException) {
×
295
                        // Transaction may not be registered on server (e.g., already committed, timed out, or connection reset)
296
                        // Log the rollback failure but don't mask the original exception
297
                    }
×
298
                }
299
                throw new RuntimeException(e);
×
300
            }
3✔
301
        }
9✔
302
    }
3✔
303

304
    private Literal stateAsLiteral(State s) {
305
        return adminRepoConn.getValueFactory().createLiteral(s.toString());
21✔
306
    }
307

308
    /**
309
     * Reset the StatusController for testing purposes.
310
     * This will clear the state and allow re-initialization.
311
     */
312
    void resetForTest() {
313
        synchronized (this) {
12✔
314
            initialized = false;
9✔
315
            state = null;
9✔
316
            lastCommittedCounter = -1;
9✔
317
            registrySetupId = null;
9✔
318
            adminRepoConn = null;
9✔
319
        }
9✔
320
    }
3✔
321

322
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc