• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 26015151666

18 May 2026 05:23AM UTC coverage: 58.908% (+0.3%) from 58.609%
26015151666

Pull #99

github

web-flow
Merge 66797259a into 7de527b77
Pull Request #99: fix: allow setResetting() from LOADING_INITIAL and RESETTING

494 of 920 branches covered (53.7%)

Branch coverage included in aggregate %.

1318 of 2156 relevant lines covered (61.13%)

9.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.66
src/main/java/com/knowledgepixels/query/StatusController.java
1
package com.knowledgepixels.query;
2

3
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
4
import org.eclipse.rdf4j.model.IRI;
5
import org.eclipse.rdf4j.model.Literal;
6
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
7
import org.eclipse.rdf4j.repository.RepositoryConnection;
8
import org.nanopub.vocabulary.NPA;
9

10
import java.util.Objects;
11

12
/**
13
 * Class to control the load status of the database.
14
 */
15
public class StatusController {
6✔
16

17
    /**
18
     * The load states in which the database can be.
19
     */
20
    public enum State {
9✔
21
        /**
22
         * The service is launching.
23
         */
24
        LAUNCHING,
18✔
25
        /**
26
         * The service is loading.
27
         */
28
        LOADING_INITIAL,
18✔
29
        /**
30
         * The service is loading updates.
31
         */
32
        LOADING_UPDATES,
18✔
33
        /**
34
         * The service is ready to serve requests.
35
         */
36
        READY,
18✔
37
        /**
38
         * The service detected a registry reset and is about to resync.
39
         */
40
        RESETTING,
18✔
41
    }
42

43
    /**
44
     * Get the singleton instance of the StatusController.
45
     *
46
     * @return the StatusController instance
47
     */
48
    public static StatusController get() {
49
        return instance;
6✔
50
    }
51

52
    private final static StatusController instance = new StatusController();
12✔
53

54
    static final IRI HAS_REGISTRY_SETUP_ID =
3✔
55
            SimpleValueFactory.getInstance().createIRI(NPA.NAMESPACE, "hasRegistrySetupId");
15✔
56

57
    private boolean initialized = false;
9✔
58
    private volatile State state = null;
9✔
59
    private volatile long lastCommittedCounter = -1;
9✔
60
    private volatile Long registrySetupId = null;
12✔
61
    private RepositoryConnection adminRepoConn;
62

63
    /**
64
     * Represents the current status of the service, including the load counter.
65
     */
66
    public static class LoadingStatus {
67

68
        /**
69
         * The current state of the service.
70
         */
71
        public final State state;
72

73
        /**
74
         * The current load counter.
75
         */
76
        public final long loadCounter;
77

78
        private LoadingStatus(State state, long loadCounter) {
6✔
79
            this.state = state;
9✔
80
            this.loadCounter = loadCounter;
9✔
81
        }
3✔
82

83
        /**
84
         * Create a new LoadingStatus instance.
85
         *
86
         * @param state       the current state of the service
87
         * @param loadCounter the current load counter
88
         * @return a new LoadingStatus instance
89
         */
90
        public static LoadingStatus of(State state, long loadCounter) {
91
            return new LoadingStatus(state, loadCounter);
18✔
92
        }
93

94
        @Override
95
        public boolean equals(Object o) {
96
            if (o == null || getClass() != o.getClass()) {
21✔
97
                return false;
6✔
98
            }
99
            LoadingStatus that = (LoadingStatus) o;
9✔
100
            return loadCounter == that.loadCounter && state == that.state;
45✔
101
        }
102

103
        @Override
104
        public int hashCode() {
105
            return Objects.hash(state, loadCounter);
45✔
106
        }
107

108
    }
109

110
    /**
111
     * Initialize the StatusController, fetching the last known state from the DB.
112
     * This must be called right after service startup, before loading any nanopubs.
113
     *
114
     * @return the current state and the last committed counter
115
     */
116
    public LoadingStatus initialize() {
117
        synchronized (this) {
12✔
118
            if (initialized) {
9✔
119
                throw new IllegalStateException("Already initialized");
15✔
120
            }
121
            state = State.LAUNCHING;
9✔
122
            adminRepoConn = TripleStore.get().getAdminRepoConnection();
12✔
123
            // Serializable, as the service state needs to be strictly consistent
124
            adminRepoConn.begin(IsolationLevels.SERIALIZABLE);
12✔
125
            // Fetch the state from the DB
126
            try (var statements = adminRepoConn.getStatements(NPA.THIS_REPO, NPA.HAS_STATUS, null, NPA.GRAPH)) {
39✔
127
                if (!statements.hasNext()) {
9!
128
                    adminRepoConn.add(NPA.THIS_REPO, NPA.HAS_STATUS, stateAsLiteral(state), NPA.GRAPH);
48✔
129
                } else {
130
                    var stateStatement = statements.next();
×
131
                    state = State.valueOf(stateStatement.getObject().stringValue());
×
132
                }
133
            }
134
            // Fetch the load counter from the DB
135
            try (var statements = adminRepoConn.getStatements(NPA.THIS_REPO, NPA.HAS_REGISTRY_LOAD_COUNTER, null, NPA.GRAPH)) {
39✔
136
                if (!statements.hasNext()) {
9!
137
                    adminRepoConn.add(NPA.THIS_REPO, NPA.HAS_REGISTRY_LOAD_COUNTER, adminRepoConn.getValueFactory().createLiteral(-1L), NPA.GRAPH);
51✔
138
                } else {
139
                    var counterStatement = statements.next();
×
140
                    var stringVal = counterStatement.getObject().stringValue();
×
141
                    lastCommittedCounter = Long.parseLong(stringVal);
×
142
                }
143
            }
144
            // Fetch the registry setup ID from the DB
145
            try (var statements = adminRepoConn.getStatements(NPA.THIS_REPO, HAS_REGISTRY_SETUP_ID, null, NPA.GRAPH)) {
39✔
146
                if (statements.hasNext()) {
9!
147
                    var setupIdStatement = statements.next();
×
148
                    registrySetupId = Long.parseLong(setupIdStatement.getObject().stringValue());
×
149
                }
150
                adminRepoConn.commit();
9✔
151
            } catch (Exception e) {
×
152
                if (adminRepoConn.isActive()) {
×
153
                    try {
154
                        adminRepoConn.rollback();
×
155
                    } catch (Exception rollbackException) {
×
156
                        // Transaction may not be registered on server (e.g., already committed, timed out, or connection reset)
157
                        // Log the rollback failure but don't mask the original exception
158
                    }
×
159
                }
160
                throw new RuntimeException(e);
×
161
            }
3✔
162
            initialized = true;
9✔
163
            return getState();
15✔
164
        }
165
    }
166

167
    /**
168
     * Get the current state of the service.
169
     *
170
     * @return the current state and the last committed counter
171
     */
172
    public LoadingStatus getState() {
173
        return LoadingStatus.of(state, lastCommittedCounter);
18✔
174
    }
175

176
    /**
177
     * Transition the service to the LOADING_INITIAL state and update the load counter.
178
     * This should be called in two situations:
179
     * - By the main loading thread (after calling initialize()) to start loading the initial nanopubs.
180
     * - By the initial nanopub loader, as it processes the initial nanopubs.
181
     *
182
     * @param loadCounter the new load counter
183
     */
184
    public void setLoadingInitial(long loadCounter) {
185
        synchronized (this) {
12✔
186
            if (state != State.LAUNCHING && state != State.LOADING_INITIAL && state != State.RESETTING) {
36✔
187
                throw new IllegalStateException("Cannot transition to LOADING_INITIAL, as the " + "current state is " + state);
24✔
188
            }
189
            if (state != State.RESETTING && lastCommittedCounter > loadCounter) {
27✔
190
                throw new IllegalStateException("Cannot update the load counter from " + lastCommittedCounter + " to " + loadCounter);
24✔
191
            }
192
            updateState(State.LOADING_INITIAL, loadCounter);
12✔
193
        }
9✔
194
    }
3✔
195

196
    /**
197
     * Transition the service to the LOADING_UPDATES state and update the load counter.
198
     * This should be called by the updates loader, when it starts processing new nanopubs, or
199
     * when it finishes processing a batch of nanopubs.
200
     *
201
     * @param loadCounter the new load counter
202
     */
203
    public void setLoadingUpdates(long loadCounter) {
204
        synchronized (this) {
12✔
205
            if (state != State.LAUNCHING && state != State.LOADING_UPDATES && state != State.READY) {
36✔
206
                throw new IllegalStateException("Cannot transition to LOADING_UPDATES, as the " + "current state is " + state);
24✔
207
            }
208
            if (lastCommittedCounter > loadCounter) {
15✔
209
                throw new IllegalStateException("Cannot update the load counter from " + lastCommittedCounter + " to " + loadCounter);
24✔
210
            }
211
            // Idempotence guard: skip the admin-repo rewrite if the state and counter
212
            // are already where we'd set them. A no-op loop iteration of the updates
213
            // loader otherwise re-writes the same two triples hundreds of times an
214
            // hour over a long idle tail, each write growing the admin-repo LMDB via
215
            // copy-on-write.
216
            if (state == State.LOADING_UPDATES && lastCommittedCounter == loadCounter) {
27!
217
                return;
×
218
            }
219
            updateState(State.LOADING_UPDATES, loadCounter);
12✔
220
        }
9✔
221
    }
3✔
222

223
    /**
224
     * Transition the service to the READY state.
225
     * This should be called by the loaders, after they finish their work.
226
     */
227
    public void setReady() {
228
        synchronized (this) {
12✔
229
            if (state != State.READY) {
12✔
230
                updateState(State.READY, lastCommittedCounter);
15✔
231
            }
232
        }
9✔
233
    }
3✔
234

235
    /**
236
     * Transition the service to the RESETTING state, resetting the load counter to -1.
237
     * This is triggered when a registry reset is detected (setupId changed or counter decreased),
238
     * or when FORCE_RESYNC is set on startup. Accepts any post-initialize source state:
239
     * a reset discards prior load progress, so it's valid from LOADING_INITIAL (crashed mid-init),
240
     * LOADING_UPDATES, READY, or RESETTING (retry after a crashed prior reset).
241
     */
242
    public void setResetting() {
243
        synchronized (this) {
12✔
244
            if (state == State.LAUNCHING) {
12✔
245
                throw new IllegalStateException("Cannot transition to RESETTING, as the " + "current state is " + state);
24✔
246
            }
247
            updateState(State.RESETTING, -1);
12✔
248
        }
9✔
249
    }
3✔
250

251
    /**
252
     * Get the persisted registry setup ID.
253
     *
254
     * @return the registry setup ID, or null if not yet known
255
     */
256
    public Long getRegistrySetupId() {
257
        // Lock-free read: field is volatile, writers still hold synchronized(this)
258
        // so there are no concurrent writers. applyGlobalHeaders in MainVerticle
259
        // calls this on every inbound request on the Vert.x event loop — blocking
260
        // here behind updateState's admin-repo transaction was a BlockedThreadChecker
261
        // hazard. The DB-commit-first order in the setter (setRegistrySetupId)
262
        // means a reader can observe the previous value for the few ms between DB
263
        // commit and field assignment; no caller depends on stronger consistency.
264
        return registrySetupId;
9✔
265
    }
266

267
    /**
268
     * Persist a new registry setup ID to the admin repo.
269
     *
270
     * @param setupId the new setup ID
271
     */
272
    public void setRegistrySetupId(long setupId) {
273
        synchronized (this) {
12✔
274
            try {
275
                adminRepoConn.begin(IsolationLevels.SERIALIZABLE);
12✔
276
                adminRepoConn.remove(NPA.THIS_REPO, HAS_REGISTRY_SETUP_ID, null, NPA.GRAPH);
36✔
277
                adminRepoConn.add(NPA.THIS_REPO, HAS_REGISTRY_SETUP_ID, adminRepoConn.getValueFactory().createLiteral(setupId), NPA.GRAPH);
48✔
278
                adminRepoConn.commit();
9✔
279
                registrySetupId = setupId;
12✔
280
            } catch (Exception e) {
×
281
                if (adminRepoConn.isActive()) {
×
282
                    try {
283
                        adminRepoConn.rollback();
×
284
                    } catch (Exception rollbackException) {
×
285
                        // Log the rollback failure but don't mask the original exception
286
                    }
×
287
                }
288
                throw new RuntimeException(e);
×
289
            }
3✔
290
        }
9✔
291
    }
3✔
292

293
    void updateState(State newState, long loadCounter) {
294
        synchronized (this) {
12✔
295
            // Update in-memory state first so getState() (called from the event loop) never blocks
296
            state = newState;
9✔
297
            lastCommittedCounter = loadCounter;
9✔
298
            try {
299
                // Serializable, as the service state needs to be strictly consistent
300
                adminRepoConn.begin(IsolationLevels.SERIALIZABLE);
12✔
301
                adminRepoConn.remove(NPA.THIS_REPO, NPA.HAS_STATUS, null, NPA.GRAPH);
36✔
302
                adminRepoConn.add(NPA.THIS_REPO, NPA.HAS_STATUS, stateAsLiteral(newState), NPA.GRAPH);
42✔
303
                adminRepoConn.remove(NPA.THIS_REPO, NPA.HAS_REGISTRY_LOAD_COUNTER, null, NPA.GRAPH);
36✔
304
                adminRepoConn.add(NPA.THIS_REPO, NPA.HAS_REGISTRY_LOAD_COUNTER, adminRepoConn.getValueFactory().createLiteral(loadCounter), NPA.GRAPH);
48✔
305
                adminRepoConn.commit();
9✔
306
            } catch (Exception e) {
×
307
                if (adminRepoConn.isActive()) {
×
308
                    try {
309
                        adminRepoConn.rollback();
×
310
                    } catch (Exception rollbackException) {
×
311
                        // Transaction may not be registered on server (e.g., already committed, timed out, or connection reset)
312
                        // Log the rollback failure but don't mask the original exception
313
                    }
×
314
                }
315
                throw new RuntimeException(e);
×
316
            }
3✔
317
        }
9✔
318
    }
3✔
319

320
    private Literal stateAsLiteral(State s) {
321
        return adminRepoConn.getValueFactory().createLiteral(s.toString());
21✔
322
    }
323

324
    /**
325
     * Reset the StatusController for testing purposes.
326
     * This will clear the state and allow re-initialization.
327
     */
328
    void resetForTest() {
329
        synchronized (this) {
12✔
330
            initialized = false;
9✔
331
            state = null;
9✔
332
            lastCommittedCounter = -1;
9✔
333
            registrySetupId = null;
9✔
334
            adminRepoConn = null;
9✔
335
        }
9✔
336
    }
3✔
337

338
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc