• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 20776539696

07 Jan 2026 09:18AM UTC coverage: 71.251% (-0.7%) from 71.961%
20776539696

push

github

tkuhn
chore: Add nohub.out to .gitignore

214 of 326 branches covered (65.64%)

Branch coverage included in aggregate %.

589 of 801 relevant lines covered (73.53%)

11.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

24.57
src/main/java/com/knowledgepixels/query/JellyNanopubLoader.java
1
package com.knowledgepixels.query;
2

3
import java.io.IOException;
4
import java.util.concurrent.atomic.AtomicLong;
5

6
import org.apache.http.client.methods.CloseableHttpResponse;
7
import org.apache.http.client.methods.HttpGet;
8
import org.apache.http.client.methods.HttpHead;
9
import org.apache.http.impl.client.CloseableHttpClient;
10
import org.apache.http.impl.client.HttpClientBuilder;
11
import org.apache.http.util.EntityUtils;
12
import org.nanopub.NanopubUtils;
13
import org.nanopub.jelly.NanopubStream;
14
import org.slf4j.Logger;
15
import org.slf4j.LoggerFactory;
16

17
/**
18
 * Loads nanopubs from the attached Nanopub Registry via a restartable Jelly stream.
19
 */
20
public class JellyNanopubLoader {
×
21
    private static final String registryUrl;
22
    private static long lastCommittedCounter = -1;
6✔
23
    private static final CloseableHttpClient metadataClient;
24
    private static final CloseableHttpClient jellyStreamClient;
25

26
    private static final int MAX_RETRIES_METADATA = 10;
27
    private static final int RETRY_DELAY_METADATA = 3000;
28
    private static final int RETRY_DELAY_JELLY = 5000;
29

30
    private static final Logger log = LoggerFactory.getLogger(JellyNanopubLoader.class);
9✔
31

32
    /**
33
     * The interval in milliseconds at which the updates loader should poll for new nanopubs.
34
     */
35
    public static final int UPDATES_POLL_INTERVAL = 2000;
36

37
    /**
     * The kind of loading run a batch belongs to.
     */
    enum LoadingType {
        /** Batch loaded as part of the initial load of all nanopubs. */
        INITIAL,
        /** Batch loaded while polling for updates. */
        UPDATE
    }
41

42
    static {
43
        // Initialize registryUrl
44
        var url = Utils.getEnvString(
12✔
45
                "REGISTRY_FIXED_URL", "https://registry.knowledgepixels.com/"
46
        );
47
        if (!url.endsWith("/")) url += "/";
12!
48
        registryUrl = url;
6✔
49

50
        metadataClient = HttpClientBuilder.create().setDefaultRequestConfig(Utils.getHttpRequestConfig()).build();
15✔
51
        jellyStreamClient = NanopubUtils.getHttpClient();
6✔
52
    }
3✔
53

54
    /**
55
     * Start or continue (after restart) the initial loading procedure. This simply loads all
56
     * nanopubs from the attached Registry.
57
     *
58
     * @param afterCounter which counter to start from (-1 for the beginning)
59
     */
60
    public static void loadInitial(long afterCounter) {
61
        long targetCounter = fetchRegistryLoadCounter();
6✔
62
        log.info("Fetched Registry load counter: {}", targetCounter);
15✔
63
        lastCommittedCounter = afterCounter;
6✔
64
        while (lastCommittedCounter < targetCounter) {
12!
65
            try {
66
                loadBatch(lastCommittedCounter, LoadingType.INITIAL);
×
67
                log.info("Initial load: loaded batch up to counter {}", lastCommittedCounter);
×
68
            } catch (Exception e) {
×
69
                log.info("Failed to load batch starting from counter {}", lastCommittedCounter);
×
70
                log.info("Failure reason: ", e);
×
71
                try {
72
                    Thread.sleep(RETRY_DELAY_JELLY);
×
73
                } catch (InterruptedException e2) {
×
74
                    throw new RuntimeException("Interrupted while waiting to retry loading batch.");
×
75
                }
×
76
            }
×
77
        }
78
        log.info("Initial load complete.");
9✔
79
    }
3✔
80

81
    /**
82
     * Check if the Registry has any new nanopubs. If it does, load them.
83
     * This method should be called periodically, and you should wait for it to finish before
84
     * calling it again.
85
     */
86
    public static void loadUpdates() {
87
        try {
88
            final var status = StatusController.get().getState();
×
89
            lastCommittedCounter = status.loadCounter;
×
90
            StatusController.get().setLoadingUpdates(status.loadCounter);
×
91
            long targetCounter = fetchRegistryLoadCounter();
×
92
            if (lastCommittedCounter >= targetCounter) {
×
93
                // Keep quiet so as not to spam the log every second
94
                // log.info("No updates to load.");
95
                StatusController.get().setReady();
×
96
                return;
×
97
            }
98
            loadBatch(lastCommittedCounter, LoadingType.UPDATE);
×
99
            log.info("Loaded {} update(s). Counter: {}, target was: {}",
×
100
                    lastCommittedCounter - status.loadCounter, lastCommittedCounter, targetCounter);
×
101
            if (lastCommittedCounter < targetCounter) {
×
102
                log.info("Warning: expected to load nanopubs up to (inclusive) counter " +
×
103
                        targetCounter + " based on the counter reported in Registry's headers, " +
104
                        "but loaded only up to {}.", lastCommittedCounter);
×
105
            }
106
        } catch (Exception e) {
×
107
            log.info("Failed to load updates. Current counter: {}", lastCommittedCounter);
×
108
            log.info("Failure Reason: ", e);
×
109
        } finally {
110
            try {
111
                StatusController.get().setReady();
×
112
            } catch (Exception e) {
×
113
                log.info("Update loader: failed to set status to READY.");
×
114
                log.info("Failure Reason: ", e);
×
115
            }
×
116
        }
117
    }
×
118

119
    /**
120
     * Load a batch of nanopubs from the Jelly stream.
121
     * <p>
122
     * The method requests the list of all nanopubs from the Registry and reads it for as long
123
     * as it can. If the stream is interrupted, the method will throw an exception, and you
124
     * can resume loading from the last known counter.
125
     *
126
     * @param afterCounter the last known nanopub counter to have been committed in the DB
127
     * @param type         the type of loading operation (initial or update)
128
     */
129
    static void loadBatch(long afterCounter, LoadingType type) {
130
        CloseableHttpResponse response;
131
        try {
132
            var request = new HttpGet(makeStreamFetchUrl(afterCounter));
×
133
            response = jellyStreamClient.execute(request);
×
134
        } catch (IOException e) {
×
135
            throw new RuntimeException("Failed to fetch Jelly stream from the Registry (I/O error).", e);
×
136
        }
×
137

138
        int httpStatus = response.getStatusLine().getStatusCode();
×
139
        if (httpStatus < 200 || httpStatus >= 300) {
×
140
            EntityUtils.consumeQuietly(response.getEntity());
×
141
            throw new RuntimeException("Jelly stream HTTP status is not 2xx: " + httpStatus + ".");
×
142
        }
143

144
        try (
145
                var is = response.getEntity().getContent();
×
146
                var npStream = NanopubStream.fromByteStream(is).getAsNanopubs()
×
147
        ) {
148
            AtomicLong checkpointTime = new AtomicLong(System.currentTimeMillis());
×
149
            AtomicLong checkpointCounter = new AtomicLong(lastCommittedCounter);
×
150
            AtomicLong lastSavedCounter = new AtomicLong(lastCommittedCounter);
×
151
            AtomicLong loaded = new AtomicLong(0L);
×
152

153
            npStream.forEach(m -> {
×
154
                if (!m.isSuccess()) throw new RuntimeException("Failed to load " +
×
155
                        "nanopub from Jelly stream. Last known counter: " + lastCommittedCounter,
156
                        m.getException()
×
157
                );
158
                if (m.getCounter() < lastCommittedCounter) {
×
159
                    throw new RuntimeException("Received a nanopub with a counter lower than " +
×
160
                            "the last known counter. Last known counter: " + lastCommittedCounter +
161
                            ", received counter: " + m.getCounter());
×
162
                }
163
                NanopubLoader.load(m.getNanopub(), m.getCounter());
×
164
                if (m.getCounter() % 10 == 0) {
×
165
                    // Save the committed counter only every 10 nanopubs to reduce DB load
166
                    saveCommittedCounter(type);
×
167
                    lastSavedCounter.set(m.getCounter());
×
168
                }
169
                lastCommittedCounter = m.getCounter();
×
170
                loaded.getAndIncrement();
×
171

172
                if (loaded.get() % 50 == 0) {
×
173
                    long currTime = System.currentTimeMillis();
×
174
                    double speed = 50 / ((currTime - checkpointTime.get()) / 1000.0);
×
175
                    log.info("Loading speed: " + String.format("%.2f", speed) +
×
176
                            " np/s. Counter: " + lastCommittedCounter);
177
                    checkpointTime.set(currTime);
×
178
                    checkpointCounter.set(lastCommittedCounter);
×
179
                }
180
            });
×
181
            // Make sure to save the last committed counter at the end of the batch
182
            if (lastCommittedCounter >= lastSavedCounter.get()) {
×
183
                saveCommittedCounter(type);
×
184
            }
185
        } catch (IOException e) {
×
186
            throw new RuntimeException("I/O error while reading the response Jelly stream.", e);
×
187
        } finally {
188
            try {
189
                response.close();
×
190
            } catch (IOException e) {
×
191
                log.info("Failed to close the Jelly stream response.");
×
192
            }
×
193
        }
194
    }
×
195

196
    /**
197
     * Save the last committed counter to the DB. Do this every N nanopubs to reduce DB load.
198
     * Remember to call this method at the end of the batch as well.
199
     *
200
     * @param type the type of loading operation (initial or update)
201
     */
202
    private static void saveCommittedCounter(LoadingType type) {
203
        try {
204
            if (type == LoadingType.INITIAL) {
×
205
                StatusController.get().setLoadingInitial(lastCommittedCounter);
×
206
            } else {
207
                StatusController.get().setLoadingUpdates(lastCommittedCounter);
×
208
            }
209
        } catch (Exception e) {
×
210
            throw new RuntimeException("Could not update the nanopub counter in DB", e);
×
211
        }
×
212
    }
×
213

214
    /**
215
     * Run a HEAD request to the Registry to fetch its current load counter.
216
     *
217
     * @return the current load counter
218
     */
219
    static long fetchRegistryLoadCounter() {
220
        int tries = 0;
6✔
221
        long counter = -1;
6✔
222
        while (counter == -1 && tries < MAX_RETRIES_METADATA) {
21!
223
            try {
224
                counter = fetchRegistryLoadCounterInner();
6✔
225
            } catch (Exception e) {
×
226
                tries++;
×
227
                log.info("Failed to fetch registry load counter, try " + tries +
×
228
                        ". Retrying in {}ms...", RETRY_DELAY_METADATA);
×
229
                log.info("Failure Reason: ", e);
×
230
                try {
231
                    Thread.sleep(RETRY_DELAY_METADATA);
×
232
                } catch (InterruptedException e2) {
×
233
                    throw new RuntimeException(
×
234
                            "Interrupted while waiting to retry fetching registry load counter.");
235
                }
×
236
            }
3✔
237
        }
238
        if (counter == -1) {
12!
239
            throw new RuntimeException("Failed to fetch registry load counter after " +
×
240
                    MAX_RETRIES_METADATA + " retries.");
241
        }
242
        return counter;
6✔
243
    }
244

245
    /**
246
     * Inner logic for fetching the registry load counter.
247
     *
248
     * @return the current load counter
249
     * @throws IOException if the HTTP request fails
250
     */
251
    private static long fetchRegistryLoadCounterInner() throws IOException {
252
        var request = new HttpHead(registryUrl);
15✔
253
        try (var response = metadataClient.execute(request)) {
12✔
254
            int status = response.getStatusLine().getStatusCode();
12✔
255
            EntityUtils.consumeQuietly(response.getEntity());
9✔
256
            if (status < 200 || status >= 300) {
18!
257
                throw new RuntimeException("Registry load counter HTTP status is not 2xx: " +
×
258
                        status + ".");
259
            }
260

261
            // Check if the registry is ready
262
            var hStatus = response.getHeaders("Nanopub-Registry-Status");
12✔
263
            if (hStatus.length == 0) {
9!
264
                throw new RuntimeException("Registry did not return a Nanopub-Registry-Status header.");
×
265
            }
266
            if (!"ready".equals(hStatus[0].getValue()) && !"updating".equals(hStatus[0].getValue())) {
21!
267
                throw new RuntimeException("Registry is not in ready state.");
×
268
            }
269

270
            // Get the actual load counter
271
            var hCounter = response.getHeaders("Nanopub-Registry-Load-Counter");
12✔
272
            if (hCounter.length == 0) {
9!
273
                throw new RuntimeException("Registry did not return a Nanopub-Registry-Load-Counter header.");
×
274
            }
275
            return Long.parseLong(hCounter[0].getValue());
24✔
276
        }
277
    }
278

279
    /**
280
     * Construct the URL for fetching the Jelly stream.
281
     *
282
     * @param afterCounter the last known nanopub counter to have been committed in the DB
283
     * @return the URL for fetching the Jelly stream
284
     */
285
    private static String makeStreamFetchUrl(long afterCounter) {
286
        return registryUrl + "nanopubs.jelly?afterCounter=" + afterCounter;
×
287
    }
288
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc