• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-registry / 23548564094

25 Mar 2026 03:15PM UTC coverage: 31.259% (-0.4%) from 31.704%
23548564094

Pull #90

github

web-flow
Merge 566d7831d into 1a3640ad0
Pull Request #90: perf: parallel stream loading, batched task scheduling, skip idle peer discovery

206 of 740 branches covered (27.84%)

Branch coverage included in aggregate %.

693 of 2136 relevant lines covered (32.44%)

5.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

8.68
src/main/java/com/knowledgepixels/registry/NanopubLoader.java
1
package com.knowledgepixels.registry;
2

3
import com.mongodb.ErrorCategory;
4
import com.mongodb.MongoWriteException;
5
import com.mongodb.client.ClientSession;
6
import com.mongodb.client.MongoCursor;
7
import net.trustyuri.TrustyUriUtils;
8
import net.trustyuri.rdf.RdfModule;
9
import org.apache.http.Header;
10
import org.apache.http.HttpResponse;
11
import org.apache.http.client.HttpClient;
12
import org.apache.http.client.methods.CloseableHttpResponse;
13
import org.apache.http.client.methods.HttpGet;
14
import org.apache.http.util.EntityUtils;
15
import org.bson.Document;
16
import org.bson.types.Binary;
17
import org.eclipse.rdf4j.common.exception.RDF4JException;
18
import org.eclipse.rdf4j.rio.RDFFormat;
19
import org.nanopub.MalformedNanopubException;
20
import org.nanopub.Nanopub;
21
import org.nanopub.NanopubImpl;
22
import org.nanopub.NanopubUtils;
23
import org.nanopub.extra.server.GetNanopub;
24
import org.nanopub.jelly.JellyUtils;
25
import org.nanopub.jelly.MaybeNanopub;
26
import org.nanopub.jelly.NanopubStream;
27
import org.nanopub.trusty.TrustyNanopubUtils;
28
import org.nanopub.vocabulary.NPX;
29
import org.slf4j.Logger;
30
import org.slf4j.LoggerFactory;
31

32
import java.io.IOException;
33
import java.io.InputStream;
34
import java.util.ArrayList;
35
import java.util.Collections;
36
import java.util.List;
37
import java.util.concurrent.ExecutorService;
38
import java.util.concurrent.Executors;
39
import java.util.concurrent.Semaphore;
40
import java.util.concurrent.TimeUnit;
41
import java.util.concurrent.atomic.AtomicReference;
42
import java.util.function.Consumer;
43
import java.util.stream.Stream;
44

45
import static com.knowledgepixels.registry.RegistryDB.has;
46
import static com.knowledgepixels.registry.RegistryDB.insert;
47

48
public class NanopubLoader {
49

50
    /** Not instantiable: this class only exposes static members. */
    private NanopubLoader() {
    }

    /** String value of {@code NPX.DECLARED_BY}; the list type used for introductions. */
    public static final String INTRO_TYPE = NPX.DECLARED_BY.stringValue();
    /** Hash of {@link #INTRO_TYPE} as computed by {@code Utils.getHash}. */
    public static final String INTRO_TYPE_HASH = Utils.getHash(INTRO_TYPE);
    /** String value of {@code Utils.APPROVES_OF}; the list type used for endorsements. */
    public static final String ENDORSE_TYPE = Utils.APPROVES_OF.stringValue();
    /** Hash of {@link #ENDORSE_TYPE} as computed by {@code Utils.getHash}. */
    public static final String ENDORSE_TYPE_HASH = Utils.getHash(ENDORSE_TYPE);
    private static final Logger log = LoggerFactory.getLogger(NanopubLoader.class);
9✔
58

59
    // TODO Distinguish and support these cases:
60
    //      1. Simple load: load to all core lists if pubkey is "core-loaded", or load to all lists if pubkey is "full-loaded"
61
    //      2. Core load: load to all core lists (initialize if needed), or load to all lists if pubkey is "full-loaded"
62
    //      3. Full load: load to all lists (initialize if needed)
63

64
    public static void simpleLoad(ClientSession mongoSession, String nanopubId) {
65
        simpleLoad(mongoSession, retrieveNanopub(mongoSession, nanopubId));
×
66
    }
×
67

68
    public static void simpleLoad(ClientSession mongoSession, Nanopub np) {
69
        String pubkey = RegistryDB.getPubkey(np);
×
70
        if (pubkey == null) {
×
71
            log.info("Ignore (not signed): {}", np.getUri());
×
72
            return;
×
73
        }
74
        simpleLoad(mongoSession, np, pubkey);
×
75
    }
×
76

77
    /**
78
     * Loads a nanopub to the appropriate lists, using a pre-verified public key
79
     * to skip redundant signature verification.
80
     */
81
    public static void simpleLoad(ClientSession mongoSession, Nanopub np, String verifiedPubkey) {
82
        String pubkeyHash = Utils.getHash(verifiedPubkey);
9✔
83
        // TODO Do we need to load anything else here, into the other DB collections?
84
        if (has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", "$").append("status", "loaded"))) {
45!
85
            RegistryDB.loadNanopubVerified(mongoSession, np, verifiedPubkey, pubkeyHash, "$");
×
86
        } else if (has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH).append("status", "loaded"))) {
45!
87
            RegistryDB.loadNanopubVerified(mongoSession, np, verifiedPubkey, pubkeyHash, INTRO_TYPE, ENDORSE_TYPE);
×
88
        } else if (!has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH))) {
36!
89
            // Unknown pubkey: create encountered intro list so RUN_OPTIONAL_LOAD picks it up
90
            try {
91
                insert(mongoSession, "lists", new Document("pubkey", pubkeyHash)
30✔
92
                        .append("type", INTRO_TYPE_HASH)
9✔
93
                        .append("status", EntryStatus.encountered.getValue()));
6✔
94
            } catch (MongoWriteException e) {
×
95
                if (e.getError().getCategory() != ErrorCategory.DUPLICATE_KEY) throw e;
×
96
            }
3✔
97
        }
98
    }
3✔
99

100
    private static final int LOAD_PARALLELISM = Integer.parseInt(
12✔
101
            Utils.getEnv("REGISTRY_LOAD_PARALLELISM", String.valueOf(Runtime.getRuntime().availableProcessors())));
12✔
102

103
    /**
104
     * Processes a stream of nanopubs in parallel using a thread pool.
105
     * Each worker thread uses its own MongoDB ClientSession.
106
     * Backpressure is applied via a semaphore to avoid unbounded memory growth.
107
     *
108
     * @param stream    the nanopub stream to process
109
     * @param processor consumer that processes each nanopub (called with its own ClientSession)
110
     */
111
    public static void loadStreamInParallel(Stream<MaybeNanopub> stream, Consumer<Nanopub> processor) {
112
        if (LOAD_PARALLELISM <= 1) {
×
113
            // Fall back to sequential processing
114
            stream.forEach(m -> {
×
115
                if (!m.isSuccess()) throw new AbortingTaskException("Failed to download nanopub; aborting task...");
×
116
                processor.accept(m.getNanopub());
×
117
            });
×
118
            return;
×
119
        }
120

121
        ExecutorService executor = Executors.newFixedThreadPool(LOAD_PARALLELISM);
×
122
        Semaphore semaphore = new Semaphore(LOAD_PARALLELISM * 2);
×
123
        AtomicReference<Exception> error = new AtomicReference<>();
×
124

125
        try {
126
            stream.forEach(m -> {
×
127
                if (error.get() != null) return;
×
128
                if (!m.isSuccess()) {
×
129
                    error.compareAndSet(null, new AbortingTaskException("Failed to download nanopub; aborting task..."));
×
130
                    return;
×
131
                }
132
                Nanopub np = m.getNanopub();
×
133
                try {
134
                    semaphore.acquire();
×
135
                } catch (InterruptedException e) {
×
136
                    Thread.currentThread().interrupt();
×
137
                    error.compareAndSet(null, e);
×
138
                    return;
×
139
                }
×
140
                executor.submit(() -> {
×
141
                    try {
142
                        processor.accept(np);
×
143
                    } catch (Exception e) {
×
144
                        error.compareAndSet(null, e);
×
145
                    } finally {
146
                        semaphore.release();
×
147
                    }
148
                });
×
149
            });
×
150
        } finally {
151
            executor.shutdown();
×
152
            try {
153
                executor.awaitTermination(1, TimeUnit.HOURS);
×
154
            } catch (InterruptedException e) {
×
155
                Thread.currentThread().interrupt();
×
156
            }
×
157
        }
158

159
        if (error.get() != null) {
×
160
            if (error.get() instanceof RuntimeException re) throw re;
×
161
            throw new RuntimeException("Parallel loading failed", error.get());
×
162
        }
163
    }
×
164

165
    /**
166
     * Retrieve Nanopubs from the peers of this Nanopub Registry.
167
     *
168
     * @param typeHash   The hash of the type of the Nanopub to retrieve.
169
     * @param pubkeyHash The hash of the pubkey of the Nanopub to retrieve.
170
     * @return A stream of MaybeNanopub objects, or an empty stream if no peer is available.
171
     */
172
    public static Stream<MaybeNanopub> retrieveNanopubsFromPeers(String typeHash, String pubkeyHash) {
173
        return retrieveNanopubsFromPeers(typeHash, pubkeyHash, null);
×
174
    }
175

176
    /**
177
     * Retrieve Nanopubs from the peers, optionally skipping ahead using checksums.
178
     *
179
     * @param typeHash        The hash of the type of the Nanopub to retrieve.
180
     * @param pubkeyHash      The hash of the pubkey of the Nanopub to retrieve.
181
     * @param afterChecksums  Comma-separated checksums for skip-ahead (geometric fallback), or null for full fetch.
182
     * @return A stream of MaybeNanopub objects, or an empty stream if no peer is available.
183
     */
184
    public static Stream<MaybeNanopub> retrieveNanopubsFromPeers(String typeHash, String pubkeyHash, String afterChecksums) {
185
        // TODO Move the code of this method to nanopub-java library.
186

187
        List<String> peerUrlsToTry = new ArrayList<>(Utils.getPeerUrls());
×
188
        Collections.shuffle(peerUrlsToTry);
×
189
        while (!peerUrlsToTry.isEmpty()) {
×
190
            String peerUrl = peerUrlsToTry.removeFirst();
×
191

192
            String requestUrl = peerUrl + "list/" + pubkeyHash + "/" + typeHash + ".jelly";
×
193
            if (afterChecksums != null) {
×
194
                requestUrl += "?afterChecksums=" + afterChecksums;
×
195
            }
196
            log.info("Request: {}", requestUrl);
×
197
            try {
198
                CloseableHttpResponse resp = NanopubUtils.getHttpClient().execute(new HttpGet(requestUrl));
×
199
                int httpStatus = resp.getStatusLine().getStatusCode();
×
200
                if (httpStatus < 200 || httpStatus >= 300) {
×
201
                    log.info("Request failed: {} {}", peerUrl, httpStatus);
×
202
                    EntityUtils.consumeQuietly(resp.getEntity());
×
203
                    continue;
×
204
                }
205
                Header nrStatus = resp.getFirstHeader("Nanopub-Registry-Status");
×
206
                if (nrStatus == null) {
×
207
                    log.info("Nanopub-Registry-Status header not found at: {}", peerUrl);
×
208
                    EntityUtils.consumeQuietly(resp.getEntity());
×
209
                    continue;
×
210
                } else if (!nrStatus.getValue().equals("ready") && !nrStatus.getValue().equals("updating")) {
×
211
                    log.info("Peer in non-ready state: {} {}", peerUrl, nrStatus.getValue());
×
212
                    EntityUtils.consumeQuietly(resp.getEntity());
×
213
                    continue;
×
214
                }
215
                InputStream is = resp.getEntity().getContent();
×
216
                return NanopubStream.fromByteStream(is).getAsNanopubs().onClose(() -> {
×
217
                    try {
218
                        resp.close();
×
219
                    } catch (IOException e) {
×
220
                        log.debug("Error closing HTTP response", e);
×
221
                    }
×
222
                });
×
223
            } catch (UnsupportedOperationException | IOException ex) {
×
224
                log.info("Request failed: ", ex);
×
225
            }
226
        }
×
227
        return Stream.empty();
×
228
    }
229

230
    public static Nanopub retrieveNanopub(ClientSession mongoSession, String nanopubId) {
231
        Nanopub np = retrieveLocalNanopub(mongoSession, nanopubId);
×
232
        int tryCount = 0;
×
233
        while (np == null) {
×
234
            if (tryCount > 10) {
×
235
                throw new RuntimeException("Could not load nanopub: " + nanopubId);
×
236
            } else if (tryCount > 0) {
×
237
                try {
238
                    Thread.sleep(100);
×
239
                } catch (InterruptedException ex) {
×
240
                    log.info("Thread was interrupted", ex);
×
241
                }
×
242
            }
243
            log.info("Loading {}", nanopubId);
×
244

245
            // TODO Reach out to other Nanopub Registries here:
246
            np = getNanopub(nanopubId);
×
247
            if (np != null) {
×
248
                RegistryDB.loadNanopub(mongoSession, np);
×
249
            } else {
250
                tryCount = tryCount + 1;
×
251
            }
252
        }
253
        return np;
×
254
    }
255

256
    public static Nanopub retrieveLocalNanopub(ClientSession mongoSession, String nanopubId) {
257
        String ac = TrustyUriUtils.getArtifactCode(nanopubId);
×
258
        MongoCursor<Document> cursor = RegistryDB.get(mongoSession, Collection.NANOPUBS.toString(), new Document("_id", ac));
×
259
        if (!cursor.hasNext()) return null;
×
260
        try {
261
            // Parse from Jelly, not TriG (it's faster)
262
            return JellyUtils.readFromDB(((Binary) cursor.next().get("jelly")).getData());
×
263
        } catch (RDF4JException | MalformedNanopubException ex) {
×
264
            log.info("Exception reading Jelly", ex);
×
265
            return null;
×
266
        }
267
    }
268

269
    // TODO Provide this method in nanopub-java (GetNanopub)
270
    private static Nanopub getNanopub(String uriOrArtifactCode) {
271
        List<String> peerUrls = new ArrayList<>(Utils.getPeerUrls());
×
272
        Collections.shuffle(peerUrls);
×
273
        String ac = GetNanopub.getArtifactCode(uriOrArtifactCode);
×
274
        if (!ac.startsWith(RdfModule.MODULE_ID)) {
×
275
            throw new IllegalArgumentException("Not a trusty URI of type RA");
×
276
        }
277
        while (!peerUrls.isEmpty()) {
×
278
            String peerUrl = peerUrls.removeFirst();
×
279
            try {
280
                Nanopub np = get(ac, peerUrl, NanopubUtils.getHttpClient());
×
281
                if (np != null) {
×
282
                    return np;
×
283
                }
284
            } catch (IOException | RDF4JException | MalformedNanopubException ex) {
×
285
                // ignore
286
            }
×
287
        }
×
288
        return null;
×
289
    }
290

291
    // TODO Provide this method in nanopub-java (GetNanopub)
292
    private static Nanopub get(String artifactCode, String registryUrl, HttpClient httpClient)
293
            throws IOException, RDF4JException, MalformedNanopubException {
294
        HttpGet get = null;
×
295
        // TODO Get in Jelly format:
296
        String getUrl = registryUrl + "np/" + artifactCode;
×
297
        try {
298
            get = new HttpGet(getUrl);
×
299
        } catch (IllegalArgumentException ex) {
×
300
            throw new IOException("invalid URL: " + getUrl);
×
301
        }
×
302
        get.setHeader("Accept", "application/trig");
×
303
        InputStream in = null;
×
304
        try {
305
            HttpResponse resp = httpClient.execute(get);
×
306
            if (!wasSuccessful(resp)) {
×
307
                EntityUtils.consumeQuietly(resp.getEntity());
×
308
                throw new IOException(resp.getStatusLine().toString());
×
309
            }
310
            in = resp.getEntity().getContent();
×
311
            Nanopub nanopub = new NanopubImpl(in, RDFFormat.TRIG);
×
312
            if (!TrustyNanopubUtils.isValidTrustyNanopub(nanopub)) {
×
313
                throw new MalformedNanopubException("Nanopub is not trusty");
×
314
            }
315
            return nanopub;
×
316
        } finally {
317
            if (in != null) in.close();
×
318
        }
319
    }
320

321
    private static boolean wasSuccessful(HttpResponse resp) {
322
        int c = resp.getStatusLine().getStatusCode();
×
323
        return c >= 200 && c < 300;
×
324
    }
325

326
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc