• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-registry / 24089041807

07 Apr 2026 03:14PM UTC coverage: 33.374% (+0.5%) from 32.913%
24089041807

push

github

web-flow
Merge pull request #96 from knowledgepixels/feat/agent-quota-enforcement

feat: enforce agent/pubkey quota restrictions

273 of 900 branches covered (30.33%)

Branch coverage included in aggregate %.

809 of 2342 relevant lines covered (34.54%)

5.77 hits per line

Source File
Press 'n' to go to the next uncovered line, 'b' for the previous one.

9.62
src/main/java/com/knowledgepixels/registry/NanopubLoader.java
1
package com.knowledgepixels.registry;
2

3
import com.mongodb.ErrorCategory;
4
import com.mongodb.MongoWriteException;
5
import com.mongodb.client.ClientSession;
6
import com.mongodb.client.MongoCursor;
7
import net.trustyuri.TrustyUriUtils;
8
import net.trustyuri.rdf.RdfModule;
9
import org.apache.http.Header;
10
import org.apache.http.HttpResponse;
11
import org.apache.http.client.HttpClient;
12
import org.apache.http.client.methods.CloseableHttpResponse;
13
import org.apache.http.client.methods.HttpGet;
14
import org.apache.http.util.EntityUtils;
15
import org.bson.Document;
16
import org.bson.types.Binary;
17
import org.eclipse.rdf4j.common.exception.RDF4JException;
18
import org.eclipse.rdf4j.rio.RDFFormat;
19
import org.nanopub.MalformedNanopubException;
20
import org.nanopub.Nanopub;
21
import org.nanopub.NanopubImpl;
22
import org.nanopub.NanopubUtils;
23
import org.nanopub.extra.server.GetNanopub;
24
import org.nanopub.jelly.JellyUtils;
25
import org.nanopub.jelly.MaybeNanopub;
26
import org.nanopub.jelly.NanopubStream;
27
import org.nanopub.trusty.TrustyNanopubUtils;
28
import org.nanopub.vocabulary.NPX;
29
import org.slf4j.Logger;
30
import org.slf4j.LoggerFactory;
31

32
import java.io.IOException;
33
import java.io.InputStream;
34
import java.util.ArrayList;
35
import java.util.Collections;
36
import java.util.List;
37
import java.util.concurrent.ExecutorService;
38
import java.util.concurrent.Executors;
39
import java.util.concurrent.Semaphore;
40
import java.util.concurrent.TimeUnit;
41
import java.util.concurrent.atomic.AtomicReference;
42
import java.util.function.Consumer;
43
import java.util.stream.Stream;
44

45
import static com.knowledgepixels.registry.RegistryDB.has;
46
import static com.knowledgepixels.registry.RegistryDB.insert;
47

48
public class NanopubLoader {
49

50
    private NanopubLoader() {
51
    }
52

53
    public final static String INTRO_TYPE = NPX.DECLARED_BY.stringValue();
9✔
54
    public final static String INTRO_TYPE_HASH = Utils.getHash(INTRO_TYPE);
9✔
55
    public final static String ENDORSE_TYPE = Utils.APPROVES_OF.stringValue();
9✔
56
    public final static String ENDORSE_TYPE_HASH = Utils.getHash(ENDORSE_TYPE);
9✔
57
    private static final Logger log = LoggerFactory.getLogger(NanopubLoader.class);
9✔
58

59
    // TODO Distinguish and support these cases:
60
    //      1. Simple load: load to all core lists if pubkey is "core-loaded", or load to all lists if pubkey is "full-loaded"
61
    //      2. Core load: load to all core lists (initialize if needed), or load to all lists if pubkey is "full-loaded"
62
    //      3. Full load: load to all lists (initialize if needed)
63

64
    public static void simpleLoad(ClientSession mongoSession, String nanopubId) {
65
        simpleLoad(mongoSession, nanopubId, true);
×
66
    }
×
67

68
    public static void simpleLoad(ClientSession mongoSession, String nanopubId, boolean persistOnRetrieve) {
69
        if (persistOnRetrieve) {
×
70
            simpleLoad(mongoSession, retrieveNanopub(mongoSession, nanopubId));
×
71
        } else {
72
            Nanopub np = retrieveLocalNanopub(mongoSession, nanopubId);
×
73
            if (np == null) {
×
74
                np = getNanopub(nanopubId);
×
75
            }
76
            if (np != null) {
×
77
                simpleLoad(mongoSession, np);
×
78
            }
79
        }
80
    }
×
81

82
    public static void simpleLoad(ClientSession mongoSession, Nanopub np) {
83
        String pubkey = RegistryDB.getPubkey(np);
×
84
        if (pubkey == null) {
×
85
            log.info("Ignore (not signed): {}", np.getUri());
×
86
            return;
×
87
        }
88
        simpleLoad(mongoSession, np, pubkey);
×
89
    }
×
90

91
    /**
92
     * Loads a nanopub to the appropriate lists, using a pre-verified public key
93
     * to skip redundant signature verification.
94
     */
95
    public static void simpleLoad(ClientSession mongoSession, Nanopub np, String verifiedPubkey) {
96
        String pubkeyHash = Utils.getHash(verifiedPubkey);
9✔
97
        if (!AgentFilter.isAllowed(mongoSession, pubkeyHash)) return;
12!
98
        if (AgentFilter.isOverQuota(mongoSession, pubkeyHash)) return;
12!
99
        // TODO Do we need to load anything else here, into the other DB collections?
100
        if (has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", "$").append("status", "loaded"))) {
45!
101
            RegistryDB.loadNanopubVerified(mongoSession, np, verifiedPubkey, pubkeyHash, "$");
×
102
        } else if (has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH).append("status", "loaded"))) {
45!
103
            RegistryDB.loadNanopubVerified(mongoSession, np, verifiedPubkey, pubkeyHash, INTRO_TYPE, ENDORSE_TYPE);
×
104
        } else if (!has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH))) {
36!
105
            // Unknown pubkey: create encountered intro list so RUN_OPTIONAL_LOAD picks it up
106
            try {
107
                insert(mongoSession, "lists", new Document("pubkey", pubkeyHash)
30✔
108
                        .append("type", INTRO_TYPE_HASH)
9✔
109
                        .append("status", EntryStatus.encountered.getValue()));
6✔
110
            } catch (MongoWriteException e) {
×
111
                if (e.getError().getCategory() != ErrorCategory.DUPLICATE_KEY) throw e;
×
112
            }
3✔
113
        }
114
    }
3✔
115

116
    private static final int LOAD_PARALLELISM = Integer.parseInt(
12✔
117
            Utils.getEnv("REGISTRY_LOAD_PARALLELISM", String.valueOf(Runtime.getRuntime().availableProcessors())));
12✔
118

119
    /**
120
     * Processes a stream of nanopubs in parallel using a thread pool.
121
     * Each worker thread uses its own MongoDB ClientSession.
122
     * Backpressure is applied via a semaphore to avoid unbounded memory growth.
123
     *
124
     * @param stream    the nanopub stream to process
125
     * @param processor consumer that processes each nanopub (called with its own ClientSession)
126
     */
127
    public static void loadStreamInParallel(Stream<MaybeNanopub> stream, Consumer<Nanopub> processor) {
128
        if (LOAD_PARALLELISM <= 1) {
×
129
            // Fall back to sequential processing
130
            stream.forEach(m -> {
×
131
                if (!m.isSuccess()) throw new AbortingTaskException("Failed to download nanopub; aborting task...");
×
132
                processor.accept(m.getNanopub());
×
133
            });
×
134
            return;
×
135
        }
136

137
        ExecutorService executor = Executors.newFixedThreadPool(LOAD_PARALLELISM);
×
138
        Semaphore semaphore = new Semaphore(LOAD_PARALLELISM * 2);
×
139
        AtomicReference<Exception> error = new AtomicReference<>();
×
140

141
        try {
142
            stream.forEach(m -> {
×
143
                if (error.get() != null) return;
×
144
                if (!m.isSuccess()) {
×
145
                    error.compareAndSet(null, new AbortingTaskException("Failed to download nanopub; aborting task..."));
×
146
                    return;
×
147
                }
148
                Nanopub np = m.getNanopub();
×
149
                try {
150
                    semaphore.acquire();
×
151
                } catch (InterruptedException e) {
×
152
                    Thread.currentThread().interrupt();
×
153
                    error.compareAndSet(null, e);
×
154
                    return;
×
155
                }
×
156
                executor.submit(() -> {
×
157
                    try {
158
                        processor.accept(np);
×
159
                    } catch (Exception e) {
×
160
                        error.compareAndSet(null, e);
×
161
                    } finally {
162
                        semaphore.release();
×
163
                    }
164
                });
×
165
            });
×
166
        } finally {
167
            executor.shutdown();
×
168
            try {
169
                executor.awaitTermination(1, TimeUnit.HOURS);
×
170
            } catch (InterruptedException e) {
×
171
                Thread.currentThread().interrupt();
×
172
            }
×
173
        }
174

175
        if (error.get() != null) {
×
176
            if (error.get() instanceof RuntimeException re) throw re;
×
177
            throw new RuntimeException("Parallel loading failed", error.get());
×
178
        }
179
    }
×
180

181
    /**
182
     * Retrieve Nanopubs from the peers of this Nanopub Registry.
183
     *
184
     * @param typeHash   The hash of the type of the Nanopub to retrieve.
185
     * @param pubkeyHash The hash of the pubkey of the Nanopub to retrieve.
186
     * @return A stream of MaybeNanopub objects, or an empty stream if no peer is available.
187
     */
188
    public static Stream<MaybeNanopub> retrieveNanopubsFromPeers(String typeHash, String pubkeyHash) {
189
        return retrieveNanopubsFromPeers(typeHash, pubkeyHash, null);
×
190
    }
191

192
    /**
193
     * Retrieve Nanopubs from the peers, optionally skipping ahead using checksums.
194
     *
195
     * @param typeHash        The hash of the type of the Nanopub to retrieve.
196
     * @param pubkeyHash      The hash of the pubkey of the Nanopub to retrieve.
197
     * @param afterChecksums  Comma-separated checksums for skip-ahead (geometric fallback), or null for full fetch.
198
     * @return A stream of MaybeNanopub objects, or an empty stream if no peer is available.
199
     */
200
    public static Stream<MaybeNanopub> retrieveNanopubsFromPeers(String typeHash, String pubkeyHash, String afterChecksums) {
201
        // TODO Move the code of this method to nanopub-java library.
202

203
        List<String> peerUrlsToTry = new ArrayList<>(Utils.getPeerUrls());
×
204
        Collections.shuffle(peerUrlsToTry);
×
205
        while (!peerUrlsToTry.isEmpty()) {
×
206
            String peerUrl = peerUrlsToTry.removeFirst();
×
207

208
            String requestUrl = peerUrl + "list/" + pubkeyHash + "/" + typeHash + ".jelly";
×
209
            if (afterChecksums != null) {
×
210
                requestUrl += "?afterChecksums=" + afterChecksums;
×
211
            }
212
            log.info("Request: {}", requestUrl);
×
213
            try {
214
                CloseableHttpResponse resp = NanopubUtils.getHttpClient().execute(new HttpGet(requestUrl));
×
215
                int httpStatus = resp.getStatusLine().getStatusCode();
×
216
                if (httpStatus < 200 || httpStatus >= 300) {
×
217
                    log.info("Request failed: {} {}", peerUrl, httpStatus);
×
218
                    EntityUtils.consumeQuietly(resp.getEntity());
×
219
                    continue;
×
220
                }
221
                Header nrStatus = resp.getFirstHeader("Nanopub-Registry-Status");
×
222
                if (nrStatus == null) {
×
223
                    log.info("Nanopub-Registry-Status header not found at: {}", peerUrl);
×
224
                    EntityUtils.consumeQuietly(resp.getEntity());
×
225
                    continue;
×
226
                } else if (!nrStatus.getValue().equals("ready") && !nrStatus.getValue().equals("updating")) {
×
227
                    log.info("Peer in non-ready state: {} {}", peerUrl, nrStatus.getValue());
×
228
                    EntityUtils.consumeQuietly(resp.getEntity());
×
229
                    continue;
×
230
                }
231
                InputStream is = resp.getEntity().getContent();
×
232
                return NanopubStream.fromByteStream(is).getAsNanopubs().onClose(() -> {
×
233
                    try {
234
                        resp.close();
×
235
                    } catch (IOException e) {
×
236
                        log.debug("Error closing HTTP response", e);
×
237
                    }
×
238
                });
×
239
            } catch (UnsupportedOperationException | IOException ex) {
×
240
                log.info("Request failed: ", ex);
×
241
            }
242
        }
×
243
        return Stream.empty();
×
244
    }
245

246
    public static Nanopub retrieveNanopub(ClientSession mongoSession, String nanopubId) {
247
        Nanopub np = retrieveLocalNanopub(mongoSession, nanopubId);
×
248
        int tryCount = 0;
×
249
        while (np == null) {
×
250
            if (tryCount > 10) {
×
251
                throw new RuntimeException("Could not load nanopub: " + nanopubId);
×
252
            } else if (tryCount > 0) {
×
253
                try {
254
                    Thread.sleep(100);
×
255
                } catch (InterruptedException ex) {
×
256
                    log.info("Thread was interrupted", ex);
×
257
                }
×
258
            }
259
            log.info("Loading {}", nanopubId);
×
260

261
            // TODO Reach out to other Nanopub Registries here:
262
            np = getNanopub(nanopubId);
×
263
            if (np != null) {
×
264
                RegistryDB.loadNanopub(mongoSession, np);
×
265
            } else {
266
                tryCount = tryCount + 1;
×
267
            }
268
        }
269
        return np;
×
270
    }
271

272
    public static Nanopub retrieveLocalNanopub(ClientSession mongoSession, String nanopubId) {
273
        String ac = TrustyUriUtils.getArtifactCode(nanopubId);
×
274
        MongoCursor<Document> cursor = RegistryDB.get(mongoSession, Collection.NANOPUBS.toString(), new Document("_id", ac));
×
275
        if (!cursor.hasNext()) return null;
×
276
        try {
277
            // Parse from Jelly, not TriG (it's faster)
278
            return JellyUtils.readFromDB(((Binary) cursor.next().get("jelly")).getData());
×
279
        } catch (RDF4JException | MalformedNanopubException ex) {
×
280
            log.info("Exception reading Jelly", ex);
×
281
            return null;
×
282
        }
283
    }
284

285
    // TODO Provide this method in nanopub-java (GetNanopub)
286
    private static Nanopub getNanopub(String uriOrArtifactCode) {
287
        List<String> peerUrls = new ArrayList<>(Utils.getPeerUrls());
×
288
        Collections.shuffle(peerUrls);
×
289
        String ac = GetNanopub.getArtifactCode(uriOrArtifactCode);
×
290
        if (!ac.startsWith(RdfModule.MODULE_ID)) {
×
291
            throw new IllegalArgumentException("Not a trusty URI of type RA");
×
292
        }
293
        while (!peerUrls.isEmpty()) {
×
294
            String peerUrl = peerUrls.removeFirst();
×
295
            try {
296
                Nanopub np = get(ac, peerUrl, NanopubUtils.getHttpClient());
×
297
                if (np != null) {
×
298
                    return np;
×
299
                }
300
            } catch (IOException | RDF4JException | MalformedNanopubException ex) {
×
301
                // ignore
302
            }
×
303
        }
×
304
        return null;
×
305
    }
306

307
    // TODO Provide this method in nanopub-java (GetNanopub)
308
    private static Nanopub get(String artifactCode, String registryUrl, HttpClient httpClient)
309
            throws IOException, RDF4JException, MalformedNanopubException {
310
        HttpGet get = null;
×
311
        // TODO Get in Jelly format:
312
        String getUrl = registryUrl + "np/" + artifactCode;
×
313
        try {
314
            get = new HttpGet(getUrl);
×
315
        } catch (IllegalArgumentException ex) {
×
316
            throw new IOException("invalid URL: " + getUrl);
×
317
        }
×
318
        get.setHeader("Accept", "application/trig");
×
319
        InputStream in = null;
×
320
        try {
321
            HttpResponse resp = httpClient.execute(get);
×
322
            if (!wasSuccessful(resp)) {
×
323
                EntityUtils.consumeQuietly(resp.getEntity());
×
324
                throw new IOException(resp.getStatusLine().toString());
×
325
            }
326
            in = resp.getEntity().getContent();
×
327
            Nanopub nanopub = new NanopubImpl(in, RDFFormat.TRIG);
×
328
            if (!TrustyNanopubUtils.isValidTrustyNanopub(nanopub)) {
×
329
                throw new MalformedNanopubException("Nanopub is not trusty");
×
330
            }
331
            return nanopub;
×
332
        } finally {
333
            if (in != null) in.close();
×
334
        }
335
    }
336

337
    private static boolean wasSuccessful(HttpResponse resp) {
338
        int c = resp.getStatusLine().getStatusCode();
×
339
        return c >= 200 && c < 300;
×
340
    }
341

342
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc