• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 22912978734

10 Mar 2026 04:27PM UTC coverage: 70.256% (+0.7%) from 69.573%
22912978734

push

github

tkuhn
fix(NanopubLoader): add max retry limit to prevent loading from hanging indefinitely

Infinite retry loops in loadNanopubToRepo, loadNanopubToLatest,
loadInvalidateStatements, getInvalidatingStatements, and loadNoteToRepo
could block the loading pipeline permanently on persistent errors.
After 30 retries (5 minutes), the error now propagates to loadUpdates()
which logs it and recovers on the next scheduled poll.

Closes #57

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

216 of 336 branches covered (64.29%)

Branch coverage included in aggregate %.

606 of 834 relevant lines covered (72.66%)

10.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.56
src/main/java/com/knowledgepixels/query/NanopubLoader.java
1
package com.knowledgepixels.query;
2

3
import net.trustyuri.TrustyUriUtils;
4
import org.apache.http.client.HttpClient;
5
import org.apache.http.impl.client.HttpClientBuilder;
6
import org.eclipse.rdf4j.common.exception.RDF4JException;
7
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
8
import org.eclipse.rdf4j.model.*;
9
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
10
import org.eclipse.rdf4j.model.vocabulary.DCTERMS;
11
import org.eclipse.rdf4j.model.vocabulary.RDFS;
12
import org.eclipse.rdf4j.query.BindingSet;
13
import org.eclipse.rdf4j.query.QueryLanguage;
14
import org.eclipse.rdf4j.query.TupleQuery;
15
import org.eclipse.rdf4j.query.TupleQueryResult;
16
import org.eclipse.rdf4j.repository.RepositoryConnection;
17
import org.nanopub.Nanopub;
18
import org.nanopub.NanopubUtils;
19
import org.nanopub.SimpleCreatorPattern;
20
import org.nanopub.SimpleTimestampPattern;
21
import org.nanopub.extra.security.KeyDeclaration;
22
import org.nanopub.extra.security.MalformedCryptoElementException;
23
import org.nanopub.extra.security.NanopubSignatureElement;
24
import org.nanopub.extra.security.SignatureUtils;
25
import org.nanopub.extra.server.GetNanopub;
26
import org.nanopub.extra.setting.IntroNanopub;
27
import org.nanopub.vocabulary.NP;
28
import org.nanopub.vocabulary.NPA;
29
import org.nanopub.vocabulary.NPX;
30
import org.nanopub.vocabulary.PAV;
31
import org.slf4j.Logger;
32
import org.slf4j.LoggerFactory;
33

34
import java.security.GeneralSecurityException;
35
import java.util.*;
36
import java.util.concurrent.ExecutionException;
37
import java.util.concurrent.Executors;
38
import java.util.concurrent.Future;
39
import java.util.concurrent.ThreadPoolExecutor;
40
import java.util.function.Consumer;
41

42
/**
43
 * Utility class for loading nanopublications into the database.
44
 */
45
public class NanopubLoader {
46

47
    private static HttpClient httpClient;
48
    private static final ThreadPoolExecutor loadingPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
12✔
49
    private static final int MAX_RETRIES = 30;
50
    private static final int RETRY_DELAY_MS = 10000;
51
    private Nanopub np;
52
    private NanopubSignatureElement el = null;
9✔
53
    private List<Statement> metaStatements = new ArrayList<>();
15✔
54
    private List<Statement> nanopubStatements = new ArrayList<>();
15✔
55
    private List<Statement> literalStatements = new ArrayList<>();
15✔
56
    private List<Statement> invalidateStatements = new ArrayList<>();
15✔
57
    private List<Statement> textStatements, allStatements;
58
    private Calendar timestamp = null;
9✔
59
    private Statement pubkeyStatement, pubkeyStatementX;
60
    private List<String> notes = new ArrayList<>();
15✔
61
    private boolean aborted = false;
9✔
62
    private static final Logger log = LoggerFactory.getLogger(NanopubLoader.class);
9✔
63

64

65
    NanopubLoader(Nanopub np, long counter) {
6✔
66
        this.np = np;
9✔
67
        if (counter >= 0) {
12✔
68
            log.info("Loading {}: {}", counter, np.getUri());
24✔
69
        } else {
70
            log.info("Loading: {}", np.getUri());
15✔
71
        }
72

73
        // TODO Ensure proper synchronization and DB rollbacks
74

75
        // TODO Check for null characters ("\0"), which can cause problems in Virtuoso.
76

77
        String ac = TrustyUriUtils.getArtifactCode(np.getUri().toString());
15✔
78
        if (!np.getHeadUri().toString().contains(ac) || !np.getAssertionUri().toString().contains(ac) || !np.getProvenanceUri().toString().contains(ac) || !np.getPubinfoUri().toString().contains(ac)) {
72!
79
            notes.add("could not load nanopub as not all graphs contained the artifact code");
×
80
            aborted = true;
×
81
            return;
×
82
        }
83

84
        try {
85
            el = SignatureUtils.getSignatureElement(np);
12✔
86
        } catch (MalformedCryptoElementException ex) {
×
87
            notes.add("Signature error");
×
88
        }
3✔
89
        if (!hasValidSignature(el)) {
12✔
90
            aborted = true;
9✔
91
            return;
3✔
92
        }
93

94
        pubkeyStatement = vf.createStatement(np.getUri(), NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY, vf.createLiteral(el.getPublicKeyString()), NPA.GRAPH);
39✔
95
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasValidSignatureForPublicKey, FULL_PUBKEY, npa:graph, meta, full pubkey if signature is valid
96
        metaStatements.add(pubkeyStatement);
18✔
97
        pubkeyStatementX = vf.createStatement(np.getUri(), NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY_HASH, vf.createLiteral(Utils.createHash(el.getPublicKeyString())), NPA.GRAPH);
42✔
98
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasValidSignatureForPublicKeyHash, PUBKEY_HASH, npa:graph, meta, hex-encoded SHA256 hash if signature is valid
99
        metaStatements.add(pubkeyStatementX);
18✔
100

101
        if (el.getSigners().size() == 1) {  // > 1 is deprecated
18!
102
            metaStatements.add(vf.createStatement(np.getUri(), NPX.SIGNED_BY, el.getSigners().iterator().next(), NPA.GRAPH));
48✔
103
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:signedBy, SIGNER, npa:graph, meta, ID of signer
104
        }
105

106
        Set<IRI> subIris = new HashSet<>();
12✔
107
        Set<IRI> otherNps = new HashSet<>();
12✔
108
        Set<IRI> invalidated = new HashSet<>();
12✔
109
        Set<IRI> retracted = new HashSet<>();
12✔
110
        Set<IRI> superseded = new HashSet<>();
12✔
111
        String combinedLiterals = "";
6✔
112
        for (Statement st : NanopubUtils.getStatements(np)) {
33✔
113
            nanopubStatements.add(st);
15✔
114

115
            if (st.getPredicate().toString().contains(ac)) {
18!
116
                subIris.add(st.getPredicate());
×
117
            } else {
118
                IRI b = getBaseTrustyUri(st.getPredicate());
12✔
119
                if (b != null) otherNps.add(b);
6!
120
            }
121
            if (st.getPredicate().equals(NPX.RETRACTS) && st.getObject() instanceof IRI) {
15!
122
                retracted.add((IRI) st.getObject());
×
123
            }
124
            if (st.getPredicate().equals(NPX.INVALIDATES) && st.getObject() instanceof IRI) {
15!
125
                invalidated.add((IRI) st.getObject());
×
126
            }
127
            if (st.getSubject().equals(np.getUri()) && st.getObject() instanceof IRI) {
30✔
128
                if (st.getPredicate().equals(NPX.SUPERSEDES)) {
15✔
129
                    superseded.add((IRI) st.getObject());
18✔
130
                }
131
                if (st.getObject().toString().matches(".*[^A-Za-z0-9\\-_]RA[A-Za-z0-9\\-_]{43}")) {
18✔
132
                    metaStatements.add(vf.createStatement(np.getUri(), st.getPredicate(), st.getObject(), NPA.NETWORK_GRAPH));
39✔
133
                    // @ADMIN-TRIPLE-TABLE@ NANOPUB1, RELATION, NANOPUB2, npa:networkGraph, meta, any inter-nanopub relation found in NANOPUB1
134
                }
135
                if (st.getContext().equals(np.getPubinfoUri())) {
18✔
136
                    if (st.getPredicate().equals(NPX.INTRODUCES) || st.getPredicate().equals(NPX.DESCRIBES) || st.getPredicate().equals(NPX.EMBEDS)) {
45!
137
                        metaStatements.add(vf.createStatement(np.getUri(), st.getPredicate(), st.getObject(), NPA.GRAPH));
39✔
138
                        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:introduces, THING, npa:graph, meta, when such a triple is present in pubinfo of NANOPUB
139
                        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:describes, THING, npa:graph, meta, when such a triple is present in pubinfo of NANOPUB
140
                        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:embeds, THING, npa:graph, meta, when such a triple is present in pubinfo of NANOPUB
141
                    }
142
                }
143
            }
144
            if (st.getSubject().toString().contains(ac)) {
18✔
145
                subIris.add((IRI) st.getSubject());
21✔
146
            } else {
147
                IRI b = getBaseTrustyUri(st.getSubject());
12✔
148
                if (b != null) otherNps.add(b);
6!
149
            }
150
            if (st.getObject() instanceof IRI) {
12✔
151
                if (st.getObject().toString().contains(ac)) {
18✔
152
                    subIris.add((IRI) st.getObject());
21✔
153
                } else {
154
                    IRI b = getBaseTrustyUri(st.getObject());
12✔
155
                    if (b != null) otherNps.add(b);
18✔
156
                }
3✔
157
            } else {
158
                combinedLiterals += st.getObject().stringValue().replaceAll("\\s+", " ") + "\n";
27✔
159
//                                if (st.getSubject().equals(np.getUri()) && !st.getSubject().equals(HAS_FILTER_LITERAL)) {
160
//                                        literalStatements.add(vf.createStatement(np.getUri(), st.getPredicate(), st.getObject(), LITERAL_GRAPH));
161
//                                } else {
162
//                                        literalStatements.add(vf.createStatement(np.getUri(), HAS_LITERAL, st.getObject(), LITERAL_GRAPH));
163
//                                }
164
            }
165
        }
3✔
166
        subIris.remove(np.getUri());
15✔
167
        subIris.remove(np.getAssertionUri());
15✔
168
        subIris.remove(np.getProvenanceUri());
15✔
169
        subIris.remove(np.getPubinfoUri());
15✔
170
        for (IRI i : subIris) {
30✔
171
            metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_SUB_IRI, i, NPA.GRAPH));
33✔
172
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasSubIri, SUB_IRI, npa:graph, meta, for any IRI minted in the namespace of the NANOPUB
173
        }
3✔
174
        for (IRI i : otherNps) {
30✔
175
            metaStatements.add(vf.createStatement(np.getUri(), NPA.REFERS_TO_NANOPUB, i, NPA.NETWORK_GRAPH));
33✔
176
            // @ADMIN-TRIPLE-TABLE@ NANOPUB1, npa:refersToNanopub, NANOPUB2, npa:networkGraph, meta, generic inter-nanopub relation
177
        }
3✔
178
        for (IRI i : invalidated) {
18!
179
            invalidateStatements.add(vf.createStatement(np.getUri(), NPX.INVALIDATES, i, NPA.GRAPH));
×
180
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:invalidates, INVALIDATED_NANOPUB, npa:graph, meta, if the NANOPUB retracts or supersedes another nanopub
181
        }
×
182
        for (IRI i : retracted) {
18!
183
            invalidateStatements.add(vf.createStatement(np.getUri(), NPX.INVALIDATES, i, NPA.GRAPH));
×
184
            metaStatements.add(vf.createStatement(np.getUri(), NPX.RETRACTS, i, NPA.GRAPH));
×
185
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:retracts, RETRACTED_NANOPUB, npa:graph, meta, if the NANOPUB retracts another nanopub
186
        }
×
187
        for (IRI i : superseded) {
30✔
188
            invalidateStatements.add(vf.createStatement(np.getUri(), NPX.INVALIDATES, i, NPA.GRAPH));
33✔
189
            metaStatements.add(vf.createStatement(np.getUri(), NPX.SUPERSEDES, i, NPA.GRAPH));
33✔
190
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:supersedes, SUPERSEDED_NANOPUB, npa:graph, meta, if the NANOPUB supersedes another nanopub
191
        }
3✔
192

193
        metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_HEAD_GRAPH, np.getHeadUri(), NPA.GRAPH));
36✔
194
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasHeadGraph, HEAD_GRAPH, npa:graph, meta, direct link to the head graph of the NANOPUB
195
        metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_GRAPH, np.getHeadUri(), NPA.GRAPH));
36✔
196
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasGraph, GRAPH, npa:graph, meta, generic link to all four graphs of the given NANOPUB
197
        metaStatements.add(vf.createStatement(np.getUri(), NP.HAS_ASSERTION, np.getAssertionUri(), NPA.GRAPH));
36✔
198
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, np:hasAssertion, ASSERTION_GRAPH, npa:graph, meta, direct link to the assertion graph of the NANOPUB
199
        metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_GRAPH, np.getAssertionUri(), NPA.GRAPH));
36✔
200
        metaStatements.add(vf.createStatement(np.getUri(), NP.HAS_PROVENANCE, np.getProvenanceUri(), NPA.GRAPH));
36✔
201
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, np:hasProvenance, PROVENANCE_GRAPH, npa:graph, meta, direct link to the provenance graph of the NANOPUB
202
        metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_GRAPH, np.getProvenanceUri(), NPA.GRAPH));
36✔
203
        metaStatements.add(vf.createStatement(np.getUri(), NP.HAS_PUBINFO, np.getPubinfoUri(), NPA.GRAPH));
36✔
204
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, np:hasPublicationInfo, PUBINFO_GRAPH, npa:graph, meta, direct link to the pubinfo graph of the NANOPUB
205
        metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_GRAPH, np.getPubinfoUri(), NPA.GRAPH));
36✔
206

207
        String artifactCode = TrustyUriUtils.getArtifactCode(np.getUri().stringValue());
15✔
208
        metaStatements.add(vf.createStatement(np.getUri(), NPA.ARTIFACT_CODE, vf.createLiteral(artifactCode), NPA.GRAPH));
39✔
209
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:artifactCode, ARTIFACT_CODE, npa:graph, meta, artifact code starting with 'RA...'
210

211
        if (isIntroNanopub(np)) {
9✔
212
            IntroNanopub introNp = new IntroNanopub(np);
15✔
213
            metaStatements.add(vf.createStatement(np.getUri(), NPA.IS_INTRODUCTION_OF, introNp.getUser(), NPA.GRAPH));
36✔
214
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:isIntroductionOf, AGENT, npa:graph, meta, linking intro nanopub to the agent it is introducing
215
            for (KeyDeclaration kc : introNp.getKeyDeclarations()) {
33✔
216
                metaStatements.add(vf.createStatement(np.getUri(), NPA.DECLARES_PUBKEY, vf.createLiteral(kc.getPublicKeyString()), NPA.GRAPH));
42✔
217
                // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:declaresPubkey, FULL_PUBKEY, npa:graph, meta, full pubkey declared by the given intro NANOPUB
218
            }
3✔
219
        }
220

221
        try {
222
            timestamp = SimpleTimestampPattern.getCreationTime(np);
12✔
223
        } catch (IllegalArgumentException ex) {
×
224
            notes.add("Illegal date/time");
×
225
        }
3✔
226
        if (timestamp != null) {
9!
227
            metaStatements.add(vf.createStatement(np.getUri(), DCTERMS.CREATED, vf.createLiteral(timestamp.getTime()), NPA.GRAPH));
45✔
228
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, dct:created, CREATION_DATE, npa:graph, meta, normalized creation timestamp
229
        }
230

231
        String literalFilter = "_pubkey_" + Utils.createHash(el.getPublicKeyString());
18✔
232
        for (IRI typeIri : NanopubUtils.getTypes(np)) {
33✔
233
            metaStatements.add(vf.createStatement(np.getUri(), NPX.HAS_NANOPUB_TYPE, typeIri, NPA.GRAPH));
33✔
234
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:hasNanopubType, NANOPUB_TYPE, npa:graph, meta, type of NANOPUB
235
            literalFilter += " _type_" + Utils.createHash(typeIri);
15✔
236
        }
3✔
237
        String label = NanopubUtils.getLabel(np);
9✔
238
        if (label != null) {
6!
239
            metaStatements.add(vf.createStatement(np.getUri(), RDFS.LABEL, vf.createLiteral(label), NPA.GRAPH));
39✔
240
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, rdfs:label, LABEL, npa:graph, meta, label of NANOPUB
241
        }
242
        String description = NanopubUtils.getDescription(np);
9✔
243
        if (description != null) {
6✔
244
            metaStatements.add(vf.createStatement(np.getUri(), DCTERMS.DESCRIPTION, vf.createLiteral(description), NPA.GRAPH));
39✔
245
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, dct:description, LABEL, npa:graph, meta, description of NANOPUB
246
        }
247
        for (IRI creatorIri : SimpleCreatorPattern.getCreators(np)) {
33✔
248
            metaStatements.add(vf.createStatement(np.getUri(), DCTERMS.CREATOR, creatorIri, NPA.GRAPH));
33✔
249
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, dct:creator, CREATOR, npa:graph, meta, creator of NANOPUB (can be several)
250
        }
3✔
251
        for (IRI authorIri : SimpleCreatorPattern.getAuthors(np)) {
21!
252
            metaStatements.add(vf.createStatement(np.getUri(), PAV.AUTHORED_BY, authorIri, NPA.GRAPH));
×
253
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, pav:authoredBy, AUTHOR, npa:graph, meta, author of NANOPUB (can be several)
254
        }
×
255

256
        if (!combinedLiterals.isEmpty()) {
9!
257
            literalStatements.add(vf.createStatement(np.getUri(), NPA.HAS_FILTER_LITERAL, vf.createLiteral(literalFilter + "\n" + combinedLiterals), NPA.GRAPH));
45✔
258
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasFilterLiteral, FILTER_LITERAL, npa:graph, literal, auxiliary literal for filtering by type and pubkey in text repo
259
        }
260

261
        // Any statements that express that the currently processed nanopub is already invalidated:
262
        List<Statement> invalidatingStatements = getInvalidatingStatements(np.getUri());
12✔
263

264
        metaStatements.addAll(invalidateStatements);
18✔
265

266
        allStatements = new ArrayList<>(nanopubStatements);
21✔
267
        allStatements.addAll(metaStatements);
18✔
268
        allStatements.addAll(invalidatingStatements);
15✔
269

270
        textStatements = new ArrayList<>(literalStatements);
21✔
271
        textStatements.addAll(metaStatements);
18✔
272
        textStatements.addAll(invalidatingStatements);
15✔
273
    }
3✔
274

275
    /**
276
     * Get the HTTP client used for fetching nanopublications.
277
     *
278
     * @return the HTTP client
279
     */
280
    static HttpClient getHttpClient() {
281
        if (httpClient == null) {
6✔
282
            httpClient = HttpClientBuilder.create().setDefaultRequestConfig(Utils.getHttpRequestConfig()).build();
15✔
283
        }
284
        return httpClient;
6✔
285
    }
286

287
    /**
288
     * Load the given nanopublication into the database.
289
     *
290
     * @param nanopubUri Nanopublication identifier (URI)
291
     */
292
    public static void load(String nanopubUri) {
293
        if (isNanopubLoaded(nanopubUri)) {
9!
294
            log.info("Already loaded: {}", nanopubUri);
×
295
        } else {
296
            Nanopub np = GetNanopub.get(nanopubUri, getHttpClient());
12✔
297
            load(np, -1);
9✔
298
        }
299
    }
3✔
300

301
    /**
302
     * Load a nanopub into the database.
303
     *
304
     * @param np      the nanopub to load
305
     * @param counter the load counter, only used for logging (or -1 if not known)
306
     * @throws RDF4JException if the loading fails
307
     */
308
    public static void load(Nanopub np, long counter) throws RDF4JException {
309
        NanopubLoader loader = new NanopubLoader(np, counter);
18✔
310
        loader.executeLoading();
6✔
311
    }
3✔
312

313
    @GeneratedFlagForDependentElements
314
    private void executeLoading() {
315
        var runningTasks = new ArrayList<Future<?>>();
316
        Consumer<Runnable> runTask = t -> runningTasks.add(loadingPool.submit(t));
×
317

318
        for (String note : notes) {
319
            loadNoteToRepo(np.getUri(), note);
320
        }
321

322
        if (!aborted) {
323
            // Submit all tasks except the "meta" task
324
            if (timestamp != null) {
325
                if (new Date().getTime() - timestamp.getTimeInMillis() < THIRTY_DAYS) {
326
                    runTask.accept(() -> loadNanopubToLatest(allStatements));
×
327
                }
328
            }
329

330
            runTask.accept(() -> loadNanopubToRepo(np.getUri(), textStatements, "text"));
×
331
            runTask.accept(() -> loadNanopubToRepo(np.getUri(), allStatements, "full"));
×
332
            // Note: "meta" task is deferred until all other tasks complete successfully
333

334
            runTask.accept(() -> loadNanopubToRepo(np.getUri(), allStatements, "pubkey_" + Utils.createHash(el.getPublicKeyString())));
×
335
            //                loadNanopubToRepo(np.getUri(), textStatements, "text-pubkey_" + Utils.createHash(el.getPublicKeyString()));
336
            for (IRI typeIri : NanopubUtils.getTypes(np)) {
337
                // Exclude locally minted IRIs:
338
                if (typeIri.stringValue().startsWith(np.getUri().stringValue())) continue;
339
                if (!typeIri.stringValue().matches("https?://.*")) continue;
340
                runTask.accept(() -> loadNanopubToRepo(np.getUri(), allStatements, "type_" + Utils.createHash(typeIri)));
×
341
                //                        loadNanopubToRepo(np.getUri(), textStatements, "text-type_" + Utils.createHash(typeIri));
342
            }
343
            //                for (IRI creatorIri : SimpleCreatorPattern.getCreators(np)) {
344
            //                        // Exclude locally minted IRIs:
345
            //                        if (creatorIri.stringValue().startsWith(np.getUri().stringValue())) continue;
346
            //                        if (!creatorIri.stringValue().matches("https?://.*")) continue;
347
            //                        loadNanopubToRepo(np.getUri(), allStatements, "user_" + Utils.createHash(creatorIri));
348
            //                        loadNanopubToRepo(np.getUri(), textStatements, "text-user_" + Utils.createHash(creatorIri));
349
            //                }
350
            //                for (IRI authorIri : SimpleCreatorPattern.getAuthors(np)) {
351
            //                        // Exclude locally minted IRIs:
352
            //                        if (authorIri.stringValue().startsWith(np.getUri().stringValue())) continue;
353
            //                        if (!authorIri.stringValue().matches("https?://.*")) continue;
354
            //                        loadNanopubToRepo(np.getUri(), allStatements, "user_" + Utils.createHash(authorIri));
355
            //                        loadNanopubToRepo(np.getUri(), textStatements, "text-user_" + Utils.createHash(authorIri));
356
            //                }
357

358
            for (Statement st : invalidateStatements) {
359
                runTask.accept(() -> loadInvalidateStatements(np, el.getPublicKeyString(), st, pubkeyStatement, pubkeyStatementX));
×
360
            }
361

362
            // Wait for all non-meta tasks to complete successfully before submitting the meta task
363
            for (var task : runningTasks) {
364
                try {
365
                    task.get();
366
                } catch (ExecutionException | InterruptedException ex) {
367
                    throw new RuntimeException("Error in nanopub loading thread", ex.getCause());
368
                }
369
            }
370

371
            // Now submit and wait for the "meta" task after all other tasks have completed successfully
372
            Future<?> metaTask = loadingPool.submit(() -> loadNanopubToRepo(np.getUri(), metaStatements, "meta"));
×
373
            try {
374
                metaTask.get();
375
            } catch (ExecutionException | InterruptedException ex) {
376
                throw new RuntimeException("Error in nanopub loading thread (meta task)", ex.getCause());
377
            }
378
        }
379
    }
380

381
    private static Long lastUpdateOfLatestRepo = null;
6✔
382
    private static long THIRTY_DAYS = 1000L * 60 * 60 * 24 * 30;
6✔
383
    private static long ONE_HOUR = 1000L * 60 * 60;
6✔
384

385
    @GeneratedFlagForDependentElements
386
    private static void loadNanopubToLatest(List<Statement> statements) {
387
        boolean success = false;
388
        int retries = 0;
389
        while (!success) {
390
            RepositoryConnection conn = TripleStore.get().getRepoConnection("last30d");
391
            try (conn) {
392
                // Read committed, because deleting old nanopubs is idempotent. Inserts do not collide
393
                // with deletes, because we are not inserting old nanopubs.
394
                conn.begin(IsolationLevels.READ_COMMITTED);
395
                conn.add(statements);
396
                if (lastUpdateOfLatestRepo == null || new Date().getTime() - lastUpdateOfLatestRepo > ONE_HOUR) {
397
                    log.trace("Remove old nanopubs...");
398
                    Literal thirtyDaysAgo = vf.createLiteral(new Date(new Date().getTime() - THIRTY_DAYS));
399
                    TupleQuery q = conn.prepareTupleQuery(QueryLanguage.SPARQL, "SELECT * { graph <" + NPA.GRAPH + "> { " + "?np <" + DCTERMS.CREATED + "> ?date . " + "filter ( ?date < ?thirtydaysago ) " + "} }");
400
                    q.setBinding("thirtydaysago", thirtyDaysAgo);
401
                    try (TupleQueryResult r = q.evaluate()) {
402
                        while (r.hasNext()) {
403
                            BindingSet b = r.next();
404
                            IRI oldNpId = (IRI) b.getBinding("np").getValue();
405
                            log.trace("Remove old nanopub: {}", oldNpId);
406
                            for (Value v : Utils.getObjectsForPattern(conn, NPA.GRAPH, oldNpId, NPA.HAS_GRAPH)) {
407
                                // Remove all four nanopub graphs:
408
                                conn.remove((Resource) null, (IRI) null, (Value) null, (IRI) v);
409
                            }
410
                            // Remove nanopubs in admin graphs:
411
                            conn.remove(oldNpId, null, null, NPA.GRAPH);
412
                            conn.remove(oldNpId, null, null, NPA.NETWORK_GRAPH);
413
                        }
414
                    }
415
                    lastUpdateOfLatestRepo = new Date().getTime();
416
                }
417
                conn.commit();
418
                success = true;
419
            } catch (Exception ex) {
420
                log.info("Could not get environment variable", ex);
421
                if (conn.isActive()) conn.rollback();
422
            }
423
            if (!success) {
424
                retries++;
425
                if (retries >= MAX_RETRIES) {
426
                    throw new RuntimeException("Failed to load nanopub to last30d repo after " + MAX_RETRIES + " retries");
427
                }
428
                log.info("Retrying in 10 seconds (attempt {}/{})...", retries, MAX_RETRIES);
429
                try {
430
                    Thread.sleep(RETRY_DELAY_MS);
431
                } catch (InterruptedException x) {
432
                }
433
            }
434
        }
435
    }
436

437
    @GeneratedFlagForDependentElements
438
    private static void loadNanopubToRepo(IRI npId, List<Statement> statements, String repoName) {
439
        boolean success = false;
440
        int retries = 0;
441
        while (!success) {
442
            RepositoryConnection conn = TripleStore.get().getRepoConnection(repoName);
443
            try (conn) {
444
                // Serializable, because write skew would cause the chain of hashes to be broken.
445
                // The inserts must be done serially.
446
                conn.begin(IsolationLevels.SERIALIZABLE);
447
                var repoStatus = fetchRepoStatus(conn, npId);
448
                if (repoStatus.isLoaded) {
449
                    log.info("Already loaded: {}", npId);
450
                } else {
451
                    String newChecksum = NanopubUtils.updateXorChecksum(npId, repoStatus.checksum);
452
                    conn.remove(NPA.THIS_REPO, NPA.HAS_NANOPUB_COUNT, null, NPA.GRAPH);
453
                    conn.remove(NPA.THIS_REPO, NPA.HAS_NANOPUB_CHECKSUM, null, NPA.GRAPH);
454
                    conn.add(NPA.THIS_REPO, NPA.HAS_NANOPUB_COUNT, vf.createLiteral(repoStatus.count + 1), NPA.GRAPH);
455
                    // @ADMIN-TRIPLE-TABLE@ REPO, npa:hasNanopubCount, NANOPUB_COUNT, npa:graph, admin, number of nanopubs loaded
456
                    conn.add(NPA.THIS_REPO, NPA.HAS_NANOPUB_CHECKSUM, vf.createLiteral(newChecksum), NPA.GRAPH);
457
                    // @ADMIN-TRIPLE-TABLE@ REPO, npa:hasNanopubChecksum, NANOPUB_CHECKSUM, npa:graph, admin, checksum of all loaded nanopubs (order-independent XOR checksum on trusty URIs in Base64 notation)
458
                    conn.add(npId, NPA.HAS_LOAD_NUMBER, vf.createLiteral(repoStatus.count), NPA.GRAPH);
459
                    // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasLoadNumber, LOAD_NUMBER, npa:graph, admin, the sequential number at which this NANOPUB was loaded
460
                    conn.add(npId, NPA.HAS_LOAD_CHECKSUM, vf.createLiteral(newChecksum), NPA.GRAPH);
461
                    // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasLoadChecksum, LOAD_CHECKSUM, npa:graph, admin, the checksum of all loaded nanopubs after loading the given NANOPUB
462
                    conn.add(npId, NPA.HAS_LOAD_TIMESTAMP, vf.createLiteral(new Date()), NPA.GRAPH);
463
                    // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasLoadTimestamp, LOAD_TIMESTAMP, npa:graph, admin, the time point at which this NANOPUB was loaded
464
                    conn.add(statements);
465
                }
466
                conn.commit();
467
                success = true;
468
            } catch (Exception ex) {
469
                log.info("Could not load nanopub to repo. ", ex);
470
                if (conn.isActive()) conn.rollback();
471
            }
472
            if (!success) {
473
                retries++;
474
                if (retries >= MAX_RETRIES) {
475
                    throw new RuntimeException("Failed to load nanopub " + npId + " to repo " + repoName + " after " + MAX_RETRIES + " retries");
476
                }
477
                log.info("Retrying in 10 seconds (attempt {}/{})...", retries, MAX_RETRIES);
478
                try {
479
                    Thread.sleep(RETRY_DELAY_MS);
480
                } catch (InterruptedException x) {
481
                }
482
            }
483
        }
484
    }
485

486
    private record RepoStatus(boolean isLoaded, long count, String checksum) {
×
487
    }
488

489
    /**
490
     * To execute before loading a nanopub: check if the nanopub is already loaded and what is the
491
     * current load counter and checksum. This effectively batches three queries into one.
492
     * This method must be called from within a transaction.
493
     *
494
     * @param conn repo connection
495
     * @param npId nanopub ID
496
     * @return the current status
497
     */
498
    @GeneratedFlagForDependentElements
499
    private static RepoStatus fetchRepoStatus(RepositoryConnection conn, IRI npId) {
500
        var result = conn.prepareTupleQuery(QueryLanguage.SPARQL, REPO_STATUS_QUERY_TEMPLATE.formatted(npId)).evaluate();
501
        try (result) {
502
            if (!result.hasNext()) {
503
                // This may happen if the repo was created, but is completely empty.
504
                return new RepoStatus(false, 0, NanopubUtils.INIT_CHECKSUM);
505
            }
506
            var row = result.next();
507
            return new RepoStatus(row.hasBinding("loadNumber"), Long.parseLong(row.getBinding("count").getValue().stringValue()), row.getBinding("checksum").getValue().stringValue());
508
        }
509
    }
510

511
    @GeneratedFlagForDependentElements
512
    private static void loadInvalidateStatements(Nanopub thisNp, String thisPubkey, Statement invalidateStatement, Statement pubkeyStatement, Statement pubkeyStatementX) {
513
        boolean success = false;
514
        int retries = 0;
515
        while (!success) {
516
            List<RepositoryConnection> connections = new ArrayList<>();
517
            RepositoryConnection metaConn = TripleStore.get().getRepoConnection("meta");
518
            try {
519
                IRI invalidatedNpId = (IRI) invalidateStatement.getObject();
520
                // Basic isolation because here we only read append-only data.
521
                metaConn.begin(IsolationLevels.READ_COMMITTED);
522

523
                Value pubkeyValue = Utils.getObjectForPattern(metaConn, NPA.GRAPH, invalidatedNpId, NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY);
524
                if (pubkeyValue != null) {
525
                    String pubkey = pubkeyValue.stringValue();
526

527
                    if (!pubkey.equals(thisPubkey)) {
528
                        //log.info("Adding invalidation expressed in " + thisNp.getUri() + " also to repo for pubkey " + pubkey);
529
                        connections.add(loadStatements("pubkey_" + Utils.createHash(pubkey), invalidateStatement, pubkeyStatement, pubkeyStatementX));
530
//                                                connections.add(loadStatements("text-pubkey_" + Utils.createHash(pubkey), invalidateStatement, pubkeyStatement));
531
                    }
532

533
                    for (Value v : Utils.getObjectsForPattern(metaConn, NPA.GRAPH, invalidatedNpId, NPX.HAS_NANOPUB_TYPE)) {
534
                        IRI typeIri = (IRI) v;
535
                        // TODO Avoid calling getTypes and getCreators multiple times:
536
                        if (!NanopubUtils.getTypes(thisNp).contains(typeIri)) {
537
                            //log.info("Adding invalidation expressed in " + thisNp.getUri() + " also to repo for type " + typeIri);
538
                            connections.add(loadStatements("type_" + Utils.createHash(typeIri), invalidateStatement, pubkeyStatement, pubkeyStatementX));
539
//                                                        connections.add(loadStatements("text-type_" + Utils.createHash(typeIri), invalidateStatement, pubkeyStatement));
540
                        }
541
                    }
542

543
//                                        for (Value v : Utils.getObjectsForPattern(metaConn, NPA.GRAPH, invalidatedNpId, DCTERMS.CREATOR)) {
544
//                                                IRI creatorIri = (IRI) v;
545
//                                                if (!SimpleCreatorPattern.getCreators(thisNp).contains(creatorIri)) {
546
//                                                        //log.info("Adding invalidation expressed in " + thisNp.getUri() + " also to repo for user " + creatorIri);
547
//                                                        connections.add(loadStatements("user_" + Utils.createHash(creatorIri), invalidateStatement, pubkeyStatement));
548
//                                                        connections.add(loadStatements("text-user_" + Utils.createHash(creatorIri), invalidateStatement, pubkeyStatement));
549
//                                                }
550
//                                        }
551
                }
552

553
                metaConn.commit();
554
                // TODO handle case that some commits succeed and some fail
555
                for (RepositoryConnection c : connections) c.commit();
556
                success = true;
557
            } catch (Exception ex) {
558
                log.info("Could not load invalidate statements. ", ex);
559
                if (metaConn.isActive()) metaConn.rollback();
560
                for (RepositoryConnection c : connections) {
561
                    if (c.isActive()) c.rollback();
562
                }
563
            } finally {
564
                metaConn.close();
565
                for (RepositoryConnection c : connections) c.close();
566
            }
567
            if (!success) {
568
                retries++;
569
                if (retries >= MAX_RETRIES) {
570
                    throw new RuntimeException("Failed to load invalidate statements for " + thisNp.getUri() + " after " + MAX_RETRIES + " retries");
571
                }
572
                log.info("Retrying in 10 seconds (attempt {}/{})...", retries, MAX_RETRIES);
573
                try {
574
                    Thread.sleep(RETRY_DELAY_MS);
575
                } catch (InterruptedException x) {
576
                }
577
            }
578
        }
579
    }
580

581
    @GeneratedFlagForDependentElements
582
    private static RepositoryConnection loadStatements(String repoName, Statement... statements) {
583
        RepositoryConnection conn = TripleStore.get().getRepoConnection(repoName);
584
        // Basic isolation: we only append new statements
585
        conn.begin(IsolationLevels.READ_COMMITTED);
586
        for (Statement st : statements) {
587
            conn.add(st);
588
        }
589
        return conn;
590
    }
591

592
    @GeneratedFlagForDependentElements
593
    static List<Statement> getInvalidatingStatements(IRI npId) {
594
        List<Statement> invalidatingStatements = new ArrayList<>();
595
        boolean success = false;
596
        int retries = 0;
597
        while (!success) {
598
            RepositoryConnection conn = TripleStore.get().getRepoConnection("meta");
599
            try (conn) {
600
                // Basic isolation because here we only read append-only data.
601
                conn.begin(IsolationLevels.READ_COMMITTED);
602

603
                TupleQueryResult r = conn.prepareTupleQuery(QueryLanguage.SPARQL, "SELECT * { graph <" + NPA.GRAPH + "> { " + "?np <" + NPX.INVALIDATES + "> <" + npId + "> ; <" + NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY + "> ?pubkey . " + "} }").evaluate();
604
                try (r) {
605
                    while (r.hasNext()) {
606
                        BindingSet b = r.next();
607
                        invalidatingStatements.add(vf.createStatement((IRI) b.getBinding("np").getValue(), NPX.INVALIDATES, npId, NPA.GRAPH));
608
                        invalidatingStatements.add(vf.createStatement((IRI) b.getBinding("np").getValue(), NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY, b.getBinding("pubkey").getValue(), NPA.GRAPH));
609
                    }
610
                }
611
                conn.commit();
612
                success = true;
613
            } catch (Exception ex) {
614
                log.info("Could not load invalidating statements. ", ex);
615
                if (conn.isActive()) conn.rollback();
616
            }
617
            if (!success) {
618
                retries++;
619
                if (retries >= MAX_RETRIES) {
620
                    throw new RuntimeException("Failed to get invalidating statements for " + npId + " after " + MAX_RETRIES + " retries");
621
                }
622
                log.info("Retrying in 10 seconds (attempt {}/{})...", retries, MAX_RETRIES);
623
                try {
624
                    Thread.sleep(RETRY_DELAY_MS);
625
                } catch (InterruptedException x) {
626
                }
627
            }
628
        }
629
        return invalidatingStatements;
630
    }
631

632
    @GeneratedFlagForDependentElements
633
    private static void loadNoteToRepo(Resource subj, String note) {
634
        boolean success = false;
635
        int retries = 0;
636
        while (!success) {
637
            RepositoryConnection conn = TripleStore.get().getAdminRepoConnection();
638
            try (conn) {
639
                List<Statement> statements = new ArrayList<>();
640
                statements.add(vf.createStatement(subj, NPA.NOTE, vf.createLiteral(note), NPA.GRAPH));
641
                conn.add(statements);
642
                success = true;
643
            } catch (Exception ex) {
644
                log.info("Could not load note to repo. ", ex);
645
            }
646
            if (!success) {
647
                retries++;
648
                if (retries >= MAX_RETRIES) {
649
                    throw new RuntimeException("Failed to load note to repo for " + subj + " after " + MAX_RETRIES + " retries");
650
                }
651
                log.info("Retrying in 10 seconds (attempt {}/{})...", retries, MAX_RETRIES);
652
                try {
653
                    Thread.sleep(RETRY_DELAY_MS);
654
                } catch (InterruptedException x) {
655
                }
656
            }
657
        }
658
    }
659

660
    static boolean hasValidSignature(NanopubSignatureElement el) {
661
        try {
662
            if (el != null && SignatureUtils.hasValidSignature(el) && el.getPublicKeyString() != null) {
24!
663
                return true;
6✔
664
            }
665
        } catch (GeneralSecurityException ex) {
3✔
666
            log.info("Error for signature element {}", el.getUri());
15✔
667
            log.info("Error", ex);
12✔
668
        }
3✔
669
        return false;
6✔
670
    }
671

672
    private static IRI getBaseTrustyUri(Value v) {
673
        if (!(v instanceof IRI)) return null;
9!
674
        String s = v.stringValue();
9✔
675
        if (!s.matches(".*[^A-Za-z0-9\\-_]RA[A-Za-z0-9\\-_]{43}([^A-Za-z0-9\\\\-_].{0,43})?")) {
12✔
676
            return null;
6✔
677
        }
678
        return vf.createIRI(s.replaceFirst("^(.*[^A-Za-z0-9\\-_]RA[A-Za-z0-9\\-_]{43})([^A-Za-z0-9\\\\-_].{0,43})?$", "$1"));
21✔
679
    }
680

681
    // TODO: Move this to nanopub library:
682
    private static boolean isIntroNanopub(Nanopub np) {
683
        for (Statement st : np.getAssertion()) {
33✔
684
            if (st.getPredicate().equals(NPX.DECLARED_BY)) return true;
21✔
685
        }
3✔
686
        return false;
6✔
687
    }
688

689
    /**
690
     * Check if a nanopub is already loaded in the admin graph.
691
     *
692
     * @param npId the nanopub ID
693
     * @return true if the nanopub is loaded, false otherwise
694
     */
695
    @GeneratedFlagForDependentElements
696
    static boolean isNanopubLoaded(String npId) {
697
        boolean loaded = false;
698
        RepositoryConnection conn = TripleStore.get().getRepoConnection("meta");
699
        try (conn) {
700
            if (Utils.getObjectForPattern(conn, NPA.GRAPH, vf.createIRI(npId), NPA.HAS_LOAD_NUMBER) != null) {
701
                loaded = true;
702
            }
703
        } catch (Exception ex) {
704
            log.info("Could no load nanopub. ", ex);
705
        }
706
        return loaded;
707
    }
708

709
    // Shared value factory used by this class to create IRIs, literals and statements.
    private static ValueFactory vf = SimpleValueFactory.getInstance();
6✔
710

711
    // TODO remove the constants and use the ones from the nanopub library instead
712

713
    /**
     * Template for the query that fetches the status of a repository (used by fetchRepoStatus).
     * <p>
     * The template is formatted in two stages. Here, once, the graph and predicate IRIs are filled
     * in, while the second placeholder is replaced by a literal {@code %s} and thus survives. The
     * caller later fills that remaining placeholder with the IRI of the nanopub whose load number is
     * optionally looked up.
     * <p>
     * The query binds {@code ?count} and {@code ?checksum} for {@code NPA.THIS_REPO}, and
     * {@code ?loadNumber} only if one is present for the nanopub.
     */
    private static final String REPO_STATUS_QUERY_TEMPLATE = """
            SELECT * { graph <%s> {
              OPTIONAL { <%s> <%s> ?loadNumber . }
              <%s> <%s> ?count ;
                   <%s> ?checksum .
            } }
            """.formatted(NPA.GRAPH, "%s", NPA.HAS_LOAD_NUMBER, NPA.THIS_REPO, NPA.HAS_NANOPUB_COUNT, NPA.HAS_NANOPUB_CHECKSUM);
6✔
724
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc