• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-registry / 20568810065

29 Dec 2025 08:52AM UTC coverage: 7.6% (-0.02%) from 7.615%
20568810065

push

github

ashleycaselli
refactor: replace string literals with Collection enum constants

41 of 588 branches covered (6.97%)

Branch coverage included in aggregate %.

142 of 1820 relevant lines covered (7.8%)

1.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
src/main/java/com/knowledgepixels/registry/Task.java
1
package com.knowledgepixels.registry;
2

3
import com.mongodb.client.ClientSession;
4
import com.mongodb.client.FindIterable;
5
import com.mongodb.client.MongoCollection;
6
import com.mongodb.client.MongoCursor;
7
import net.trustyuri.TrustyUriUtils;
8
import org.apache.commons.lang.Validate;
9
import org.bson.Document;
10
import org.eclipse.rdf4j.model.IRI;
11
import org.eclipse.rdf4j.model.Statement;
12
import org.nanopub.Nanopub;
13
import org.nanopub.extra.index.IndexUtils;
14
import org.nanopub.extra.index.NanopubIndex;
15
import org.nanopub.extra.security.KeyDeclaration;
16
import org.nanopub.extra.setting.IntroNanopub;
17
import org.nanopub.extra.setting.NanopubSetting;
18
import org.slf4j.Logger;
19
import org.slf4j.LoggerFactory;
20

21
import java.io.Serializable;
22
import java.time.ZonedDateTime;
23
import java.util.*;
24
import java.util.concurrent.atomic.AtomicLong;
25

26
import static com.knowledgepixels.registry.EntryStatus.*;
27
import static com.knowledgepixels.registry.NanopubLoader.*;
28
import static com.knowledgepixels.registry.RegistryDB.*;
29
import static com.knowledgepixels.registry.ServerStatus.*;
30
import static com.mongodb.client.model.Filters.eq;
31
import static com.mongodb.client.model.Sorts.*;
32

33
public enum Task implements Serializable {
×
34

35
    INIT_DB {
×
36
        public void run(ClientSession s, Document taskDoc) {
37
            setServerStatus(s, launching);
×
38

39
            increaseStateCounter(s);
×
40
            if (RegistryDB.isInitialized(s)) throw new RuntimeException("DB already initialized");
×
41
            setValue(s, Collection.SERVER_INFO.toString(), "setupId", Math.abs(Utils.getRandom().nextLong()));
×
42
            setValue(s, Collection.SERVER_INFO.toString(), "testInstance", "true".equals(System.getenv("REGISTRY_TEST_INSTANCE")));
×
43
            schedule(s, LOAD_CONFIG);
×
44
        }
×
45

46
    },
47

48
    LOAD_CONFIG {
×
49
        public void run(ClientSession s, Document taskDoc) {
50
            if (getServerStatus(s) != launching) {
×
51
                throw new IllegalTaskStatusException("Illegal status for this task: " + getServerStatus(s));
×
52
            }
53

54
            if (System.getenv("REGISTRY_COVERAGE_TYPES") != null) {
×
55
                setValue(s, Collection.SERVER_INFO.toString(), "coverageTypes", System.getenv("REGISTRY_COVERAGE_TYPES"));
×
56
            }
57
            if (System.getenv("REGISTRY_COVERAGE_AGENTS") != null) {
×
58
                setValue(s, Collection.SERVER_INFO.toString(), "coverageAgents", System.getenv("REGISTRY_COVERAGE_AGENTS"));
×
59
            }
60
            schedule(s, LOAD_SETTING);
×
61
        }
×
62

63
    },
64

65
    LOAD_SETTING {
×
66
        public void run(ClientSession s, Document taskDoc) throws Exception {
67
            if (getServerStatus(s) != launching) {
×
68
                throw new IllegalTaskStatusException("Illegal status for this task: " + getServerStatus(s));
×
69
            }
70

71
            NanopubSetting settingNp = Utils.getSetting();
×
72
            String settingId = TrustyUriUtils.getArtifactCode(settingNp.getNanopub().getUri().stringValue());
×
73
            setValue(s, Collection.SETTING.toString(), "original", settingId);
×
74
            setValue(s, Collection.SETTING.toString(), "current", settingId);
×
75
            loadNanopub(s, settingNp.getNanopub());
×
76
            List<Document> bootstrapServices = new ArrayList<>();
×
77
            for (IRI i : settingNp.getBootstrapServices()) {
×
78
                bootstrapServices.add(new Document("_id", i.stringValue()));
×
79
            }
×
80
            // potentially currently hardcoded in the nanopub lib
81
            setValue(s, Collection.SETTING.toString(), "bootstrap-services", bootstrapServices);
×
82

83
            if (!"false".equals(System.getenv("REGISTRY_PERFORM_FULL_LOAD"))) {
×
84
                schedule(s, LOAD_FULL.withDelay(60 * 1000));
×
85
            }
86

87
            setServerStatus(s, coreLoading);
×
88
            schedule(s, INIT_COLLECTIONS);
×
89
        }
×
90

91
    },
92

93
    INIT_COLLECTIONS {
×
94

95
        // DB read from:
96
        // DB write to:  trustPaths, endorsements, accounts
97
        // This state is periodically executed
98

99
        public void run(ClientSession s, Document taskDoc) throws Exception {
100
            if (getServerStatus(s) != coreLoading && getServerStatus(s) != updating) {
×
101
                throw new IllegalTaskStatusException("Illegal status for this task: " + getServerStatus(s));
×
102
            }
103

104
            RegistryDB.initLoadingCollections(s);
×
105

106
            // since this may take long, we start with postfix "_loading"
107
            // and only at completion it's changed to trustPath, endorsements, accounts
108
            insert(s, "trustPaths_loading",
×
109
                    new Document("_id", "$")
110
                            .append("sorthash", "")
×
111
                            .append("agent", "$")
×
112
                            .append("pubkey", "$")
×
113
                            .append("depth", 0)
×
114
                            .append("ratio", 1.0d)
×
115
                            .append("type", "extended")
×
116
            );
117

118
            NanopubIndex agentIndex = IndexUtils.castToIndex(NanopubLoader.retrieveNanopub(s, Utils.getSetting().getAgentIntroCollection().stringValue()));
×
119
            loadNanopub(s, agentIndex);
×
120
            for (IRI el : agentIndex.getElements()) {
×
121
                String declarationAc = TrustyUriUtils.getArtifactCode(el.stringValue());
×
122
                Validate.notNull(declarationAc);
×
123

124
                insert(s, "endorsements_loading",
×
125
                        new Document("agent", "$")
126
                                .append("pubkey", "$")
×
127
                                .append("endorsedNanopub", declarationAc)
×
128
                                .append("source", getValue(s, Collection.SETTING.toString(), "current").toString())
×
129
                                .append("status", toRetrieve.getValue())
×
130

131
                );
132

133
            }
×
134
            insert(s, "accounts_loading",
×
135
                    new Document("agent", "$")
136
                            .append("pubkey", "$")
×
137
                            .append("status", visited.getValue())
×
138
                            .append("depth", 0)
×
139
            );
140

141
            log.info("Starting iteration at depth 0");
×
142
            schedule(s, LOAD_DECLARATIONS.with("depth", 1));
×
143
        }
×
144

145
        // At the end of this task, the base agent is initialized:
146
        // ------------------------------------------------------------
147
        //
148
        //              $$$$ ----endorses----> [intro]
149
        //              base                (to-retrieve)
150
        //              $$$$
151
        //            (visited)
152
        //
153
        //              [0] trust path
154
        //
155
        // ------------------------------------------------------------
156
        // Only one endorses-link to an introduction is shown here,
157
        // but there are typically several.
158

159
    },
160

161
    LOAD_DECLARATIONS {
×
162

163
        // In general, we have at this point accounts with
164
        // endorsement links to unvisited agent introductions:
165
        // ------------------------------------------------------------
166
        //
167
        //         o      ----endorses----> [intro]
168
        //    --> /#\  /o\___            (to-retrieve)
169
        //        / \  \_/^^^
170
        //         (visited)
171
        //
172
        //    ========[X] trust path
173
        //
174
        // ------------------------------------------------------------
175

176
        // DB read from: endorsements, trustEdges, accounts
177
        // DB write to:  endorsements, trustEdges, accounts
178

179
        public void run(ClientSession s, Document taskDoc) {
180

181
            int depth = taskDoc.getInteger("depth");
×
182

183
            if (has(s, "endorsements_loading", new Document("status", toRetrieve.getValue()))) {
×
184
                Document d = getOne(s, "endorsements_loading",
×
185
                        new DbEntryWrapper(toRetrieve).getDocument());
×
186

187
                IntroNanopub agentIntro = getAgentIntro(s, d.getString("endorsedNanopub"));
×
188
                if (agentIntro != null) {
×
189
                    String agentId = agentIntro.getUser().stringValue();
×
190

191
                    for (KeyDeclaration kd : agentIntro.getKeyDeclarations()) {
×
192
                        String sourceAgent = d.getString("agent");
×
193
                        Validate.notNull(sourceAgent);
×
194
                        String sourcePubkey = d.getString("pubkey");
×
195
                        Validate.notNull(sourcePubkey);
×
196
                        String sourceAc = d.getString("source");
×
197
                        Validate.notNull(sourceAc);
×
198
                        String agentPubkey = Utils.getHash(kd.getPublicKeyString());
×
199
                        Validate.notNull(agentPubkey);
×
200
                        Document trustEdge = new Document("fromAgent", sourceAgent)
×
201
                                .append("fromPubkey", sourcePubkey)
×
202
                                .append("toAgent", agentId)
×
203
                                .append("toPubkey", agentPubkey)
×
204
                                .append("source", sourceAc);
×
205
                        if (!has(s, "trustEdges", trustEdge)) {
×
206
                            boolean invalidated = has(s, "invalidations", new Document("invalidatedNp", sourceAc).append("invalidatingPubkey", sourcePubkey));
×
207
                            insert(s, "trustEdges", trustEdge.append("invalidated", invalidated));
×
208
                        }
209

210
                        Document agent = new Document("agent", agentId).append("pubkey", agentPubkey);
×
211
                        if (!has(s, "accounts_loading", agent)) {
×
212
                            insert(s, "accounts_loading", agent.append("status", seen.getValue()).append("depth", depth));
×
213
                        }
214
                    }
×
215

216
                    set(s, "endorsements_loading", d.append("status", retrieved.getValue()));
×
217
                } else {
×
218
                    set(s, "endorsements_loading", d.append("status", discarded.getValue()));
×
219
                }
220

221
                schedule(s, LOAD_DECLARATIONS.with("depth", depth));
×
222

223
            } else {
×
224
                schedule(s, EXPAND_TRUST_PATHS.with("depth", depth));
×
225
            }
226
        }
×
227

228
        // At the end of this step, the key declarations in the agent
229
        // introductions are loaded and the corresponding trust edges
230
        // established:
231
        // ------------------------------------------------------------
232
        //
233
        //        o      ----endorses----> [intro]
234
        //   --> /#\  /o\___                o
235
        //       / \  \_/^^^ ---trusts---> /#\  /o\___
236
        //        (visited)                / \  \_/^^^
237
        //                                   (seen)
238
        //
239
        //   ========[X] trust path
240
        //
241
        // ------------------------------------------------------------
242
        // Only one trust edge per introduction is shown here, but
243
        // there can be several.
244

245
    },
246

247
    EXPAND_TRUST_PATHS {
×
248

249
        // DB read from: accounts, trustPaths, trustEdges
250
        // DB write to:  accounts, trustPaths
251

252
        public void run(ClientSession s, Document taskDoc) {
253

254
            int depth = taskDoc.getInteger("depth");
×
255

256
            Document d = getOne(s, "accounts_loading",
×
257
                    new Document("status", visited.getValue())
×
258
                            .append("depth", depth - 1)
×
259
            );
260

261
            if (d != null) {
×
262

263
                String agentId = d.getString("agent");
×
264
                Validate.notNull(agentId);
×
265
                String pubkeyHash = d.getString("pubkey");
×
266
                Validate.notNull(pubkeyHash);
×
267

268
                Document trustPath = collection("trustPaths_loading").find(s,
×
269
                        new Document("agent", agentId).append("pubkey", pubkeyHash).append("type", "extended").append("depth", depth - 1)
×
270
                ).sort(orderBy(descending("ratio"), ascending("sorthash"))).first();
×
271

272
                if (trustPath == null) {
×
273
                    // Check it again in next iteration:
274
                    set(s, "accounts_loading", d.append("depth", depth));
×
275
                } else {
276
                    // Only first matching trust path is considered
277

278
                    Map<String, Document> newPaths = new HashMap<>();
×
279
                    Map<String, Set<String>> pubkeySets = new HashMap<>();
×
280
                    String currentSetting = getValue(s, Collection.SETTING.toString(), "current").toString();
×
281

282
                    MongoCursor<Document> edgeCursor = get(s, "trustEdges",
×
283
                            new Document("fromAgent", agentId)
284
                                    .append("fromPubkey", pubkeyHash)
×
285
                                    .append("invalidated", false)
×
286
                    );
287
                    while (edgeCursor.hasNext()) {
×
288
                        Document e = edgeCursor.next();
×
289

290
                        String agent = e.getString("toAgent");
×
291
                        Validate.notNull(agent);
×
292
                        String pubkey = e.getString("toPubkey");
×
293
                        Validate.notNull(pubkey);
×
294
                        String pathId = trustPath.getString("_id") + " " + agent + "|" + pubkey;
×
295
                        newPaths.put(pathId,
×
296
                                new Document("_id", pathId)
297
                                        .append("sorthash", Utils.getHash(currentSetting + " " + pathId))
×
298
                                        .append("agent", agent)
×
299
                                        .append("pubkey", pubkey)
×
300
                                        .append("depth", depth)
×
301
                                        .append("type", "extended")
×
302
                        );
303
                        if (!pubkeySets.containsKey(agent)) pubkeySets.put(agent, new HashSet<>());
×
304
                        pubkeySets.get(agent).add(pubkey);
×
305
                    }
×
306
                    for (String pathId : newPaths.keySet()) {
×
307
                        Document pd = newPaths.get(pathId);
×
308
                        // first divide by agents; then for each agent, divide by number of pubkeys:
309
                        double newRatio = (trustPath.getDouble("ratio") * 0.9) / pubkeySets.size() / pubkeySets.get(pd.getString("agent")).size();
×
310
                        insert(s, "trustPaths_loading", pd.append("ratio", newRatio));
×
311
                    }
×
312
                    set(s, "trustPaths_loading", trustPath.append("type", "primary"));
×
313
                    set(s, "accounts_loading", d.append("status", expanded.getValue()));
×
314
                }
315
                schedule(s, EXPAND_TRUST_PATHS.with("depth", depth));
×
316

317
            } else {
×
318

319
                schedule(s, LOAD_CORE.with("depth", depth).append("load-count", 0));
×
320

321
            }
322

323
        }
×
324

325
        // At the end of this step, trust paths are updated to include
326
        // the new accounts:
327
        // ------------------------------------------------------------
328
        //
329
        //         o      ----endorses----> [intro]
330
        //    --> /#\  /o\___                o
331
        //        / \  \_/^^^ ---trusts---> /#\  /o\___
332
        //        (expanded)                / \  \_/^^^
333
        //                                    (seen)
334
        //
335
        //    ========[X]=====================[X+1] trust path
336
        //
337
        // ------------------------------------------------------------
338
        // Only one trust path is shown here, but they branch out if
339
        // several trust edges are present.
340

341
    },
342

343
    LOAD_CORE {
×
344

345
        // From here on, we refocus on the head of the trust paths:
346
        // ------------------------------------------------------------
347
        //
348
        //         o
349
        //    --> /#\  /o\___
350
        //        / \  \_/^^^
351
        //          (seen)
352
        //
353
        //    ========[X] trust path
354
        //
355
        // ------------------------------------------------------------
356

357
        // DB read from: accounts, trustPaths, endorsements, lists
358
        // DB write to:  accounts, endorsements, lists
359

360
        public void run(ClientSession s, Document taskDoc) {
361

362
            int depth = taskDoc.getInteger("depth");
×
363
            int loadCount = taskDoc.getInteger("load-count");
×
364

365
            Document agentAccount = getOne(s, "accounts_loading",
×
366
                    new Document("depth", depth).append("status", seen.getValue()));
×
367
            final String agentId;
368
            final String pubkeyHash;
369
            final Document trustPath;
370
            if (agentAccount != null) {
×
371
                agentId = agentAccount.getString("agent");
×
372
                Validate.notNull(agentId);
×
373
                pubkeyHash = agentAccount.getString("pubkey");
×
374
                Validate.notNull(pubkeyHash);
×
375
                trustPath = getOne(s, "trustPaths_loading",
×
376
                        new Document("depth", depth)
×
377
                                .append("agent", agentId)
×
378
                                .append("pubkey", pubkeyHash)
×
379
                );
380
            } else {
381
                agentId = null;
×
382
                pubkeyHash = null;
×
383
                trustPath = null;
×
384
            }
385

386
            if (trustPath == null) {
×
387
                schedule(s, FINISH_ITERATION.with("depth", depth).append("load-count", loadCount));
×
388
            } else if (trustPath.getDouble("ratio") < MIN_TRUST_PATH_RATIO) {
×
389
                set(s, "accounts_loading", agentAccount.append("status", skipped.getValue()));
×
390
                Document d = new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH);
×
391
                if (!has(s, "lists", d)) {
×
392
                    insert(s, "lists", d.append("status", encountered.getValue()));
×
393
                }
394
                schedule(s, LOAD_CORE.with("depth", depth).append("load-count", loadCount + 1));
×
395
            } else {
×
396
                // TODO check intro limit
397
                Document introList = new Document()
×
398
                        .append("pubkey", pubkeyHash)
×
399
                        .append("type", INTRO_TYPE_HASH)
×
400
                        .append("status", loading.getValue());
×
401
                if (!has(s, "lists", new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH))) {
×
402
                    insert(s, "lists", introList);
×
403
                }
404

405
                try (var stream = NanopubLoader.retrieveNanopubsFromPeers(INTRO_TYPE_HASH, pubkeyHash)) {
×
406
                    stream.forEach(m -> {
×
407
                        if (!m.isSuccess())
×
408
                            throw new AbortingTaskException("Failed to download nanopub; aborting task...");
×
409
                        loadNanopub(s, m.getNanopub(), pubkeyHash, INTRO_TYPE);
×
410
                    });
×
411
                }
412

413
                set(s, "lists", introList.append("status", loaded.getValue()));
×
414

415
                // TODO check endorsement limit
416
                Document endorseList = new Document()
×
417
                        .append("pubkey", pubkeyHash)
×
418
                        .append("type", ENDORSE_TYPE_HASH)
×
419
                        .append("status", loading.getValue());
×
420
                if (!has(s, "lists", new Document("pubkey", pubkeyHash).append("type", ENDORSE_TYPE_HASH))) {
×
421
                    insert(s, "lists", endorseList);
×
422
                }
423

424
                try (var stream = NanopubLoader.retrieveNanopubsFromPeers(ENDORSE_TYPE_HASH, pubkeyHash)) {
×
425
                    stream.forEach(m -> {
×
426
                        if (!m.isSuccess())
×
427
                            throw new AbortingTaskException("Failed to download nanopub; aborting task...");
×
428
                        Nanopub nanopub = m.getNanopub();
×
429
                        loadNanopub(s, nanopub, pubkeyHash, ENDORSE_TYPE);
×
430
                        String sourceNpId = TrustyUriUtils.getArtifactCode(nanopub.getUri().stringValue());
×
431
                        Validate.notNull(sourceNpId);
×
432
                        for (Statement st : nanopub.getAssertion()) {
×
433
                            if (!st.getPredicate().equals(Utils.APPROVES_OF)) continue;
×
434
                            if (!(st.getObject() instanceof IRI)) continue;
×
435
                            if (!agentId.equals(st.getSubject().stringValue())) continue;
×
436
                            String objStr = st.getObject().stringValue();
×
437
                            if (!TrustyUriUtils.isPotentialTrustyUri(objStr)) continue;
×
438
                            String endorsedNpId = TrustyUriUtils.getArtifactCode(objStr);
×
439
                            Validate.notNull(endorsedNpId);
×
440
                            Document endorsement = new Document("agent", agentId)
×
441
                                    .append("pubkey", pubkeyHash)
×
442
                                    .append("endorsedNanopub", endorsedNpId)
×
443
                                    .append("source", sourceNpId);
×
444
                            if (!has(s, "endorsements_loading", endorsement)) {
×
445
                                insert(s, "endorsements_loading",
×
446
                                        endorsement.append("status", toRetrieve.getValue()));
×
447
                            }
448
                        }
×
449
                    });
×
450
                }
451

452
                set(s, "lists", endorseList.append("status", loaded.getValue()));
×
453

454
                Document df = new Document("pubkey", pubkeyHash).append("type", "$");
×
455
                if (!has(s, "lists", df)) insert(s, "lists",
×
456
                        df.append("status", encountered.getValue()));
×
457

458
                set(s, "accounts_loading", agentAccount.append("status", visited.getValue()));
×
459

460
                schedule(s, LOAD_CORE.with("depth", depth).append("load-count", loadCount + 1));
×
461
            }
462

463
        }
×
464

465
        // At the end of this step, we have added new endorsement
466
        // links to yet-to-retrieve agent introductions:
467
        // ------------------------------------------------------------
468
        //
469
        //         o      ----endorses----> [intro]
470
        //    --> /#\  /o\___            (to-retrieve)
471
        //        / \  \_/^^^
472
        //         (visited)
473
        //
474
        //    ========[X] trust path
475
        //
476
        // ------------------------------------------------------------
477
        // Only one endorsement is shown here, but there are typically
478
        // several.
479

480
    },
481

482
    FINISH_ITERATION {
×
483
        public void run(ClientSession s, Document taskDoc) {
484

485
            int depth = taskDoc.getInteger("depth");
×
486
            int loadCount = taskDoc.getInteger("load-count");
×
487

488
            if (loadCount == 0) {
×
489
                log.info("No new cores loaded; finishing iteration");
×
490
                schedule(s, CALCULATE_TRUST_SCORES);
×
491
            } else if (depth == MAX_TRUST_PATH_DEPTH) {
×
492
                log.info("Maximum depth reached: {}", depth);
×
493
                schedule(s, CALCULATE_TRUST_SCORES);
×
494
            } else {
495
                log.info("Progressing iteration at depth {}", depth + 1);
×
496
                schedule(s, LOAD_DECLARATIONS.with("depth", depth + 1));
×
497
            }
498

499
        }
×
500

501
    },
502

503
    CALCULATE_TRUST_SCORES {
×
504

505
        // DB read from: accounts, trustPaths
506
        // DB write to:  accounts
507

508
        public void run(ClientSession s, Document taskDoc) {
509

510
            Document d = getOne(s, "accounts_loading", new Document("status", expanded.getValue()));
×
511

512
            if (d == null) {
×
513
                schedule(s, AGGREGATE_AGENTS);
×
514
            } else {
515
                double ratio = 0.0;
×
516
                Map<String, Boolean> seenPathElements = new HashMap<>();
×
517
                int pathCount = 0;
×
518
                MongoCursor<Document> trustPaths = collection("trustPaths_loading").find(s,
×
519
                        new Document("agent", d.get("agent").toString()).append("pubkey", d.get("pubkey").toString())
×
520
                ).sort(orderBy(ascending("depth"), descending("ratio"), ascending("sorthash"))).cursor();
×
521
                while (trustPaths.hasNext()) {
×
522
                    Document trustPath = trustPaths.next();
×
523
                    ratio += trustPath.getDouble("ratio");
×
524
                    boolean independentPath = true;
×
525
                    String[] pathElements = trustPath.getString("_id").split(" ");
×
526
                    // Iterate over path elements, ignoring first (root) and last (this agent/pubkey):
527
                    for (int i = 1; i < pathElements.length - 1; i++) {
×
528
                        String p = pathElements[i];
×
529
                        if (seenPathElements.containsKey(p)) {
×
530
                            independentPath = false;
×
531
                            break;
×
532
                        }
533
                        seenPathElements.put(p, true);
×
534
                    }
535
                    if (independentPath) pathCount += 1;
×
536
                }
×
537
                double rawQuota = GLOBAL_QUOTA * ratio;
×
538
                int quota = (int) rawQuota;
×
539
                if (rawQuota < MIN_USER_QUOTA) {
×
540
                    quota = MIN_USER_QUOTA;
×
541
                } else if (rawQuota > MAX_USER_QUOTA) {
×
542
                    quota = MAX_USER_QUOTA;
×
543
                }
544
                set(s, "accounts_loading",
×
545
                        d.append("status", processed.getValue())
×
546
                                .append("ratio", ratio)
×
547
                                .append("pathCount", pathCount)
×
548
                                .append("quota", quota)
×
549
                );
550
                schedule(s, CALCULATE_TRUST_SCORES);
×
551
            }
552

553
        }
×
554

555
    },
556

557
    AGGREGATE_AGENTS {
×
558

559
        // DB read from: accounts, agents
560
        // DB write to:  accounts, agents
561

562
        public void run(ClientSession s, Document taskDoc) {
563

564
            Document a = getOne(s, "accounts_loading", new Document("status", processed.getValue()));
×
565
            if (a == null) {
×
566
                schedule(s, ASSIGN_PUBKEYS);
×
567
            } else {
568
                Document agentId = new Document("agent", a.get("agent").toString()).append("status", processed.getValue());
×
569
                int count = 0;
×
570
                int pathCountSum = 0;
×
571
                double totalRatio = 0.0d;
×
572
                MongoCursor<Document> agentAccounts = collection("accounts_loading").find(s, agentId).cursor();
×
573
                while (agentAccounts.hasNext()) {
×
574
                    Document d = agentAccounts.next();
×
575
                    count++;
×
576
                    pathCountSum += d.getInteger("pathCount");
×
577
                    totalRatio += d.getDouble("ratio");
×
578
                }
×
579
                collection("accounts_loading").updateMany(s, agentId, new Document("$set",
×
580
                        new DbEntryWrapper(aggregated).getDocument()));
×
581
                insert(s, "agents_loading",
×
582
                        agentId.append("accountCount", count)
×
583
                                .append("avgPathCount", (double) pathCountSum / count)
×
584
                                .append("totalRatio", totalRatio)
×
585
                );
586
                schedule(s, AGGREGATE_AGENTS);
×
587
            }
588

589
        }
×
590

591
    },
592

593
    ASSIGN_PUBKEYS {
×
594

595
        // DB read from: accounts
596
        // DB write to:  accounts
597

598
        public void run(ClientSession s, Document taskDoc) {
599

600
            Document a = getOne(s, "accounts_loading", new DbEntryWrapper(aggregated).getDocument());
×
601
            if (a == null) {
×
602
                schedule(s, DETERMINE_UPDATES);
×
603
            } else {
604
                Document pubkeyId = new Document("pubkey", a.get("pubkey").toString());
×
605
                if (collection("accounts_loading").countDocuments(s, pubkeyId) == 1) {
×
606
                    collection("accounts_loading").updateMany(s, pubkeyId,
×
607
                            new Document("$set", new DbEntryWrapper(approved).getDocument()));
×
608
                } else {
609
                    // TODO At the moment all get marked as 'contested'; implement more nuanced algorithm
610
                    collection("accounts_loading").updateMany(s, pubkeyId, new Document("$set",
×
611
                            new DbEntryWrapper(contested).getDocument()));
×
612
                }
613
                schedule(s, ASSIGN_PUBKEYS);
×
614
            }
615

616
        }
×
617

618
    },
619

620
    DETERMINE_UPDATES {
×
621

622
        // DB read from: accounts
623
        // DB write to:  accounts
624

625
        public void run(ClientSession s, Document taskDoc) {
626

627
            // TODO Handle contested accounts properly:
628
            for (Document d : collection("accounts_loading").find(
×
629
                    new DbEntryWrapper(approved).getDocument())) {
×
630
                // TODO Consider quota too:
631
                Document accountId = new Document("agent", d.get("agent").toString()).append("pubkey", d.get("pubkey").toString());
×
632
                if (collection(Collection.ACCOUNTS.toString()) == null || !has(s, Collection.ACCOUNTS.toString(),
×
633
                        accountId.append("status", loaded.getValue()))) {
×
634
                    set(s, "accounts_loading", d.append("status", toLoad.getValue()));
×
635
                } else {
636
                    set(s, "accounts_loading", d.append("status", loaded.getValue()));
×
637
                }
638
            }
×
639
            schedule(s, FINALIZE_TRUST_STATE);
×
640

641
        }
×
642

643
    },
644

645
    FINALIZE_TRUST_STATE {
×
646
        // We do this is a separate task/transaction, because if we do it at the beginning of RELEASE_DATA, that task hangs and cannot
647
        // properly re-run (as some renaming outside of transactions will have taken place).
648
        public void run(ClientSession s, Document taskDoc) {
649
            String newTrustStateHash = RegistryDB.calculateTrustStateHash(s);
×
650
            String previousTrustStateHash = (String) getValue(s, Collection.SERVER_INFO.toString(), "trustStateHash");  // may be null
×
651
            setValue(s, Collection.SERVER_INFO.toString(), "lastTrustStateUpdate", ZonedDateTime.now().toString());
×
652

653
            schedule(s, RELEASE_DATA.with("newTrustStateHash", newTrustStateHash).append("previousTrustStateHash", previousTrustStateHash));
×
654
        }
×
655

656
    },
657

658
    RELEASE_DATA {
×
659
        public void run(ClientSession s, Document taskDoc) {
660
            ServerStatus status = getServerStatus(s);
×
661

662
            String newTrustStateHash = taskDoc.get("newTrustStateHash").toString();
×
663
            String previousTrustStateHash = taskDoc.getString("previousTrustStateHash");  // may be null
×
664

665
            // Renaming collections is run outside of a transaction, but is idempotent operation, so can safely be retried if task fails:
666
            rename("accounts_loading", Collection.ACCOUNTS.toString());
×
667
            rename("trustPaths_loading", "trustPaths");
×
668
            rename("agents_loading", Collection.AGENTS.toString());
×
669
            rename("endorsements_loading", "endorsements");
×
670

671
            if (previousTrustStateHash == null || !previousTrustStateHash.equals(newTrustStateHash)) {
×
672
                increaseStateCounter(s);
×
673
                setValue(s, Collection.SERVER_INFO.toString(), "trustStateHash", newTrustStateHash);
×
674
                insert(s, "debug_trustPaths", new Document()
×
675
                        .append("trustStateTxt", DebugPage.getTrustPathsTxt(s))
×
676
                        .append("trustStateHash", newTrustStateHash)
×
677
                        .append("trustStateCounter", getValue(s, Collection.SERVER_INFO.toString(), "trustStateCounter"))
×
678
                );
679
            }
680

681
            if (status == coreLoading) {
×
682
                setServerStatus(s, coreReady);
×
683
            } else {
684
                setServerStatus(s, ready);
×
685
            }
686

687
            // Run update after 1h:
688
            schedule(s, UPDATE.withDelay(60 * 60 * 1000));
×
689
        }
×
690

691
    },
692

693
    UPDATE {
×
694
        public void run(ClientSession s, Document taskDoc) {
695

696
            ServerStatus status = getServerStatus(s);
×
697
            if (status == ready || status == coreReady) {
×
698
                setServerStatus(s, updating);
×
699
                schedule(s, INIT_COLLECTIONS);
×
700
            } else {
701
                log.info("Postponing update; currently in status {}", status);
×
702
                schedule(s, UPDATE.withDelay(10 * 60 * 1000));
×
703
            }
704

705
        }
×
706

707
    },
708

709
    LOAD_FULL {
×
710
        public void run(ClientSession s, Document taskDoc) {
711
            if ("false".equals(System.getenv("REGISTRY_PERFORM_FULL_LOAD"))) return;
×
712

713
            ServerStatus status = getServerStatus(s);
×
714
            if (status != coreReady && status != ready) {
×
715
                log.info("Server currently not ready; checking again later");
×
716
                schedule(s, LOAD_FULL.withDelay(60 * 1000));
×
717
                return;
×
718
            }
719

720
            Document a = getOne(s, Collection.ACCOUNTS.toString(), new DbEntryWrapper(toLoad).getDocument());
×
721
            if (a == null) {
×
722
                log.info("Nothing to load");
×
723
                if (status == coreReady) {
×
724
                    log.info("Full load finished");
×
725
                    setServerStatus(s, ready);
×
726
                }
727
                log.info("Scheduling optional loading checks");
×
728
                schedule(s, CHECK_MORE_PUBKEYS.withDelay(100));
×
729
            } else {
730
                final String ph = a.getString("pubkey");
×
731
                if (!ph.equals("$")) {
×
732
                    try (var stream = NanopubLoader.retrieveNanopubsFromPeers("$", ph)) {
×
733
                        long startTime = System.nanoTime();
×
734
                        AtomicLong loaded = new AtomicLong(0);
×
735
                        stream.forEach(m -> {
×
736
                            if (!m.isSuccess())
×
737
                                throw new AbortingTaskException("Failed to download nanopub; aborting task...");
×
738
                            loadNanopub(s, m.getNanopub(), ph, "$");
×
739
                            loaded.incrementAndGet();
×
740
                        });
×
741
                        double timeSeconds = (System.nanoTime() - startTime) * 1e-9;
×
742
                        log.info("Loaded {} nanopubs in {}s, {} np/s",
×
743
                                loaded.get(), timeSeconds, String.format("%.2f", loaded.get() / timeSeconds));
×
744
                    }
745
                }
746

747
                Document l = getOne(s, "lists", new Document().append("pubkey", ph).append("type", "$"));
×
748
                if (l != null) set(s, "lists", l.append("status", loaded.getValue()));
×
749
                set(s, Collection.ACCOUNTS.toString(), a.append("status", loaded.getValue()));
×
750

751
                schedule(s, LOAD_FULL.withDelay(100));
×
752
            }
753
        }
×
754

755
        @Override
756
        public boolean runAsTransaction() {
757
            // TODO Make this a transaction once we connect to other Nanopub Registry instances:
758
            return false;
×
759
        }
760

761
    },
762

763
    CHECK_MORE_PUBKEYS {
×
764
        public void run(ClientSession s, Document taskDoc) {
765
            try {
766
                for (String pubkeyHash : Utils.retrieveListFromJsonUrl(Utils.getRandomPeer() + "pubkeys.json")) {
×
767
                    Validate.notNull(pubkeyHash);
×
768
                    Document d = new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH);
×
769
                    if (!has(s, "lists", d)) {
×
770
                        insert(s, "lists", d.append("status", encountered.getValue()));
×
771
                    }
772
                }
×
773
            } catch (Exception ex) {
×
774
                throw new RuntimeException(ex);
×
775
            }
×
776

777
            schedule(s, RUN_OPTIONAL_LOAD.withDelay(100));
×
778
        }
×
779

780
    },
781

782
    RUN_OPTIONAL_LOAD {
×
783
        public void run(ClientSession s, Document taskDoc) {
784
            Document di = getOne(s, "lists", new Document("type", INTRO_TYPE_HASH).append("status", encountered.getValue()));
×
785
            if (di != null) {
×
786
                final String pubkeyHash = di.getString("pubkey");
×
787
                Validate.notNull(pubkeyHash);
×
788
                log.info("Optional core loading: {}", pubkeyHash);
×
789

790
                try (var stream = NanopubLoader.retrieveNanopubsFromPeers(INTRO_TYPE_HASH, pubkeyHash)) {
×
791
                    stream.forEach(m -> {
×
792
                        if (!m.isSuccess())
×
793
                            throw new AbortingTaskException("Failed to download nanopub; aborting task...");
×
794
                        loadNanopub(s, m.getNanopub(), pubkeyHash, INTRO_TYPE);
×
795
                    });
×
796
                }
797
                set(s, "lists", di.append("status", loaded.getValue()));
×
798

799
                try (var stream = NanopubLoader.retrieveNanopubsFromPeers(ENDORSE_TYPE_HASH, pubkeyHash)) {
×
800
                    stream.forEach(m -> {
×
801
                        if (!m.isSuccess())
×
802
                            throw new AbortingTaskException("Failed to download nanopub; aborting task...");
×
803
                        loadNanopub(s, m.getNanopub(), pubkeyHash, ENDORSE_TYPE);
×
804
                    });
×
805
                }
806

807
                Document de = new Document("pubkey", pubkeyHash).append("type", ENDORSE_TYPE_HASH);
×
808
                if (has(s, "lists", de)) {
×
809
                    set(s, "lists", de.append("status", loaded.getValue()));
×
810
                } else {
811
                    insert(s, "lists", de.append("status", loaded.getValue()));
×
812
                }
813

814
                Document df = new Document("pubkey", pubkeyHash).append("type", "$");
×
815
                if (!has(s, "lists", df)) insert(s, "lists", df.append("status", encountered.getValue()));
×
816

817
                schedule(s, CHECK_NEW.withDelay(100));
×
818
                return;
×
819
            }
820

821
            Document df = getOne(s, "lists", new Document("type", "$").append("status", encountered.getValue()));
×
822
            if (df != null) {
×
823
                final String pubkeyHash = df.getString("pubkey");
×
824
                log.info("Optional full loading: {}", pubkeyHash);
×
825

826
                try (var stream = NanopubLoader.retrieveNanopubsFromPeers("$", pubkeyHash)) {
×
827
                    stream.forEach(m -> {
×
828
                        if (!m.isSuccess())
×
829
                            throw new AbortingTaskException("Failed to download nanopub; aborting task...");
×
830
                        loadNanopub(s, m.getNanopub(), pubkeyHash, "$");
×
831
                    });
×
832
                }
833

834
                set(s, "lists", df.append("status", loaded.getValue()));
×
835
            }
836

837
            schedule(s, CHECK_NEW.withDelay(100));
×
838
        }
×
839

840
    },
841

842
    CHECK_NEW {
×
843
        public void run(ClientSession s, Document taskDoc) {
844
            // TODO Replace this legacy connection with checks at other Nanopub Registries:
845
            LegacyConnector.checkForNewNanopubs(s);
×
846
            // TODO Somehow throttle the loading of such potentially non-approved nanopubs
847

848
            schedule(s, LOAD_FULL.withDelay(100));
×
849
        }
×
850

851
    };
852

853
    private static final Logger log = LoggerFactory.getLogger(Task.class);
×
854

855
    public abstract void run(ClientSession s, Document taskDoc) throws Exception;
856

857
    public boolean runAsTransaction() {
858
        return true;
×
859
    }
860

861
    private Document doc() {
862
        return withDelay(0l);
×
863
    }
864

865
    private Document withDelay(long delay) {
866
        return new Document()
×
867
                .append("not-before", System.currentTimeMillis() + delay)
×
868
                .append("action", name());
×
869
    }
870

871
    private Document with(String key, Object value) {
872
        return doc().append(key, value);
×
873
    }
874

875
    // TODO Move these to setting:
876
    private static final int MAX_TRUST_PATH_DEPTH = 10;
877
    private static final double MIN_TRUST_PATH_RATIO = 0.00000001;
878
    //private static final double MIN_TRUST_PATH_RATIO = 0.01; // For testing
879
    private static final int GLOBAL_QUOTA = 100000000;
880
    private static final int MIN_USER_QUOTA = 100;
881
    private static final int MAX_USER_QUOTA = 10000;
882

883
    private static MongoCollection<Document> tasks = collection("tasks");
×
884

885
    /**
886
     * The super important base entry point!
887
     */
888
    static void runTasks() {
889
        try (ClientSession s = RegistryDB.getClient().startSession()) {
×
890
            if (!RegistryDB.isInitialized(s)) {
×
891
                schedule(s, INIT_DB); // does not yet execute, only schedules
×
892
            }
893

894
            while (true) {
895
                FindIterable<Document> taskResult = tasks.find(s).sort(ascending("not-before"));
×
896
                Document taskDoc = taskResult.first();
×
897
                long sleepTime = 10;
×
898
                if (taskDoc != null && taskDoc.getLong("not-before") < System.currentTimeMillis()) {
×
899
                    Task task = valueOf(taskDoc.getString("action"));
×
900
                    log.info("Running task: {}", task.name());
×
901
                    if (task.runAsTransaction()) {
×
902
                        try {
903
                            s.startTransaction();
×
904
                            log.info("Transaction started");
×
905
                            runTask(task, taskDoc);
×
906
                            s.commitTransaction();
×
907
                            log.info("Transaction committed");
×
908
                        } catch (Exception ex) {
×
909
                            log.info("Aborting transaction", ex);
×
910
                            abortTransaction(s, ex.getMessage());
×
911
                            log.info("Transaction aborted");
×
912
                            sleepTime = 1000;
×
913
                        } finally {
914
                            cleanTransactionWithRetry(s);
×
915
                        }
×
916
                    } else {
917
                        try {
918
                            runTask(task, taskDoc);
×
919
                        } catch (Exception ex) {
×
920
                            log.info("Transaction failed", ex);
×
921
                        }
×
922
                    }
923
                }
924
                try {
925
                    Thread.sleep(sleepTime);
×
926
                } catch (InterruptedException ex) {
×
927
                    // ignore
928
                }
×
929
            }
×
930
        }
931
    }
932

933
    static void runTask(Task task, Document taskDoc) throws Exception {
934
        try (ClientSession s = RegistryDB.getClient().startSession()) {
×
935
            task.run(s, taskDoc);
×
936
            tasks.deleteOne(s, eq("_id", taskDoc.get("_id")));
×
937
        }
938
    }
×
939

940
    public static void abortTransaction(ClientSession mongoSession, String message) {
941
        boolean successful = false;
×
942
        while (!successful) {
×
943
            try {
944
                if (mongoSession.hasActiveTransaction()) {
×
945
                    mongoSession.abortTransaction();
×
946
                }
947
                successful = true;
×
948
            } catch (Exception ex) {
×
949
                log.info("Aborting transaction failed. ", ex);
×
950
                try {
951
                    Thread.sleep(1000);
×
952
                } catch (InterruptedException iex) {
×
953
                    // ignore
954
                }
×
955
            }
×
956
        }
957
    }
×
958

959
    public synchronized static void cleanTransactionWithRetry(ClientSession mongoSession) {
960
        boolean successful = false;
×
961
        while (!successful) {
×
962
            try {
963
                if (mongoSession.hasActiveTransaction()) {
×
964
                    mongoSession.abortTransaction();
×
965
                }
966
                successful = true;
×
967
            } catch (Exception ex) {
×
968
                log.info("Cleaning transaction failed. ", ex);
×
969
                try {
970
                    Thread.sleep(1000);
×
971
                } catch (InterruptedException iex) {
×
972
                    // ignore
973
                }
×
974
            }
×
975
        }
976
    }
×
977

978
    private static IntroNanopub getAgentIntro(ClientSession mongoSession, String nanopubId) {
979
        IntroNanopub agentIntro = new IntroNanopub(NanopubLoader.retrieveNanopub(mongoSession, nanopubId));
×
980
        if (agentIntro.getUser() == null) return null;
×
981
        loadNanopub(mongoSession, agentIntro.getNanopub());
×
982
        return agentIntro;
×
983
    }
984

985
    private static void setServerStatus(ClientSession mongoSession, ServerStatus status) {
986
        setValue(mongoSession, Collection.SERVER_INFO.toString(), "status", status.toString());
×
987
    }
×
988

989
    private static ServerStatus getServerStatus(ClientSession mongoSession) {
990
        Object status = getValue(mongoSession, Collection.SERVER_INFO.toString(), "status");
×
991
        if (status == null) {
×
992
            throw new RuntimeException("Illegal DB state: serverInfo status unavailable");
×
993
        }
994
        return ServerStatus.valueOf(status.toString());
×
995
    }
996

997
    private static void schedule(ClientSession mongoSession, Task task) {
998
        schedule(mongoSession, task.doc());
×
999
    }
×
1000

1001
    private static void schedule(ClientSession mongoSession, Document taskDoc) {
1002
        log.info("Scheduling task: {}", taskDoc.get("action"));
×
1003
        tasks.insertOne(mongoSession, taskDoc);
×
1004
    }
×
1005

1006
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc