• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 23622439422

26 Mar 2026 11:05PM UTC coverage: 67.082% (-2.4%) from 69.473%
23622439422

push

github

tkuhn
feat: detect and recover from Nanopub Registry resets

When the registry is reset and repopulated, its load counter restarts
from 0 while nanopub-query's stored counter remains high, causing it
to silently serve stale data. This adds detection via the registry's
setupId (Nanopub-Registry-Setup-Id header) and counter decrease, then
re-streams all nanopubs while skipping duplicates via existing per-repo
dedup in NanopubLoader.

- Read setupId from HEAD response alongside existing counter/status
- Persist setupId in admin repo, restore on startup
- Add RESETTING state to StatusController for counter reset
- Trigger resync on setupId change or counter decrease
- Re-stream with dedup preserves existing data, only loads new nanopubs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

227 of 372 branches covered (61.02%)

Branch coverage included in aggregate %.

635 of 913 relevant lines covered (69.55%)

10.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.57
src/main/java/com/knowledgepixels/query/StatusController.java
1
package com.knowledgepixels.query;
2

3
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
4
import org.eclipse.rdf4j.model.IRI;
5
import org.eclipse.rdf4j.model.Literal;
6
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
7
import org.eclipse.rdf4j.repository.RepositoryConnection;
8
import org.nanopub.vocabulary.NPA;
9

10
import java.util.Objects;
11

12
/**
13
 * Class to control the load status of the database.
14
 */
15
public class StatusController {
6✔
16

17
    /**
18
     * The load states in which the database can be.
19
     */
20
    public enum State {
9✔
21
        /**
22
         * The service is launching.
23
         */
24
        LAUNCHING,
18✔
25
        /**
26
         * The service is loading.
27
         */
28
        LOADING_INITIAL,
18✔
29
        /**
30
         * The service is loading updates.
31
         */
32
        LOADING_UPDATES,
18✔
33
        /**
34
         * The service is ready to serve requests.
35
         */
36
        READY,
18✔
37
        /**
38
         * The service detected a registry reset and is about to resync.
39
         */
40
        RESETTING,
18✔
41
    }
42

43
    /**
44
     * Get the singleton instance of the StatusController.
45
     *
46
     * @return the StatusController instance
47
     */
48
    public static StatusController get() {
49
        return instance;
6✔
50
    }
51

52
    private final static StatusController instance = new StatusController();
12✔
53

54
    static final IRI HAS_REGISTRY_SETUP_ID =
3✔
55
            SimpleValueFactory.getInstance().createIRI(NPA.NAMESPACE, "hasRegistrySetupId");
15✔
56

57
    private boolean initialized = false;
9✔
58
    private State state = null;
9✔
59
    private long lastCommittedCounter = -1;
9✔
60
    private Long registrySetupId = null;
12✔
61
    private RepositoryConnection adminRepoConn;
62

63
    /**
64
     * Represents the current status of the service, including the load counter.
65
     */
66
    public static class LoadingStatus {
67

68
        /**
69
         * The current state of the service.
70
         */
71
        public final State state;
72

73
        /**
74
         * The current load counter.
75
         */
76
        public final long loadCounter;
77

78
        private LoadingStatus(State state, long loadCounter) {
6✔
79
            this.state = state;
9✔
80
            this.loadCounter = loadCounter;
9✔
81
        }
3✔
82

83
        /**
84
         * Create a new LoadingStatus instance.
85
         *
86
         * @param state       the current state of the service
87
         * @param loadCounter the current load counter
88
         * @return a new LoadingStatus instance
89
         */
90
        public static LoadingStatus of(State state, long loadCounter) {
91
            return new LoadingStatus(state, loadCounter);
18✔
92
        }
93

94
        @Override
95
        public boolean equals(Object o) {
96
            if (o == null || getClass() != o.getClass()) {
21✔
97
                return false;
6✔
98
            }
99
            LoadingStatus that = (LoadingStatus) o;
9✔
100
            return loadCounter == that.loadCounter && state == that.state;
45✔
101
        }
102

103
        @Override
104
        public int hashCode() {
105
            return Objects.hash(state, loadCounter);
45✔
106
        }
107

108
    }
109

110
    /**
111
     * Initialize the StatusController, fetching the last known state from the DB.
112
     * This must be called right after service startup, before loading any nanopubs.
113
     *
114
     * @return the current state and the last committed counter
115
     */
116
    public LoadingStatus initialize() {
117
        synchronized (this) {
12✔
118
            if (initialized) {
9✔
119
                throw new IllegalStateException("Already initialized");
15✔
120
            }
121
            state = State.LAUNCHING;
9✔
122
            adminRepoConn = TripleStore.get().getAdminRepoConnection();
12✔
123
            // Serializable, as the service state needs to be strictly consistent
124
            adminRepoConn.begin(IsolationLevels.SERIALIZABLE);
12✔
125
            // Fetch the state from the DB
126
            try (var statements = adminRepoConn.getStatements(NPA.THIS_REPO, NPA.HAS_STATUS, null, NPA.GRAPH)) {
39✔
127
                if (!statements.hasNext()) {
9!
128
                    adminRepoConn.add(NPA.THIS_REPO, NPA.HAS_STATUS, stateAsLiteral(state), NPA.GRAPH);
48✔
129
                } else {
130
                    var stateStatement = statements.next();
×
131
                    state = State.valueOf(stateStatement.getObject().stringValue());
×
132
                }
133
            }
134
            // Fetch the load counter from the DB
135
            try (var statements = adminRepoConn.getStatements(NPA.THIS_REPO, NPA.HAS_REGISTRY_LOAD_COUNTER, null, NPA.GRAPH)) {
39✔
136
                if (!statements.hasNext()) {
9!
137
                    adminRepoConn.add(NPA.THIS_REPO, NPA.HAS_REGISTRY_LOAD_COUNTER, adminRepoConn.getValueFactory().createLiteral(-1L), NPA.GRAPH);
51✔
138
                } else {
139
                    var counterStatement = statements.next();
×
140
                    var stringVal = counterStatement.getObject().stringValue();
×
141
                    lastCommittedCounter = Long.parseLong(stringVal);
×
142
                }
143
            }
144
            // Fetch the registry setup ID from the DB
145
            try (var statements = adminRepoConn.getStatements(NPA.THIS_REPO, HAS_REGISTRY_SETUP_ID, null, NPA.GRAPH)) {
39✔
146
                if (statements.hasNext()) {
9!
147
                    var setupIdStatement = statements.next();
×
148
                    registrySetupId = Long.parseLong(setupIdStatement.getObject().stringValue());
×
149
                }
150
                adminRepoConn.commit();
9✔
151
            } catch (Exception e) {
×
152
                if (adminRepoConn.isActive()) {
×
153
                    try {
154
                        adminRepoConn.rollback();
×
155
                    } catch (Exception rollbackException) {
×
156
                        // Transaction may not be registered on server (e.g., already committed, timed out, or connection reset)
157
                        // Log the rollback failure but don't mask the original exception
158
                    }
×
159
                }
160
                throw new RuntimeException(e);
×
161
            }
3✔
162
            initialized = true;
9✔
163
            return getState();
15✔
164
        }
165
    }
166

167
    /**
168
     * Get the current state of the service.
169
     *
170
     * @return the current state and the last committed counter
171
     */
172
    public LoadingStatus getState() {
173
        synchronized (this) {
12✔
174
            return LoadingStatus.of(state, lastCommittedCounter);
24✔
175
        }
176
    }
177

178
    /**
179
     * Transition the service to the LOADING_INITIAL state and update the load counter.
180
     * This should be called in two situations:
181
     * - By the main loading thread (after calling initialize()) to start loading the initial nanopubs.
182
     * - By the initial nanopub loader, as it processes the initial nanopubs.
183
     *
184
     * @param loadCounter the new load counter
185
     */
186
    public void setLoadingInitial(long loadCounter) {
187
        synchronized (this) {
12✔
188
            if (state != State.LAUNCHING && state != State.LOADING_INITIAL && state != State.RESETTING) {
36✔
189
                throw new IllegalStateException("Cannot transition to LOADING_INITIAL, as the " + "current state is " + state);
24✔
190
            }
191
            if (state != State.RESETTING && lastCommittedCounter > loadCounter) {
27✔
192
                throw new IllegalStateException("Cannot update the load counter from " + lastCommittedCounter + " to " + loadCounter);
24✔
193
            }
194
            updateState(State.LOADING_INITIAL, loadCounter);
12✔
195
        }
9✔
196
    }
3✔
197

198
    /**
199
     * Transition the service to the LOADING_UPDATES state and update the load counter.
200
     * This should be called by the updates loader, when it starts processing new nanopubs, or
201
     * when it finishes processing a batch of nanopubs.
202
     *
203
     * @param loadCounter the new load counter
204
     */
205
    public void setLoadingUpdates(long loadCounter) {
206
        synchronized (this) {
12✔
207
            if (state != State.LAUNCHING && state != State.LOADING_UPDATES && state != State.READY) {
36✔
208
                throw new IllegalStateException("Cannot transition to LOADING_UPDATES, as the " + "current state is " + state);
24✔
209
            }
210
            if (lastCommittedCounter > loadCounter) {
15✔
211
                throw new IllegalStateException("Cannot update the load counter from " + lastCommittedCounter + " to " + loadCounter);
24✔
212
            }
213
            updateState(State.LOADING_UPDATES, loadCounter);
12✔
214
        }
9✔
215
    }
3✔
216

217
    /**
218
     * Transition the service to the READY state.
219
     * This should be called by the loaders, after they finish their work.
220
     */
221
    public void setReady() {
222
        synchronized (this) {
12✔
223
            if (state != State.READY) {
12✔
224
                updateState(State.READY, lastCommittedCounter);
15✔
225
            }
226
        }
9✔
227
    }
3✔
228

229
    /**
230
     * Transition the service to the RESETTING state, resetting the load counter to -1.
231
     * This is triggered when a registry reset is detected (setupId changed or counter decreased).
232
     */
233
    public void setResetting() {
234
        synchronized (this) {
12✔
235
            if (state != State.READY && state != State.LOADING_UPDATES) {
24✔
236
                throw new IllegalStateException("Cannot transition to RESETTING, as the " + "current state is " + state);
24✔
237
            }
238
            updateState(State.RESETTING, -1);
12✔
239
        }
9✔
240
    }
3✔
241

242
    /**
243
     * Get the persisted registry setup ID.
244
     *
245
     * @return the registry setup ID, or null if not yet known
246
     */
247
    public Long getRegistrySetupId() {
248
        synchronized (this) {
12✔
249
            return registrySetupId;
15✔
250
        }
251
    }
252

253
    /**
254
     * Persist a new registry setup ID to the admin repo.
255
     *
256
     * @param setupId the new setup ID
257
     */
258
    public void setRegistrySetupId(long setupId) {
259
        synchronized (this) {
12✔
260
            try {
261
                adminRepoConn.begin(IsolationLevels.SERIALIZABLE);
12✔
262
                adminRepoConn.remove(NPA.THIS_REPO, HAS_REGISTRY_SETUP_ID, null, NPA.GRAPH);
36✔
263
                adminRepoConn.add(NPA.THIS_REPO, HAS_REGISTRY_SETUP_ID, adminRepoConn.getValueFactory().createLiteral(setupId), NPA.GRAPH);
48✔
264
                adminRepoConn.commit();
9✔
265
                registrySetupId = setupId;
12✔
266
            } catch (Exception e) {
×
267
                if (adminRepoConn.isActive()) {
×
268
                    try {
269
                        adminRepoConn.rollback();
×
270
                    } catch (Exception rollbackException) {
×
271
                        // Log the rollback failure but don't mask the original exception
272
                    }
×
273
                }
274
                throw new RuntimeException(e);
×
275
            }
3✔
276
        }
9✔
277
    }
3✔
278

279
    void updateState(State newState, long loadCounter) {
280
        synchronized (this) {
12✔
281
            try {
282
                // Serializable, as the service state needs to be strictly consistent
283
                adminRepoConn.begin(IsolationLevels.SERIALIZABLE);
12✔
284
                adminRepoConn.remove(NPA.THIS_REPO, NPA.HAS_STATUS, null, NPA.GRAPH);
36✔
285
                adminRepoConn.add(NPA.THIS_REPO, NPA.HAS_STATUS, stateAsLiteral(newState), NPA.GRAPH);
42✔
286
                adminRepoConn.remove(NPA.THIS_REPO, NPA.HAS_REGISTRY_LOAD_COUNTER, null, NPA.GRAPH);
36✔
287
                adminRepoConn.add(NPA.THIS_REPO, NPA.HAS_REGISTRY_LOAD_COUNTER, adminRepoConn.getValueFactory().createLiteral(loadCounter), NPA.GRAPH);
48✔
288
                adminRepoConn.commit();
9✔
289
                state = newState;
9✔
290
                lastCommittedCounter = loadCounter;
9✔
291
            } catch (Exception e) {
×
292
                if (adminRepoConn.isActive()) {
×
293
                    try {
294
                        adminRepoConn.rollback();
×
295
                    } catch (Exception rollbackException) {
×
296
                        // Transaction may not be registered on server (e.g., already committed, timed out, or connection reset)
297
                        // Log the rollback failure but don't mask the original exception
298
                    }
×
299
                }
300
                throw new RuntimeException(e);
×
301
            }
3✔
302
        }
9✔
303
    }
3✔
304

305
    private Literal stateAsLiteral(State s) {
306
        return adminRepoConn.getValueFactory().createLiteral(s.toString());
21✔
307
    }
308

309
    /**
310
     * Reset the StatusController for testing purposes.
311
     * This will clear the state and allow re-initialization.
312
     */
313
    void resetForTest() {
314
        synchronized (this) {
12✔
315
            initialized = false;
9✔
316
            state = null;
9✔
317
            lastCommittedCounter = -1;
9✔
318
            registrySetupId = null;
9✔
319
            adminRepoConn = null;
9✔
320
        }
9✔
321
    }
3✔
322

323
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc