• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 17120526436

21 Aug 2025 07:44AM UTC coverage: 49.971% (-0.09%) from 50.058%
17120526436

push

github

tkuhn
Define timeouts via environment variables, and some refactoring

240 of 490 branches covered (48.98%)

Branch coverage included in aggregate %.

629 of 1249 relevant lines covered (50.36%)

2.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

25.15
src/main/java/com/knowledgepixels/query/JellyNanopubLoader.java
1
package com.knowledgepixels.query;
2

3
import java.io.IOException;
4
import java.util.concurrent.atomic.AtomicLong;
5

6
import org.apache.http.client.methods.CloseableHttpResponse;
7
import org.apache.http.client.methods.HttpGet;
8
import org.apache.http.client.methods.HttpHead;
9
import org.apache.http.impl.client.CloseableHttpClient;
10
import org.apache.http.impl.client.HttpClientBuilder;
11
import org.apache.http.util.EntityUtils;
12
import org.nanopub.NanopubUtils;
13
import org.nanopub.jelly.NanopubStream;
14

15
/**
16
 * Loads nanopubs from the attached Nanopub Registry via a restartable Jelly stream.
17
 */
18
public class JellyNanopubLoader {
×
19
    private static final String registryUrl;
20
    private static long lastCommittedCounter = -1;
2✔
21
    private static final CloseableHttpClient metadataClient;
22
    private static final CloseableHttpClient jellyStreamClient;
23

24
    private static final int MAX_RETRIES_METADATA = 10;
25
    private static final int RETRY_DELAY_METADATA = 3000;
26
    private static final int RETRY_DELAY_JELLY = 5000;
27

28
    /**
29
     * The interval in milliseconds at which the updates loader should poll for new nanopubs.
30
     */
31
    public static final int UPDATES_POLL_INTERVAL = 2000;
32

33
    /**
     * The kind of loading operation a batch belongs to. It decides which status update is
     * written when the committed counter is saved.
     */
    enum LoadingType {
        /** Batch loaded as part of the initial load ({@code loadInitial}). */
        INITIAL,
        /** Batch loaded as part of a periodic update ({@code loadUpdates}). */
        UPDATE
    }
37

38
    static {
39
        // Initialize registryUrl
40
        var url = Utils.getEnvString(
4✔
41
                "REGISTRY_FIXED_URL", "https://registry.knowledgepixels.com/"
42
        );
43
        if (!url.endsWith("/")) url += "/";
4!
44
        registryUrl = url;
2✔
45

46
        metadataClient = HttpClientBuilder.create().setDefaultRequestConfig(Utils.getHttpRequestConfig()).build();
5✔
47
        jellyStreamClient = NanopubUtils.getHttpClient();
2✔
48
    }
1✔
49

50
    /**
51
     * Start or continue (after restart) the initial loading procedure. This simply loads all
52
     * nanopubs from the attached Registry.
53
     *
54
     * @param afterCounter which counter to start from (-1 for the beginning)
55
     */
56
    public static void loadInitial(long afterCounter) {
57
        long targetCounter = fetchRegistryLoadCounter();
2✔
58
        System.err.println("Fetched Registry load counter: " + targetCounter);
4✔
59
        lastCommittedCounter = afterCounter;
2✔
60
        while (lastCommittedCounter < targetCounter) {
4!
61
            try {
62
                loadBatch(lastCommittedCounter, LoadingType.INITIAL);
×
63
                System.err.println("Initial load: loaded batch up to counter " + lastCommittedCounter);
×
64
            } catch (Exception e) {
×
65
                System.err.println("Failed to load batch starting from counter " + lastCommittedCounter);
×
66
                System.err.println(e.getMessage());
×
67
                try {
68
                    Thread.sleep(RETRY_DELAY_JELLY);
×
69
                } catch (InterruptedException e2) {
×
70
                    throw new RuntimeException("Interrupted while waiting to retry loading batch.");
×
71
                }
×
72
            }
×
73
        }
74
        System.err.println("Initial load complete.");
3✔
75
    }
1✔
76

77
    /**
78
     * Check if the Registry has any new nanopubs. If it does, load them.
79
     * This method should be called periodically, and you should wait for it to finish before
80
     * calling it again.
81
     */
82
    public static void loadUpdates() {
83
        try {
84
            final var status = StatusController.get().getState();
×
85
            lastCommittedCounter = status.loadCounter;
×
86
            StatusController.get().setLoadingUpdates(status.loadCounter);
×
87
            long targetCounter = fetchRegistryLoadCounter();
×
88
            if (lastCommittedCounter >= targetCounter) {
×
89
                // Keep quiet so as not to spam the log every second
90
                // System.err.println("No updates to load.");
91
                StatusController.get().setReady();
×
92
                return;
×
93
            }
94
            loadBatch(lastCommittedCounter, LoadingType.UPDATE);
×
95
            System.err.println("Loaded " + (lastCommittedCounter - status.loadCounter) +
×
96
                    " update(s). Counter: " + lastCommittedCounter + ", target was: " + targetCounter);
97
            if (lastCommittedCounter < targetCounter) {
×
98
                System.err.println("Warning: expected to load nanopubs up to (inclusive) counter " +
×
99
                        targetCounter + " based on the counter reported in Registry's headers, " +
100
                        "but loaded only up to " + lastCommittedCounter + ".");
101
            }
102
        } catch (Exception e) {
×
103
            System.err.println("Failed to load updates. Current counter: " + lastCommittedCounter);
×
104
            System.err.println(e.getMessage());
×
105
        } finally {
106
            try {
107
                StatusController.get().setReady();
×
108
            } catch (Exception e) {
×
109
                System.err.println("Update loader: failed to set status to READY.");
×
110
                System.err.println(e.getMessage());
×
111
            }
×
112
        }
113
    }
×
114

115
    /**
116
     * Load a batch of nanopubs from the Jelly stream.
117
     * <p>
118
     * The method requests the list of all nanopubs from the Registry and reads it for as long
119
     * as it can. If the stream is interrupted, the method will throw an exception, and you
120
     * can resume loading from the last known counter.
121
     *
122
     * @param afterCounter the last known nanopub counter to have been committed in the DB
123
     * @param type         the type of loading operation (initial or update)
124
     */
125
    static void loadBatch(long afterCounter, LoadingType type) {
126
        CloseableHttpResponse response;
127
        try {
128
            var request = new HttpGet(makeStreamFetchUrl(afterCounter));
×
129
            response = jellyStreamClient.execute(request);
×
130
        } catch (IOException e) {
×
131
            throw new RuntimeException("Failed to fetch Jelly stream from the Registry (I/O error).", e);
×
132
        }
×
133

134
        int httpStatus = response.getStatusLine().getStatusCode();
×
135
        if (httpStatus < 200 || httpStatus >= 300) {
×
136
            EntityUtils.consumeQuietly(response.getEntity());
×
137
            throw new RuntimeException("Jelly stream HTTP status is not 2xx: " + httpStatus + ".");
×
138
        }
139

140
        try (
141
                var is = response.getEntity().getContent();
×
142
                var npStream = NanopubStream.fromByteStream(is).getAsNanopubs()
×
143
        ) {
144
            AtomicLong checkpointTime = new AtomicLong(System.currentTimeMillis());
×
145
            AtomicLong checkpointCounter = new AtomicLong(lastCommittedCounter);
×
146
            AtomicLong lastSavedCounter = new AtomicLong(lastCommittedCounter);
×
147
            AtomicLong loaded = new AtomicLong(0L);
×
148

149
            npStream.forEach(m -> {
×
150
                if (!m.isSuccess()) throw new RuntimeException("Failed to load " +
×
151
                        "nanopub from Jelly stream. Last known counter: " + lastCommittedCounter,
152
                        m.getException()
×
153
                );
154
                if (m.getCounter() < lastCommittedCounter) {
×
155
                    throw new RuntimeException("Received a nanopub with a counter lower than " +
×
156
                            "the last known counter. Last known counter: " + lastCommittedCounter +
157
                            ", received counter: " + m.getCounter());
×
158
                }
159
                NanopubLoader.load(m.getNanopub(), m.getCounter());
×
160
                if (m.getCounter() % 10 == 0) {
×
161
                    // Save the committed counter only every 10 nanopubs to reduce DB load
162
                    saveCommittedCounter(type);
×
163
                    lastSavedCounter.set(m.getCounter());
×
164
                }
165
                lastCommittedCounter = m.getCounter();
×
166
                loaded.getAndIncrement();
×
167

168
                if (loaded.get() % 50 == 0) {
×
169
                    long currTime = System.currentTimeMillis();
×
170
                    double speed = 50 / ((currTime - checkpointTime.get()) / 1000.0);
×
171
                    System.err.println("Loading speed: " + String.format("%.2f", speed) +
×
172
                            " np/s. Counter: " + lastCommittedCounter);
173
                    checkpointTime.set(currTime);
×
174
                    checkpointCounter.set(lastCommittedCounter);
×
175
                }
176
            });
×
177
            // Make sure to save the last committed counter at the end of the batch
178
            if (lastCommittedCounter >= lastSavedCounter.get()) {
×
179
                saveCommittedCounter(type);
×
180
            }
181
        } catch (IOException e) {
×
182
            throw new RuntimeException("I/O error while reading the response Jelly stream.", e);
×
183
        } finally {
184
            try {
185
                response.close();
×
186
            } catch (IOException e) {
×
187
                System.err.println("Failed to close the Jelly stream response.");
×
188
            }
×
189
        }
190
    }
×
191

192
    /**
193
     * Save the last committed counter to the DB. Do this every N nanopubs to reduce DB load.
194
     * Remember to call this method at the end of the batch as well.
195
     *
196
     * @param type the type of loading operation (initial or update)
197
     */
198
    private static void saveCommittedCounter(LoadingType type) {
199
        try {
200
            if (type == LoadingType.INITIAL) {
×
201
                StatusController.get().setLoadingInitial(lastCommittedCounter);
×
202
            } else {
203
                StatusController.get().setLoadingUpdates(lastCommittedCounter);
×
204
            }
205
        } catch (Exception e) {
×
206
            throw new RuntimeException("Could not update the nanopub counter in DB", e);
×
207
        }
×
208
    }
×
209

210
    /**
211
     * Run a HEAD request to the Registry to fetch its current load counter.
212
     *
213
     * @return the current load counter
214
     */
215
    static long fetchRegistryLoadCounter() {
216
        int tries = 0;
2✔
217
        long counter = -1;
2✔
218
        while (counter == -1 && tries < MAX_RETRIES_METADATA) {
7!
219
            try {
220
                counter = fetchRegistryLoadCounterInner();
2✔
221
            } catch (Exception e) {
×
222
                tries++;
×
223
                System.err.println("Failed to fetch registry load counter, try " + tries +
×
224
                        ". Retrying in " + RETRY_DELAY_METADATA + "ms...");
225
                System.err.println(e.getMessage());
×
226
                try {
227
                    Thread.sleep(RETRY_DELAY_METADATA);
×
228
                } catch (InterruptedException e2) {
×
229
                    throw new RuntimeException(
×
230
                            "Interrupted while waiting to retry fetching registry load counter.");
231
                }
×
232
            }
1✔
233
        }
234
        if (counter == -1) {
4!
235
            throw new RuntimeException("Failed to fetch registry load counter after " +
×
236
                    MAX_RETRIES_METADATA + " retries.");
237
        }
238
        return counter;
2✔
239
    }
240

241
    /**
242
     * Inner logic for fetching the registry load counter.
243
     *
244
     * @return the current load counter
245
     * @throws IOException if the HTTP request fails
246
     */
247
    private static long fetchRegistryLoadCounterInner() throws IOException {
248
        var request = new HttpHead(registryUrl);
5✔
249
        try (var response = metadataClient.execute(request)) {
4✔
250
            int status = response.getStatusLine().getStatusCode();
4✔
251
            EntityUtils.consumeQuietly(response.getEntity());
3✔
252
            if (status < 200 || status >= 300) {
6!
253
                throw new RuntimeException("Registry load counter HTTP status is not 2xx: " +
×
254
                        status + ".");
255
            }
256

257
            // Check if the registry is ready
258
            var hStatus = response.getHeaders("Nanopub-Registry-Status");
4✔
259
            if (hStatus.length == 0) {
3!
260
                throw new RuntimeException("Registry did not return a Nanopub-Registry-Status header.");
×
261
            }
262
            if (!"ready".equals(hStatus[0].getValue()) && !"updating".equals(hStatus[0].getValue())) {
14!
263
                throw new RuntimeException("Registry is not in ready state.");
×
264
            }
265

266
            // Get the actual load counter
267
            var hCounter = response.getHeaders("Nanopub-Registry-Load-Counter");
4✔
268
            if (hCounter.length == 0) {
3!
269
                throw new RuntimeException("Registry did not return a Nanopub-Registry-Load-Counter header.");
×
270
            }
271
            return Long.parseLong(hCounter[0].getValue());
8✔
272
        }
273
    }
274

275
    /**
276
     * Construct the URL for fetching the Jelly stream.
277
     *
278
     * @param afterCounter the last known nanopub counter to have been committed in the DB
279
     * @return the URL for fetching the Jelly stream
280
     */
281
    private static String makeStreamFetchUrl(long afterCounter) {
282
        return registryUrl + "nanopubs.jelly?afterCounter=" + afterCounter;
×
283
    }
284
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc