• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-registry / 25001949631

27 Apr 2026 02:47PM UTC coverage: 31.779% (-0.03%) from 31.808%
25001949631

Pull #113

github

web-flow
Merge 7b4fc273f into 377ce4430
Pull Request #113: feat: stamp foaf:name + dct:created of declaring intro on accounts

294 of 1028 branches covered (28.6%)

Branch coverage included in aggregate %.

851 of 2575 relevant lines covered (33.05%)

5.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.95
src/main/java/com/knowledgepixels/registry/Task.java
1
package com.knowledgepixels.registry;
2

3
import com.knowledgepixels.registry.db.IndexInitializer;
4
import com.mongodb.client.ClientSession;
5
import com.mongodb.client.FindIterable;
6
import com.mongodb.client.MongoCollection;
7
import com.mongodb.client.MongoCursor;
8
import com.mongodb.client.model.ReplaceOptions;
9
import net.trustyuri.TrustyUriUtils;
10
import org.apache.commons.lang.Validate;
11
import org.bson.Document;
12
import org.eclipse.rdf4j.model.IRI;
13
import org.eclipse.rdf4j.model.Statement;
14
import org.nanopub.Nanopub;
15
import org.nanopub.SimpleTimestampPattern;
16
import org.nanopub.extra.index.IndexUtils;
17
import org.nanopub.extra.index.NanopubIndex;
18
import org.nanopub.extra.security.KeyDeclaration;
19
import org.nanopub.extra.setting.IntroNanopub;
20
import org.nanopub.extra.setting.NanopubSetting;
21
import org.slf4j.Logger;
22
import org.slf4j.LoggerFactory;
23

24
import java.io.Serializable;
25
import java.time.ZonedDateTime;
26
import java.util.*;
27
import java.util.concurrent.atomic.AtomicLong;
28

29
import static com.knowledgepixels.registry.EntryStatus.*;
30
import static com.knowledgepixels.registry.NanopubLoader.*;
31
import static com.knowledgepixels.registry.RegistryDB.*;
32
import static com.knowledgepixels.registry.ServerStatus.*;
33
import static com.mongodb.client.model.Filters.eq;
34
import static com.mongodb.client.model.Sorts.*;
35

36
public enum Task implements Serializable {
6✔
37

38
    INIT_DB {
        /**
         * Bootstraps a fresh registry database.
         *
         * <p>Marks the server as {@code launching}, bumps the state counter, then stores a random
         * {@code setupId} and the {@code testInstance} flag (from {@code REGISTRY_TEST_INSTANCE})
         * in the server-info collection before handing over to {@code LOAD_CONFIG}.
         *
         * @throws RuntimeException if the database reports itself as already initialized
         */
        @Override
        public void run(ClientSession s, Document taskDoc) {
            setServerStatus(s, launching);

            increaseStateCounter(s);
            if (RegistryDB.isInitialized(s)) {
                throw new RuntimeException("DB already initialized");
            }

            final String serverInfo = Collection.SERVER_INFO.toString();
            final long setupId = Math.abs(Utils.getRandom().nextLong());
            setValue(s, serverInfo, "setupId", setupId);
            final boolean testInstance = "true".equals(System.getenv("REGISTRY_TEST_INSTANCE"));
            setValue(s, serverInfo, "testInstance", testInstance);

            schedule(s, LOAD_CONFIG);
        }

    },
52

53
    LOAD_CONFIG {
        /**
         * Copies the optional coverage restrictions from the environment into the server-info
         * collection, then schedules {@code LOAD_SETTING}.
         *
         * <p>{@code REGISTRY_COVERAGE_TYPES} and {@code REGISTRY_COVERAGE_AGENTS} are each stored
         * only when set; unset variables leave the corresponding value untouched.
         *
         * @throws IllegalTaskStatusException if the server is not in the {@code launching} state
         */
        @Override
        public void run(ClientSession s, Document taskDoc) {
            if (getServerStatus(s) != launching) {
                throw new IllegalTaskStatusException("Illegal status for this task: " + getServerStatus(s));
            }

            final String serverInfo = Collection.SERVER_INFO.toString();

            final String coverageTypes = System.getenv("REGISTRY_COVERAGE_TYPES");
            if (coverageTypes != null) {
                setValue(s, serverInfo, "coverageTypes", coverageTypes);
            }

            final String coverageAgents = System.getenv("REGISTRY_COVERAGE_AGENTS");
            if (coverageAgents != null) {
                setValue(s, serverInfo, "coverageAgents", coverageAgents);
            }

            schedule(s, LOAD_SETTING);
        }

    },
69

70
    LOAD_SETTING {
        /**
         * Loads the configured setting nanopub and records it in the setting collection.
         *
         * <p>The setting's artifact code is stored as both {@code original} and {@code current},
         * the nanopub itself is loaded, and its bootstrap services are stored as
         * {@code bootstrap-services}. A full load is scheduled unless
         * {@code REGISTRY_PERFORM_FULL_LOAD} is {@code "false"}. Finally the server moves to
         * {@code coreLoading} and {@code INIT_COLLECTIONS} is scheduled.
         *
         * @throws IllegalTaskStatusException if the server is not in the {@code launching} state
         */
        @Override
        public void run(ClientSession s, Document taskDoc) throws Exception {
            if (getServerStatus(s) != launching) {
                throw new IllegalTaskStatusException("Illegal status for this task: " + getServerStatus(s));
            }

            final NanopubSetting setting = Utils.getSetting();
            final var settingNanopub = setting.getNanopub();
            final String settingCollection = Collection.SETTING.toString();

            final String settingAc = TrustyUriUtils.getArtifactCode(settingNanopub.getUri().stringValue());
            setValue(s, settingCollection, "original", settingAc);
            setValue(s, settingCollection, "current", settingAc);
            loadNanopub(s, settingNanopub);

            final List<Document> serviceDocs = new ArrayList<>();
            for (IRI service : setting.getBootstrapServices()) {
                serviceDocs.add(new Document("_id", service.stringValue()));
            }
            // potentially currently hardcoded in the nanopub lib
            setValue(s, settingCollection, "bootstrap-services", serviceDocs);

            final boolean performFullLoad = !"false".equals(System.getenv("REGISTRY_PERFORM_FULL_LOAD"));
            if (performFullLoad) {
                schedule(s, LOAD_FULL);
            }

            setServerStatus(s, coreLoading);
            schedule(s, INIT_COLLECTIONS);
        }

    },
97

98
    INIT_COLLECTIONS {

        // DB read from: setting ("current"); accounts (only when trust calculation is disabled);
        //               the agent-intro index nanopub is obtained via NanopubLoader.retrieveNanopub
        // DB write to:  trustPaths, endorsements, accounts
        // This state is periodically executed

        /**
         * Creates the "_loading" collections and seeds them with the root of the trust graph.
         *
         * <p>If {@code REGISTRY_ENABLE_TRUST_CALCULATION} is {@code "false"}, trust calculation is
         * skipped. Only the explicitly configured pubkeys are seeded as {@code toLoad} accounts, if
         * not already present, and {@code FINALIZE_TRUST_STATE} is scheduled.
         *
         * <p>Otherwise the base agent {@code "$"} receives:
         * <ul>
         *   <li>a depth-0 trust path with ratio 1.0;</li>
         *   <li>a {@code visited} account;</li>
         *   <li>one {@code toRetrieve} endorsement for every element of the setting's agent-intro
         *       collection.</li>
         * </ul>
         * Then {@code LOAD_DECLARATIONS} is scheduled at depth 1.
         *
         * @throws IllegalTaskStatusException if the server is neither core-loading nor updating
         */
        @Override
        public void run(ClientSession s, Document taskDoc) throws Exception {
            if (getServerStatus(s) != coreLoading && getServerStatus(s) != updating) {
                throw new IllegalTaskStatusException("Illegal status for this task: " + getServerStatus(s));
            }

            IndexInitializer.initLoadingCollections(s);

            if ("false".equals(System.getenv("REGISTRY_ENABLE_TRUST_CALCULATION"))) {
                log.info("Trust calculation disabled; skipping to FINALIZE_TRUST_STATE");
                for (Map.Entry<String, Integer> entry : AgentFilter.getExplicitPubkeys().entrySet()) {
                    String pubkeyHash = entry.getKey();
                    int quota = entry.getValue();
                    if (!has(s, "accounts_loading", new Document("pubkey", pubkeyHash))) {
                        Document account = new Document("agent", "")
                                .append("pubkey", pubkeyHash)
                                .append("status", toLoad.getValue())
                                .append("depth", 0)
                                .append("quota", quota);
                        insert(s, "accounts_loading", account);
                        log.info("Seeded explicit pubkey as account: {}", pubkeyHash);
                    }
                }
                schedule(s, FINALIZE_TRUST_STATE);
                return;
            }

            // since this may take long, we start with postfix "_loading"
            // and only at completion it's changed to trustPath, endorsements, accounts
            insert(s, "trustPaths_loading",
                    new Document("_id", "$")
                            .append("sorthash", "")
                            .append("agent", "$")
                            .append("pubkey", "$")
                            .append("depth", 0)
                            .append("ratio", 1.0d)
                            .append("type", "extended")
            );

            NanopubIndex agentIndex = IndexUtils.castToIndex(NanopubLoader.retrieveNanopub(s, Utils.getSetting().getAgentIntroCollection().stringValue()));
            loadNanopub(s, agentIndex);

            // The current setting is not modified while this task runs, so read it once
            // instead of issuing one DB query per index element.
            String currentSetting = getValue(s, Collection.SETTING.toString(), "current").toString();
            for (IRI el : agentIndex.getElements()) {
                String declarationAc = TrustyUriUtils.getArtifactCode(el.stringValue());
                Validate.notNull(declarationAc);

                insert(s, "endorsements_loading",
                        new Document("agent", "$")
                                .append("pubkey", "$")
                                .append("endorsedNanopub", declarationAc)
                                .append("source", currentSetting)
                                .append("status", toRetrieve.getValue())
                );
            }
            insert(s, "accounts_loading",
                    new Document("agent", "$")
                            .append("pubkey", "$")
                            .append("status", visited.getValue())
                            .append("depth", 0)
            );

            log.info("Starting iteration at depth 0");
            schedule(s, LOAD_DECLARATIONS.with("depth", 1));
        }

        // At the end of this task, the base agent is initialized:
        // ------------------------------------------------------------
        //
        //              $$$$ ----endorses----> [intro]
        //              base                (to-retrieve)
        //              $$$$
        //            (visited)
        //
        //              [0] trust path
        //
        // ------------------------------------------------------------
        // Only one endorses-link to an introduction is shown here,
        // but there are typically several.

    },
184

185
    LOAD_DECLARATIONS {
33✔
186

187
        // In general, we have at this point accounts with
188
        // endorsement links to unvisited agent introductions:
189
        // ------------------------------------------------------------
190
        //
191
        //         o      ----endorses----> [intro]
192
        //    --> /#\  /o\___            (to-retrieve)
193
        //        / \  \_/^^^
194
        //         (visited)
195
        //
196
        //    ========[X] trust path
197
        //
198
        // ------------------------------------------------------------
199

200
        // DB read from: endorsements, trustEdges, accounts
201
        // DB write to:  endorsements, trustEdges, accounts
202

203
        public void run(ClientSession s, Document taskDoc) {
204

205
            int depth = taskDoc.getInteger("depth");
×
206

207
            while (true) {
208
                Document d = getOne(s, "endorsements_loading",
×
209
                        new DbEntryWrapper(toRetrieve).getDocument());
×
210
                if (d == null) break;
×
211

212
                IntroNanopub agentIntro = getAgentIntro(s, d.getString("endorsedNanopub"));
×
213
                if (agentIntro != null) {
×
214
                    String agentId = agentIntro.getUser().stringValue();
×
215
                    // foaf:name + dct:created of the intro nanopub. Same name applies to every
216
                    // KeyDeclaration in the intro, so resolve once outside the inner loop.
217
                    String introName = Utils.extractIntroName(agentIntro);
×
218
                    Calendar introCreatedCal = SimpleTimestampPattern.getCreationTime(agentIntro.getNanopub());
×
219
                    Date introCreatedAt = (introCreatedCal == null) ? null : introCreatedCal.getTime();
×
220

221
                    for (KeyDeclaration kd : agentIntro.getKeyDeclarations()) {
×
222
                        String sourceAgent = d.getString("agent");
×
223
                        Validate.notNull(sourceAgent);
×
224
                        String sourcePubkey = d.getString("pubkey");
×
225
                        Validate.notNull(sourcePubkey);
×
226
                        String sourceAc = d.getString("source");
×
227
                        Validate.notNull(sourceAc);
×
228
                        String agentPubkey = Utils.getHash(kd.getPublicKeyString());
×
229
                        Validate.notNull(agentPubkey);
×
230
                        Document trustEdge = new Document("fromAgent", sourceAgent)
×
231
                                .append("fromPubkey", sourcePubkey)
×
232
                                .append("toAgent", agentId)
×
233
                                .append("toPubkey", agentPubkey)
×
234
                                .append("source", sourceAc);
×
235
                        if (!has(s, "trustEdges", trustEdge)) {
×
236
                            boolean invalidated = has(s, "invalidations", new Document("invalidatedNp", sourceAc).append("invalidatingPubkey", sourcePubkey));
×
237
                            insert(s, "trustEdges", trustEdge.append("invalidated", invalidated));
×
238
                        }
239

240
                        Document agent = new Document("agent", agentId).append("pubkey", agentPubkey);
×
241
                        Document existing = collection("accounts_loading").find(s, agent).first();
×
242
                        if (existing == null) {
×
243
                            insert(s, "accounts_loading", agent
×
244
                                    .append("status", seen.getValue())
×
245
                                    .append("depth", depth)
×
246
                                    .append("name", introName)
×
247
                                    .append("nameCreatedAt", introCreatedAt));
×
248
                        } else if (introName != null) {
×
249
                            // Per-(agent, pubkey) name policy: keep the name from the intro
250
                            // with the latest dct:created. First write wins when no current
251
                            // timestamp exists; otherwise compare and replace iff strictly newer.
252
                            Date currentCreatedAt = existing.getDate("nameCreatedAt");
×
253
                            if (currentCreatedAt == null
×
254
                                    || (introCreatedAt != null && introCreatedAt.after(currentCreatedAt))) {
×
255
                                set(s, "accounts_loading", existing
×
256
                                        .append("name", introName)
×
257
                                        .append("nameCreatedAt", introCreatedAt));
×
258
                            }
259
                        }
260
                    }
×
261

262
                    set(s, "endorsements_loading", d.append("status", retrieved.getValue()));
×
263
                } else {
×
264
                    set(s, "endorsements_loading", d.append("status", discarded.getValue()));
×
265
                }
266
            }
×
267
            schedule(s, EXPAND_TRUST_PATHS.with("depth", depth));
×
268
        }
×
269

270
        // At the end of this step, the key declarations in the agent
271
        // introductions are loaded and the corresponding trust edges
272
        // established:
273
        // ------------------------------------------------------------
274
        //
275
        //        o      ----endorses----> [intro]
276
        //   --> /#\  /o\___                o
277
        //       / \  \_/^^^ ---trusts---> /#\  /o\___
278
        //        (visited)                / \  \_/^^^
279
        //                                   (seen)
280
        //
281
        //   ========[X] trust path
282
        //
283
        // ------------------------------------------------------------
284
        // Only one trust edge per introduction is shown here, but
285
        // there can be several.
286

287
    },
288

289
    EXPAND_TRUST_PATHS {

        // DB read from: accounts, trustPaths, trustEdges
        // DB write to:  accounts, trustPaths

        /**
         * Extends trust paths by one hop from every {@code visited} account at depth - 1.
         *
         * <p>For each such account, its best "extended" trust path at depth - 1 is selected
         * (highest ratio, then lowest sorthash). One new path is then created per non-invalidated
         * outgoing trust edge. 90% of the parent ratio is split evenly across target agents, then
         * evenly across each agent's pubkeys.
         *
         * <p>The parent path becomes "primary" and keeps 10% of its ratio, and the account is
         * marked {@code expanded}. Accounts without a matching path get their depth bumped so the
         * next iteration revisits them. Schedules {@code LOAD_CORE} for the same depth.
         */
        @Override
        public void run(ClientSession s, Document taskDoc) {

            int depth = taskDoc.getInteger("depth");
            // Sort hashes are salted with the current setting, which this task does not
            // modify, so read it once instead of once per expanded account.
            String currentSetting = getValue(s, Collection.SETTING.toString(), "current").toString();

            while (true) {
                Document d = getOne(s, "accounts_loading",
                        new Document("status", visited.getValue())
                                .append("depth", depth - 1)
                );
                if (d == null) break;

                String agentId = d.getString("agent");
                Validate.notNull(agentId);
                String pubkeyHash = d.getString("pubkey");
                Validate.notNull(pubkeyHash);

                Document trustPath = collection("trustPaths_loading").find(s,
                        new Document("agent", agentId).append("pubkey", pubkeyHash).append("type", "extended").append("depth", depth - 1)
                ).sort(orderBy(descending("ratio"), ascending("sorthash"))).first();

                if (trustPath == null) {
                    // Check it again in next iteration:
                    set(s, "accounts_loading", d.append("depth", depth));
                    continue;
                }

                // Only first matching trust path is considered
                String parentPathId = trustPath.getString("_id");
                double parentRatio = trustPath.getDouble("ratio");
                Map<String, Document> newPaths = new HashMap<>();
                Map<String, Set<String>> pubkeySets = new HashMap<>();

                try (MongoCursor<Document> edgeCursor = get(s, "trustEdges",
                        new Document("fromAgent", agentId)
                                .append("fromPubkey", pubkeyHash)
                                .append("invalidated", false)
                )) {
                    while (edgeCursor.hasNext()) {
                        Document e = edgeCursor.next();

                        String agent = e.getString("toAgent");
                        Validate.notNull(agent);
                        String pubkey = e.getString("toPubkey");
                        Validate.notNull(pubkey);
                        String pathId = parentPathId + " " + agent + "|" + pubkey;
                        newPaths.put(pathId,
                                new Document("_id", pathId)
                                        .append("sorthash", Utils.getHash(currentSetting + " " + pathId))
                                        .append("agent", agent)
                                        .append("pubkey", pubkey)
                                        .append("depth", depth)
                                        .append("type", "extended")
                        );
                        pubkeySets.computeIfAbsent(agent, k -> new HashSet<>()).add(pubkey);
                    }
                }
                for (Document pd : newPaths.values()) {
                    // first divide by agents; then for each agent, divide by number of pubkeys:
                    double newRatio = (parentRatio * 0.9) / pubkeySets.size() / pubkeySets.get(pd.getString("agent")).size();
                    insert(s, "trustPaths_loading", pd.append("ratio", newRatio));
                }
                // Retain only 10% of the ratio — the other 90% was distributed to children.
                // NOTE(review): when the account has no valid outgoing edges, newPaths is empty and
                // the 90% share is not assigned anywhere; confirm this loss is intended.
                double retainedRatio = parentRatio * 0.1;
                set(s, "trustPaths_loading", trustPath.append("type", "primary").append("ratio", retainedRatio));
                set(s, "accounts_loading", d.append("status", expanded.getValue()));
            }
            schedule(s, LOAD_CORE.with("depth", depth).append("load-count", 0));

        }

        // At the end of this step, trust paths are updated to include
        // the new accounts:
        // ------------------------------------------------------------
        //
        //         o      ----endorses----> [intro]
        //    --> /#\  /o\___                o
        //        / \  \_/^^^ ---trusts---> /#\  /o\___
        //        (expanded)                / \  \_/^^^
        //                                    (seen)
        //
        //    ========[X]=====================[X+1] trust path
        //
        // ------------------------------------------------------------
        // Only one trust path is shown here, but they branch out if
        // several trust edges are present.

    },
382

383
    LOAD_CORE {

        // From here on, we refocus on the head of the trust paths:
        // ------------------------------------------------------------
        //
        //         o
        //    --> /#\  /o\___
        //        / \  \_/^^^
        //          (seen)
        //
        //    ========[X] trust path
        //
        // ------------------------------------------------------------

        // DB read from: accounts, trustPaths, endorsements, lists
        // DB write to:  accounts, endorsements, lists

        /**
         * Processes one {@code seen} account at the task's depth per invocation, then
         * reschedules itself.
         *
         * <p>When no such account is left, {@code FINISH_ITERATION} is scheduled with the
         * accumulated load count. Otherwise:
         * <ul>
         *   <li>an account without a trust path at this depth is marked {@code skipped};</li>
         *   <li>one whose best trust path is below {@code MIN_TRUST_PATH_RATIO} is marked
         *       {@code skipped}, and its intro list is recorded as {@code encountered};</li>
         *   <li>otherwise its intro and endorsement nanopubs are loaded, new endorsements are
         *       queued as {@code toRetrieve}, and the account is marked {@code visited}.</li>
         * </ul>
         */
        @Override
        public void run(ClientSession s, Document taskDoc) {

            int depth = taskDoc.getInteger("depth");
            int loadCount = taskDoc.getInteger("load-count");

            Document agentAccount = getOne(s, "accounts_loading",
                    new Document("depth", depth).append("status", seen.getValue()));
            if (agentAccount == null) {
                schedule(s, FINISH_ITERATION.with("depth", depth).append("load-count", loadCount));
                return;
            }

            String agentId = agentAccount.getString("agent");
            Validate.notNull(agentId);
            String pubkeyHash = agentAccount.getString("pubkey");
            Validate.notNull(pubkeyHash);

            // An account can have several trust paths at the same depth (one per expanded
            // parent that trusts it). Use the strongest one, with the same deterministic
            // ordering as EXPAND_TRUST_PATHS, rather than an arbitrary match.
            Document trustPath = collection("trustPaths_loading").find(s,
                    new Document("depth", depth)
                            .append("agent", agentId)
                            .append("pubkey", pubkeyHash)
            ).sort(orderBy(descending("ratio"), ascending("sorthash"))).first();

            if (trustPath == null) {
                // Account was seen but has no trust path at this depth; skip it
                set(s, "accounts_loading", agentAccount.append("status", skipped.getValue()));
                schedule(s, LOAD_CORE.with("depth", depth).append("load-count", loadCount));
                return;
            }

            if (trustPath.getDouble("ratio") < MIN_TRUST_PATH_RATIO) {
                set(s, "accounts_loading", agentAccount.append("status", skipped.getValue()));
                Document d = new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH);
                if (!has(s, "lists", d)) {
                    insert(s, "lists", d.append("status", encountered.getValue()));
                }
                // NOTE(review): this below-threshold branch also increments load-count, which keeps
                // FINISH_ITERATION from stopping early; confirm that is intended.
                schedule(s, LOAD_CORE.with("depth", depth).append("load-count", loadCount + 1));
                return;
            }

            loadCoreIntroductions(s, pubkeyHash);
            loadCoreEndorsements(s, agentId, pubkeyHash);

            Document df = new Document("pubkey", pubkeyHash).append("type", "$");
            if (!has(s, "lists", df)) insert(s, "lists",
                    df.append("status", encountered.getValue()));

            set(s, "accounts_loading", agentAccount.append("status", visited.getValue()));

            schedule(s, LOAD_CORE.with("depth", depth).append("load-count", loadCount + 1));
        }

        // Loads every intro nanopub published under pubkeyHash and marks its intro list as loaded.
        private void loadCoreIntroductions(ClientSession s, String pubkeyHash) {
            // TODO check intro limit
            Document introList = new Document()
                    .append("pubkey", pubkeyHash)
                    .append("type", INTRO_TYPE_HASH)
                    .append("status", loading.getValue());
            if (!has(s, "lists", new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH))) {
                insert(s, "lists", introList);
            }

            // No checksum skip in LOAD_CORE: the endorsement extraction logic (loadCoreEndorsements)
            // needs to see every nanopub to populate endorsements_loading, which is rebuilt from
            // scratch each UPDATE.
            try (var stream = NanopubLoader.retrieveNanopubsFromPeers(INTRO_TYPE_HASH, pubkeyHash)) {
                NanopubLoader.loadStreamInParallel(stream, np -> {
                    try (ClientSession ws = RegistryDB.getClient().startSession()) {
                        loadNanopub(ws, np, pubkeyHash, INTRO_TYPE);
                    }
                });
            }

            set(s, "lists", introList.append("status", loaded.getValue()));
        }

        // Loads endorsement nanopubs under pubkeyHash, queues each trusty nanopub that agentId
        // approves of as a toRetrieve endorsement, and marks the endorsement list as loaded.
        private void loadCoreEndorsements(ClientSession s, String agentId, String pubkeyHash) {
            // TODO check endorsement limit
            Document endorseList = new Document()
                    .append("pubkey", pubkeyHash)
                    .append("type", ENDORSE_TYPE_HASH)
                    .append("status", loading.getValue());
            if (!has(s, "lists", new Document("pubkey", pubkeyHash).append("type", ENDORSE_TYPE_HASH))) {
                insert(s, "lists", endorseList);
            }

            try (var stream = NanopubLoader.retrieveNanopubsFromPeers(ENDORSE_TYPE_HASH, pubkeyHash)) {
                stream.forEach(m -> {
                    if (!m.isSuccess())
                        throw new AbortingTaskException("Failed to download nanopub; aborting task...");
                    Nanopub nanopub = m.getNanopub();
                    loadNanopub(s, nanopub, pubkeyHash, ENDORSE_TYPE);
                    String sourceNpId = TrustyUriUtils.getArtifactCode(nanopub.getUri().stringValue());
                    Validate.notNull(sourceNpId);
                    for (Statement st : nanopub.getAssertion()) {
                        if (!st.getPredicate().equals(Utils.APPROVES_OF)) continue;
                        if (!(st.getObject() instanceof IRI)) continue;
                        if (!agentId.equals(st.getSubject().stringValue())) continue;
                        String objStr = st.getObject().stringValue();
                        if (!TrustyUriUtils.isPotentialTrustyUri(objStr)) continue;
                        String endorsedNpId = TrustyUriUtils.getArtifactCode(objStr);
                        Validate.notNull(endorsedNpId);
                        Document endorsement = new Document("agent", agentId)
                                .append("pubkey", pubkeyHash)
                                .append("endorsedNanopub", endorsedNpId)
                                .append("source", sourceNpId);
                        if (!has(s, "endorsements_loading", endorsement)) {
                            insert(s, "endorsements_loading",
                                    endorsement.append("status", toRetrieve.getValue()));
                        }
                    }
                });
            }

            set(s, "lists", endorseList.append("status", loaded.getValue()));
        }

        // At the end of this step, we have added new endorsement
        // links to yet-to-retrieve agent introductions:
        // ------------------------------------------------------------
        //
        //         o      ----endorses----> [intro]
        //    --> /#\  /o\___            (to-retrieve)
        //        / \  \_/^^^
        //         (visited)
        //
        //    ========[X] trust path
        //
        // ------------------------------------------------------------
        // Only one endorsement is shown here, but there are typically
        // several.

    },
527

528
    FINISH_ITERATION {
        /**
         * Decides whether the trust-graph traversal continues one level deeper.
         *
         * <p>When the last LOAD_CORE round loaded nothing, or {@code MAX_TRUST_PATH_DEPTH} has been
         * reached, {@code CALCULATE_TRUST_SCORES} is scheduled. Otherwise {@code LOAD_DECLARATIONS}
         * is scheduled at depth + 1.
         */
        @Override
        public void run(ClientSession s, Document taskDoc) {

            final int depth = taskDoc.getInteger("depth");
            final int loadCount = taskDoc.getInteger("load-count");

            final boolean nothingLoaded = loadCount == 0;
            final boolean maxDepthReached = depth == MAX_TRUST_PATH_DEPTH;

            if (!nothingLoaded && !maxDepthReached) {
                log.info("Progressing iteration at depth {}", depth + 1);
                schedule(s, LOAD_DECLARATIONS.with("depth", depth + 1));
                return;
            }

            if (nothingLoaded) {
                log.info("No new cores loaded; finishing iteration");
            } else {
                log.info("Maximum depth reached: {}", depth);
            }
            schedule(s, CALCULATE_TRUST_SCORES);
        }

    },
548

549
    CALCULATE_TRUST_SCORES {

        // DB read from: accounts, trustPaths
        // DB write to:  accounts

        /**
         * Scores every {@code expanded} account and marks it {@code processed}.
         *
         * <p>An account's {@code ratio} is the sum of the ratios of all its trust paths.
         *
         * <p>{@code pathCount} counts the paths that share no intermediate element with a path
         * visited earlier. Paths are ordered by depth, then descending ratio, then sorthash.
         *
         * <p>{@code quota} is {@code GLOBAL_QUOTA * ratio}, truncated to an int and clamped to
         * [{@code MIN_USER_QUOTA}, {@code MAX_USER_QUOTA}]. Schedules {@code AGGREGATE_AGENTS}.
         */
        @Override
        public void run(ClientSession s, Document taskDoc) {

            while (true) {
                Document d = getOne(s, "accounts_loading", new Document("status", expanded.getValue()));
                if (d == null) break;

                double ratio = 0.0;
                Set<String> seenPathElements = new HashSet<>();
                int pathCount = 0;
                try (MongoCursor<Document> trustPaths = collection("trustPaths_loading").find(s,
                        new Document("agent", d.get("agent").toString()).append("pubkey", d.get("pubkey").toString())
                ).sort(orderBy(ascending("depth"), descending("ratio"), ascending("sorthash"))).cursor()) {
                    while (trustPaths.hasNext()) {
                        Document trustPath = trustPaths.next();
                        ratio += trustPath.getDouble("ratio");
                        boolean independentPath = true;
                        String[] pathElements = trustPath.getString("_id").split(" ");
                        // Iterate over path elements, ignoring first (root) and last (this agent/pubkey):
                        for (int i = 1; i < pathElements.length - 1; i++) {
                            // Set.add returns false when an earlier path already went through this element.
                            if (!seenPathElements.add(pathElements[i])) {
                                independentPath = false;
                                break;
                            }
                        }
                        if (independentPath) pathCount += 1;
                    }
                }
                double rawQuota = GLOBAL_QUOTA * ratio;
                int quota = (int) rawQuota;
                if (rawQuota < MIN_USER_QUOTA) {
                    quota = MIN_USER_QUOTA;
                } else if (rawQuota > MAX_USER_QUOTA) {
                    quota = MAX_USER_QUOTA;
                }
                set(s, "accounts_loading",
                        d.append("status", processed.getValue())
                                .append("ratio", ratio)
                                .append("pathCount", pathCount)
                                .append("quota", quota)
                );
            }
            schedule(s, AGGREGATE_AGENTS);

        }

    },
602

603
    AGGREGATE_AGENTS {

        // DB read from: accounts, agents
        // DB write to:  accounts, agents

        /**
         * Folds the processed (agent, pubkey) rows of each agent in accounts_loading into one
         * agents_loading entry (account count, average path count, total ratio, canonical name),
         * marks those rows as aggregated, and finally schedules ASSIGN_PUBKEYS.
         */
        @Override
        public void run(ClientSession s, Document taskDoc) {
            Document next;
            while ((next = getOne(s, "accounts_loading", new Document("status", processed.getValue()))) != null) {
                Document agentFilter = new Document("agent", next.get("agent").toString())
                        .append("status", processed.getValue());

                int accountCount = 0;
                int pathCountTotal = 0;
                double ratioTotal = 0.0d;
                // Canonical name across the agent's keys: the named row with the highest ratio wins;
                // equal ratios are resolved by the lexicographically smallest name so the result is
                // deterministic. The per-(agent, pubkey) name itself was picked by LOAD_ENDORSEMENTS
                // ("latest declaring intro wins"); here we only fold across keys.
                String bestName = null;
                double bestRatio = Double.NEGATIVE_INFINITY;

                try (MongoCursor<Document> rows = collection("accounts_loading").find(s, agentFilter).cursor()) {
                    while (rows.hasNext()) {
                        Document row = rows.next();
                        accountCount++;
                        pathCountTotal += row.getInteger("pathCount");
                        double rowRatio = row.getDouble("ratio");
                        ratioTotal += rowRatio;

                        String rowName = row.getString("name");
                        if (rowName == null) {
                            continue;
                        }
                        boolean higherRatio = rowRatio > bestRatio;
                        boolean tieWithSmallerName = rowRatio == bestRatio
                                && (bestName == null || rowName.compareTo(bestName) < 0);
                        if (higherRatio || tieWithSmallerName) {
                            bestName = rowName;
                            bestRatio = rowRatio;
                        }
                    }
                }

                Document markAggregated = new Document("$set", new DbEntryWrapper(aggregated).getDocument());
                collection("accounts_loading").updateMany(s, agentFilter, markAggregated);

                // The filter document (agent + status) doubles as the base of the new agent entry.
                Document agentEntry = agentFilter
                        .append("accountCount", accountCount)
                        .append("avgPathCount", (double) pathCountTotal / accountCount)
                        .append("totalRatio", ratioTotal)
                        .append("name", bestName);
                insert(s, "agents_loading", agentEntry);
            }
            schedule(s, ASSIGN_PUBKEYS);
        }

    },
653

654
    ASSIGN_PUBKEYS {
33✔
655

656
        // DB read from: accounts
657
        // DB write to:  accounts
658

659
        public void run(ClientSession s, Document taskDoc) {
660

661
            while (true) {
662
                Document a = getOne(s, "accounts_loading", new DbEntryWrapper(aggregated).getDocument());
×
663
                if (a == null) break;
×
664

665
                Document pubkeyId = new Document("pubkey", a.get("pubkey").toString());
×
666
                if (collection("accounts_loading").countDocuments(s, pubkeyId) == 1) {
×
667
                    collection("accounts_loading").updateMany(s, pubkeyId,
×
668
                            new Document("$set", new DbEntryWrapper(approved).getDocument()));
×
669
                } else {
670
                    // TODO At the moment all get marked as 'contested'; implement more nuanced algorithm
671
                    collection("accounts_loading").updateMany(s, pubkeyId, new Document("$set",
×
672
                            new DbEntryWrapper(contested).getDocument()));
×
673
                }
674
            }
×
675
            schedule(s, DETERMINE_UPDATES);
×
676

677
        }
×
678

679
    },
680

681
    DETERMINE_UPDATES {
33✔
682

683
        // DB read from: accounts
684
        // DB write to:  accounts
685

686
        public void run(ClientSession s, Document taskDoc) {
687

688
            // TODO Handle contested accounts properly:
689
            for (Document d : collection("accounts_loading").find(
×
690
                    new DbEntryWrapper(approved).getDocument())) {
×
691
                // TODO Consider quota too:
692
                Document accountId = new Document("agent", d.get("agent").toString()).append("pubkey", d.get("pubkey").toString());
×
693
                if (collection(Collection.ACCOUNTS.toString()) == null || !has(s, Collection.ACCOUNTS.toString(),
×
694
                        accountId.append("status", loaded.getValue()))) {
×
695
                    set(s, "accounts_loading", d.append("status", toLoad.getValue()));
×
696
                } else {
697
                    set(s, "accounts_loading", d.append("status", loaded.getValue()));
×
698
                }
699
            }
×
700
            schedule(s, FINALIZE_TRUST_STATE);
×
701

702
        }
×
703

704
    },
705

706
    FINALIZE_TRUST_STATE {
        // This runs as a separate task/transaction. When it was done at the beginning of RELEASE_DATA,
        // that task hung and could not properly re-run, because by then some collection renames
        // (which happen outside of transactions) had already taken place.
        @Override
        public void run(ClientSession s, Document taskDoc) {
            String newHash = RegistryDB.calculateTrustStateHash(s);
            // Hash of the previously released trust state; may be null.
            String previousHash = (String) getValue(s, Collection.SERVER_INFO.toString(), "trustStateHash");
            String updatedAt = ZonedDateTime.now().toString();
            setValue(s, Collection.SERVER_INFO.toString(), "lastTrustStateUpdate", updatedAt);

            Document releaseTask = RELEASE_DATA.with("newTrustStateHash", newHash)
                    .append("previousTrustStateHash", previousHash);
            schedule(s, releaseTask);
        }

    },
718

719
    RELEASE_DATA {

        // Number of most recent trust-state snapshots that are kept; older ones are deleted on each release.
        private static final int TRUST_STATE_SNAPSHOT_RETENTION = 100;

        /**
         * Makes the freshly computed trust state live.
         * <p>
         * The *_loading collections are renamed to their public names. If the trust state hash changed,
         * the state counter is increased, the new hash is stored, and a debug dump and a structured
         * snapshot are written. Old snapshots beyond the retention limit are then pruned. Finally the
         * server status is set (coreLoading becomes coreReady, anything else becomes ready), and the
         * next UPDATE is scheduled one hour later.
         */
        @Override
        public void run(ClientSession s, Document taskDoc) {
            ServerStatus status = getServerStatus(s);

            String newTrustStateHash = taskDoc.get("newTrustStateHash").toString();
            String previousTrustStateHash = taskDoc.getString("previousTrustStateHash");  // may be null

            // Renaming collections is run outside of a transaction, but it is an idempotent operation, so it
            // can safely be retried if the task fails:
            rename("accounts_loading", Collection.ACCOUNTS.toString());
            rename("trustPaths_loading", "trustPaths");
            rename("agents_loading", Collection.AGENTS.toString());
            rename("endorsements_loading", "endorsements");

            if (previousTrustStateHash == null || !previousTrustStateHash.equals(newTrustStateHash)) {
                increaseStateCounter(s);
                setValue(s, Collection.SERVER_INFO.toString(), "trustStateHash", newTrustStateHash);
                Object trustStateCounter = getValue(s, Collection.SERVER_INFO.toString(), "trustStateCounter");
                insert(s, "debug_trustPaths", new Document()
                        .append("trustStateTxt", DebugPage.getTrustPathsTxt(s))
                        .append("trustStateHash", newTrustStateHash)
                        .append("trustStateCounter", trustStateCounter)
                );
                writeTrustStateSnapshot(s, newTrustStateHash, trustStateCounter);
                pruneTrustStateSnapshots(s);
            }

            if (status == coreLoading) {
                setServerStatus(s, coreReady);
            } else {
                setServerStatus(s, ready);
            }

            // Run update after 1h:
            schedule(s, UPDATE.withDelay(60 * 60 * 1000));
        }

        // Writes (upserts) the structured, hash-keyed snapshot for consumer mirroring (#107).
        // It reads the accounts collection that run() has just renamed from accounts_loading.
        // Rows whose pubkey is "$" are left out.
        private void writeTrustStateSnapshot(ClientSession s, String trustStateHash, Object trustStateCounter) {
            List<Document> snapshotAccounts = new ArrayList<>();
            // Explicit cursor so it is closed even if building the snapshot fails part-way.
            try (MongoCursor<Document> accounts = collection(Collection.ACCOUNTS.toString()).find(s).cursor()) {
                while (accounts.hasNext()) {
                    Document a = accounts.next();
                    String pubkey = a.getString("pubkey");
                    if ("$".equals(pubkey)) continue;
                    snapshotAccounts.add(new Document()
                            .append("pubkey", pubkey)
                            .append("agent", a.getString("agent"))
                            .append("name", a.getString("name"))
                            .append("nameCreatedAt", a.get("nameCreatedAt"))
                            .append("status", a.getString("status"))
                            .append("depth", a.get("depth"))
                            .append("pathCount", a.get("pathCount"))
                            .append("ratio", a.get("ratio"))
                            .append("quota", a.get("quota")));
                }
            }
            Document snapshot = new Document()
                    .append("_id", trustStateHash)
                    .append("trustStateCounter", trustStateCounter)
                    .append("createdAt", ZonedDateTime.now().toString())
                    .append("accounts", snapshotAccounts);
            collection(Collection.TRUST_STATE_SNAPSHOTS.toString()).replaceOne(
                    s,
                    new Document("_id", trustStateHash),
                    snapshot,
                    new ReplaceOptions().upsert(true));
        }

        // Prunes snapshots beyond retention: collects the _ids of all snapshots past the Nth most recent
        // and deletes them. trustStateCounter is monotonically increasing (see increaseStateCounter), so
        // the ordering is well-defined.
        private void pruneTrustStateSnapshots(ClientSession s) {
            List<Object> toPrune = new ArrayList<>();
            try (MongoCursor<Document> stale = collection(Collection.TRUST_STATE_SNAPSHOTS.toString())
                    .find(s)
                    .sort(descending("trustStateCounter"))
                    .skip(TRUST_STATE_SNAPSHOT_RETENTION)
                    .projection(new Document("_id", 1))
                    .cursor()) {
                while (stale.hasNext()) {
                    toPrune.add(stale.next().get("_id"));
                }
            }
            if (!toPrune.isEmpty()) {
                collection(Collection.TRUST_STATE_SNAPSHOTS.toString()).deleteMany(
                        s, new Document("_id", new Document("$in", toPrune)));
            }
        }

    },
803

804
    UPDATE {
        /**
         * Starts a new update cycle (status "updating", then INIT_COLLECTIONS) when the server is ready
         * or coreReady; in any other status it re-schedules itself to try again in 10 minutes.
         */
        @Override
        public void run(ClientSession s, Document taskDoc) {
            ServerStatus current = getServerStatus(s);
            boolean idle = current == ready || current == coreReady;
            if (!idle) {
                log.info("Postponing update; currently in status {}", current);
                schedule(s, UPDATE.withDelay(10 * 60 * 1000));
                return;
            }
            setServerStatus(s, updating);
            schedule(s, INIT_COLLECTIONS);
        }

    },
818

819
    LOAD_FULL {
        /**
         * Loads the nanopubs of one account in status "toLoad" per run, then re-schedules itself.
         * <p>
         * Once no such account is left, a coreReady server is switched to ready and the task hands over
         * to RUN_OPTIONAL_LOAD. The task does nothing at all when REGISTRY_PERFORM_FULL_LOAD is "false".
         */
        @Override
        public void run(ClientSession s, Document taskDoc) {
            if ("false".equals(System.getenv("REGISTRY_PERFORM_FULL_LOAD"))) return;

            ServerStatus status = getServerStatus(s);
            if (status != coreReady && status != ready && status != updating) {
                log.info("Server currently not ready; checking again later");
                schedule(s, LOAD_FULL.withDelay(1000));
                return;
            }

            Document a = getOne(s, Collection.ACCOUNTS.toString(), new DbEntryWrapper(toLoad).getDocument());
            if (a == null) {
                log.info("Nothing to load");
                if (status == coreReady) {
                    log.info("Full load finished");
                    setServerStatus(s, ready);
                }
                log.info("Scheduling optional loading checks");
                schedule(s, RUN_OPTIONAL_LOAD.withDelay(100));
                return;
            }

            final String ph = a.getString("pubkey");
            boolean quotaReached = false;
            if (!ph.equals("$")) {
                if (!AgentFilter.isAllowed(s, ph)) {
                    log.info("Skipping pubkey {} (not covered by agent filter)", ph);
                    set(s, Collection.ACCOUNTS.toString(), a.append("status", skipped.getValue()));
                    schedule(s, LOAD_FULL.withDelay(100));
                    return;
                }
                if (AgentFilter.isOverQuota(s, ph)) {
                    log.info("Skipping pubkey {} (quota exceeded)", ph);
                    quotaReached = true;
                } else {
                    loadAccountNanopubs(s, ph);
                    quotaReached = AgentFilter.isOverQuota(s, ph);
                }
            }

            Document l = getOne(s, "lists", new Document().append("pubkey", ph).append("type", "$"));
            if (l != null) set(s, "lists", l.append("status", loaded.getValue()));
            EntryStatus accountStatus = quotaReached ? capped : loaded;
            int effectiveQuota = AgentFilter.getQuota(s, ph);
            if (effectiveQuota >= 0) {
                a.append("quota", effectiveQuota);
            }
            set(s, Collection.ACCOUNTS.toString(), a.append("status", accountStatus.getValue()));

            schedule(s, LOAD_FULL.withDelay(100));
        }

        // Streams the pubkey's nanopubs from peers, per covered type (or "$" if there is no restriction)
        // with checksum skip-ahead. Each covered nanopub is stored while the pubkey is not over quota,
        // and the throughput is logged at the end.
        private void loadAccountNanopubs(ClientSession s, String ph) {
            long startTime = System.nanoTime();
            AtomicLong totalLoaded = new AtomicLong(0);

            for (String typeHash : getLoadTypeHashes(s, ph)) {
                String checksums = buildChecksumFallbacks(s, ph, typeHash);
                try (var stream = NanopubLoader.retrieveNanopubsFromPeers(typeHash, ph, checksums)) {
                    NanopubLoader.loadStreamInParallel(stream, np -> {
                        if (!CoverageFilter.isCovered(np)) return;
                        try (ClientSession ws = RegistryDB.getClient().startSession()) {
                            // The quota is re-checked per nanopub, so loading stops once it fills up mid-stream.
                            if (!AgentFilter.isOverQuota(ws, ph)) {
                                loadNanopub(ws, np, ph, "$");
                                totalLoaded.incrementAndGet();
                            }
                        }
                    });
                }
            }

            double timeSeconds = (System.nanoTime() - startTime) * 1e-9;
            // Locale.ROOT keeps "." as the decimal separator regardless of the JVM default locale.
            // The guard avoids logging Infinity/NaN for a zero elapsed time.
            double rate = timeSeconds > 0 ? totalLoaded.get() / timeSeconds : 0.0;
            log.info("Loaded {} nanopubs in {}s, {} np/s",
                    totalLoaded.get(), timeSeconds, String.format(Locale.ROOT, "%.2f", rate));
        }

        @Override
        public boolean runAsTransaction() {
            // TODO Make this a transaction once we connect to other Nanopub Registry instances:
            return false;
        }

    },
902

903
    RUN_OPTIONAL_LOAD {

        // Soft cap on the number of nanopubs loaded per run. It is only checked before the next list is
        // picked, so the list in progress is always finished and can push the count beyond the cap.
        private static final int BATCH_SIZE = Integer.parseInt(
                Utils.getEnv("REGISTRY_OPTIONAL_LOAD_BATCH_SIZE", "100"));

        /**
         * Processes lists in status "encountered" as one batch.
         * <p>
         * Intro lists come first: their intro and endorsement nanopubs are loaded from peers. Full "$"
         * lists follow: all covered types are loaded and locally stored nanopubs are backfilled. The
         * task is skipped (straight to CHECK_NEW) when REGISTRY_ENABLE_OPTIONAL_LOAD is "false".
         */
        @Override
        public void run(ClientSession s, Document taskDoc) {
            boolean disabled = "false".equals(System.getenv("REGISTRY_ENABLE_OPTIONAL_LOAD"));
            if (disabled) {
                schedule(s, CHECK_NEW.withDelay(500));
                return;
            }

            AtomicLong loadedInBatch = new AtomicLong(0);
            processEncounteredIntroLists(s, loadedInBatch);
            processEncounteredFullLists(s, loadedInBatch);

            if (loadedInBatch.get() > 0) {
                log.info("Optional load batch completed: {} nanopubs across multiple pubkeys", loadedInBatch.get());
            }

            scheduleNextStep(s);
        }

        // Phase 1 (core loading): for each encountered intro list, loads intro and endorsement nanopubs
        // from peers, marks both lists as loaded, and registers an encountered "$" list for the pubkey if
        // it has none yet.
        private void processEncounteredIntroLists(ClientSession s, AtomicLong totalLoaded) {
            while (totalLoaded.get() < BATCH_SIZE) {
                Document introList = getOne(s, "lists", new Document("type", INTRO_TYPE_HASH).append("status", encountered.getValue()));
                if (introList == null) {
                    return;
                }

                final String pubkeyHash = introList.getString("pubkey");
                Validate.notNull(pubkeyHash);
                log.info("Optional core loading: {}", pubkeyHash);

                String introChecksums = buildChecksumFallbacks(s, pubkeyHash, INTRO_TYPE_HASH);
                try (var stream = NanopubLoader.retrieveNanopubsFromPeers(INTRO_TYPE_HASH, pubkeyHash, introChecksums)) {
                    NanopubLoader.loadStreamInParallel(stream, np -> {
                        try (ClientSession ws = RegistryDB.getClient().startSession()) {
                            loadNanopub(ws, np, pubkeyHash, INTRO_TYPE);
                            totalLoaded.incrementAndGet();
                        }
                    });
                }
                set(s, "lists", introList.append("status", loaded.getValue()));

                String endorseChecksums = buildChecksumFallbacks(s, pubkeyHash, ENDORSE_TYPE_HASH);
                try (var stream = NanopubLoader.retrieveNanopubsFromPeers(ENDORSE_TYPE_HASH, pubkeyHash, endorseChecksums)) {
                    NanopubLoader.loadStreamInParallel(stream, np -> {
                        try (ClientSession ws = RegistryDB.getClient().startSession()) {
                            loadNanopub(ws, np, pubkeyHash, ENDORSE_TYPE);
                            totalLoaded.incrementAndGet();
                        }
                    });
                }

                Document endorseList = new Document("pubkey", pubkeyHash).append("type", ENDORSE_TYPE_HASH);
                boolean endorseListKnown = has(s, "lists", endorseList);
                endorseList.append("status", loaded.getValue());
                if (endorseListKnown) {
                    set(s, "lists", endorseList);
                } else {
                    insert(s, "lists", endorseList);
                }

                Document fullList = new Document("pubkey", pubkeyHash).append("type", "$");
                if (!has(s, "lists", fullList)) {
                    insert(s, "lists", fullList.append("status", encountered.getValue()));
                }
            }
        }

        // Phase 2 (only if budget remains): for each encountered "$" list, loads all covered types from
        // peers with checksum skip-ahead, marks the list loaded, and backfills locally stored nanopubs.
        private void processEncounteredFullLists(ClientSession s, AtomicLong totalLoaded) {
            while (totalLoaded.get() < BATCH_SIZE) {
                Document fullList = getOne(s, "lists", new Document("type", "$").append("status", encountered.getValue()));
                if (fullList == null) {
                    return;
                }

                final String pubkeyHash = fullList.getString("pubkey");
                log.info("Optional full loading: {}", pubkeyHash);

                for (String typeHash : getLoadTypeHashes(s, pubkeyHash)) {
                    String checksums = buildChecksumFallbacks(s, pubkeyHash, typeHash);
                    try (var stream = NanopubLoader.retrieveNanopubsFromPeers(typeHash, pubkeyHash, checksums)) {
                        NanopubLoader.loadStreamInParallel(stream, np -> {
                            if (!CoverageFilter.isCovered(np)) return;
                            try (ClientSession ws = RegistryDB.getClient().startSession()) {
                                loadNanopub(ws, np, pubkeyHash, "$");
                                totalLoaded.incrementAndGet();
                            }
                        });
                    }
                }

                set(s, "lists", fullList.append("status", loaded.getValue()));
                backfillLocalNanopubs(s, pubkeyHash, totalLoaded);
            }
        }

        // Some nanopubs were stored in the nanopubs collection by simpleLoad() during the transitional
        // period (before the "$" list was loaded), but they were never added to listEntries. This adds
        // them to the "$" list now. Failures are logged per nanopub and do not abort the backfill.
        private void backfillLocalNanopubs(ClientSession s, String pubkeyHash, AtomicLong totalLoaded) {
            log.info("Backfilling locally stored nanopubs for pubkey: {}", pubkeyHash);
            try (MongoCursor<Document> npCursor = collection(Collection.NANOPUBS.toString())
                    .find(s, new Document("pubkey", pubkeyHash)).cursor()) {
                while (npCursor.hasNext()) {
                    String fullId = npCursor.next().getString("fullId");
                    if (fullId == null) {
                        continue;
                    }
                    try (ClientSession ws = RegistryDB.getClient().startSession()) {
                        Nanopub np = NanopubLoader.retrieveLocalNanopub(ws, fullId);
                        if (np != null && CoverageFilter.isCovered(np)) {
                            loadNanopub(ws, np, pubkeyHash, "$");
                            totalLoaded.incrementAndGet();
                        }
                    } catch (Exception ex) {
                        log.info("Error backfilling nanopub {}: {}", fullId, ex.getMessage());
                    }
                }
            }
        }

        // Chooses the follow-up task. When all pubkeys are prioritized and encountered lists remain,
        // another batch runs without a full CHECK_NEW cycle in between; otherwise control yields to
        // CHECK_NEW (in throttled mode after every batch, to prioritize approved pubkeys).
        private void scheduleNextStep(ClientSession s) {
            if (!prioritizeAllPubkeys()) {
                schedule(s, CHECK_NEW.withDelay(500));
                return;
            }
            boolean moreWork = has(s, "lists", new Document("type", INTRO_TYPE_HASH).append("status", encountered.getValue()))
                    || has(s, "lists", new Document("type", "$").append("status", encountered.getValue()));
            if (moreWork) {
                schedule(s, RUN_OPTIONAL_LOAD.withDelay(10));
            } else {
                schedule(s, CHECK_NEW.withDelay(500));
            }
        }

    },
1025

1026
    CHECK_NEW {
        /**
         * Runs the registry peer check and the legacy new-nanopub check, then schedules LOAD_FULL.
         */
        @Override
        public void run(ClientSession s, Document taskDoc) {
            RegistryPeerConnector.checkPeers(s);
            // Keep legacy connection during transition period:
            LegacyConnector.checkForNewNanopubs(s);
            // TODO Somehow throttle the loading of such potentially non-approved nanopubs
            Task next = LOAD_FULL;
            schedule(s, next.withDelay(100));
        }

        /**
         * Not run as a transaction: peer sync includes long-running streaming fetches that would exceed
         * MongoDB's transaction timeout, and each operation is individually safe.
         */
        @Override
        public boolean runAsTransaction() {
            return false;
        }

    };
1044

1045
    private static final Logger log = LoggerFactory.getLogger(Task.class);
9✔
1046

1047
    public abstract void run(ClientSession s, Document taskDoc) throws Exception;
1048

1049
    public boolean runAsTransaction() {
1050
        return true;
×
1051
    }
1052

1053
    Document asDocument() {
1054
        return withDelay(0L);
12✔
1055
    }
1056

1057
    private Document withDelay(long delay) {
1058
        // TODO Rename "not-before" to "notBefore" for consistency with other field names
1059
        return new Document()
15✔
1060
                .append("not-before", System.currentTimeMillis() + delay)
21✔
1061
                .append("action", name());
6✔
1062
    }
1063

1064
    private Document with(String key, Object value) {
1065
        return asDocument().append(key, value);
×
1066
    }
1067

1068
    private static boolean prioritizeAllPubkeys() {
1069
        return "true".equals(System.getenv("REGISTRY_PRIORITIZE_ALL_PUBKEYS"));
×
1070
    }
1071

1072
    /**
1073
     * Returns the type hashes to load for a given pubkey. When coverage is unrestricted,
1074
     * returns just "$" (all types in one request). When restricted, returns each covered
1075
     * type hash for per-type fetching with checksum skip-ahead.
1076
     *
1077
     * TODO: Fetching "$" from peers with type restrictions will only return their covered
1078
     * types, not all types. To get full coverage, we'd need to fetch per-type from such peers.
1079
     * Additionally, checksum-based skip-ahead won't work correctly against such peers, because
1080
     * their "$" list has different checksums due to the differing type subset. This means full
1081
     * re-downloads on every cycle. Per-type fetching would solve both issues.
1082
     */
1083
    private static java.util.List<String> getLoadTypeHashes(ClientSession s, String pubkeyHash) {
1084
        if (CoverageFilter.coversAllTypes()) {
×
1085
            return java.util.List.of("$");
×
1086
        }
1087
        return java.util.List.copyOf(CoverageFilter.getCoveredTypeHashes());
×
1088
    }
1089

1090
    // TODO Move these to setting:
1091
    private static final int MAX_TRUST_PATH_DEPTH = 10;
1092
    private static final double MIN_TRUST_PATH_RATIO = 0.00000001;
1093
    //private static final double MIN_TRUST_PATH_RATIO = 0.01; // For testing
1094
    private static final int GLOBAL_QUOTA = Integer.parseInt(
12✔
1095
            Utils.getEnv("REGISTRY_GLOBAL_QUOTA", "1000000000"));
3✔
1096
    private static final int MIN_USER_QUOTA = Integer.parseInt(
12✔
1097
            Utils.getEnv("REGISTRY_MIN_USER_QUOTA", "1000"));
3✔
1098
    private static final int MAX_USER_QUOTA = Integer.parseInt(
12✔
1099
            Utils.getEnv("REGISTRY_MAX_USER_QUOTA", "100000"));
3✔
1100

1101
    private static MongoCollection<Document> tasksCollection = collection(Collection.TASKS.toString());
15✔
1102

1103
    private static volatile String currentTaskName;
1104
    private static volatile long currentTaskStartTime;
1105

1106
    public static String getCurrentTaskName() {
1107
        return currentTaskName;
×
1108
    }
1109

1110
    public static long getCurrentTaskStartTime() {
1111
        return currentTaskStartTime;
×
1112
    }
1113

1114
    /**
1115
     * The super important base entry point!
1116
     */
1117
    static void runTasks() {
1118
        try (ClientSession s = RegistryDB.getClient().startSession()) {
×
1119
            if (!RegistryDB.isInitialized(s)) {
×
1120
                schedule(s, INIT_DB); // does not yet execute, only schedules
×
1121
            }
1122

1123
            while (true) {
1124
                FindIterable<Document> taskResult = tasksCollection.find(s).sort(ascending("not-before"));
×
1125
                Document taskDoc = taskResult.first();
×
1126
                long sleepTime = 10;
×
1127
                if (taskDoc != null && taskDoc.getLong("not-before") < System.currentTimeMillis()) {
×
1128
                    Task task = valueOf(taskDoc.getString("action"));
×
1129
                    log.info("Running task: {}", task.name());
×
1130
                    if (task.runAsTransaction()) {
×
1131
                        try {
1132
                            s.startTransaction();
×
1133
                            log.info("Transaction started");
×
1134
                            runTask(task, taskDoc);
×
1135
                            s.commitTransaction();
×
1136
                            log.info("Transaction committed");
×
1137
                        } catch (Exception ex) {
×
1138
                            log.info("Aborting transaction", ex);
×
1139
                            abortTransaction(s, ex.getMessage());
×
1140
                            log.info("Transaction aborted");
×
1141
                            sleepTime = 1000;
×
1142
                        } finally {
1143
                            cleanTransactionWithRetry(s);
×
1144
                        }
×
1145
                    } else {
1146
                        try {
1147
                            runTask(task, taskDoc);
×
1148
                        } catch (Exception ex) {
×
1149
                            log.info("Transaction failed", ex);
×
1150
                        }
×
1151
                    }
1152
                }
1153
                try {
1154
                    Thread.sleep(sleepTime);
×
1155
                } catch (InterruptedException ex) {
×
1156
                    // ignore
1157
                }
×
1158
            }
×
1159
        }
1160
    }
1161

1162
    static void runTask(Task task, Document taskDoc) throws Exception {
1163
        try (ClientSession s = RegistryDB.getClient().startSession()) {
9✔
1164
            log.info("Executing task: {}", task.name());
15✔
1165
            currentTaskName = task.name();
9✔
1166
            currentTaskStartTime = System.currentTimeMillis();
6✔
1167
            task.run(s, taskDoc);
12✔
1168
            tasksCollection.deleteOne(s, eq("_id", taskDoc.get("_id")));
27✔
1169
            log.info("Task {} completed and removed from queue.", task.name());
15✔
1170
        } finally {
1171
            currentTaskName = null;
6✔
1172
        }
1173
    }
3✔
1174

1175
    public static void abortTransaction(ClientSession mongoSession, String message) {
1176
        boolean successful = false;
×
1177
        while (!successful) {
×
1178
            try {
1179
                if (mongoSession.hasActiveTransaction()) {
×
1180
                    mongoSession.abortTransaction();
×
1181
                }
1182
                successful = true;
×
1183
            } catch (Exception ex) {
×
1184
                log.info("Aborting transaction failed. ", ex);
×
1185
                try {
1186
                    Thread.sleep(1000);
×
1187
                } catch (InterruptedException iex) {
×
1188
                    // ignore
1189
                }
×
1190
            }
×
1191
        }
1192
    }
×
1193

1194
    public synchronized static void cleanTransactionWithRetry(ClientSession mongoSession) {
1195
        boolean successful = false;
×
1196
        while (!successful) {
×
1197
            try {
1198
                if (mongoSession.hasActiveTransaction()) {
×
1199
                    mongoSession.abortTransaction();
×
1200
                }
1201
                successful = true;
×
1202
            } catch (Exception ex) {
×
1203
                log.info("Cleaning transaction failed. ", ex);
×
1204
                try {
1205
                    Thread.sleep(1000);
×
1206
                } catch (InterruptedException iex) {
×
1207
                    // ignore
1208
                }
×
1209
            }
×
1210
        }
1211
    }
×
1212

1213
    private static IntroNanopub getAgentIntro(ClientSession mongoSession, String nanopubId) {
1214
        IntroNanopub agentIntro = new IntroNanopub(NanopubLoader.retrieveNanopub(mongoSession, nanopubId));
×
1215
        if (agentIntro.getUser() == null) return null;
×
1216
        loadNanopub(mongoSession, agentIntro.getNanopub());
×
1217
        return agentIntro;
×
1218
    }
1219

1220

1221
    private static void setServerStatus(ClientSession mongoSession, ServerStatus status) {
1222
        setValue(mongoSession, Collection.SERVER_INFO.toString(), "status", status.toString());
21✔
1223
    }
3✔
1224

1225
    private static ServerStatus getServerStatus(ClientSession mongoSession) {
1226
        Object status = getValue(mongoSession, Collection.SERVER_INFO.toString(), "status");
18✔
1227
        if (status == null) {
6!
1228
            throw new RuntimeException("Illegal DB state: serverInfo status unavailable");
×
1229
        }
1230
        return ServerStatus.valueOf(status.toString());
12✔
1231
    }
1232

1233
    private static void schedule(ClientSession mongoSession, Task task) {
1234
        schedule(mongoSession, task.asDocument());
12✔
1235
    }
3✔
1236

1237
    private static void schedule(ClientSession mongoSession, Document taskDoc) {
1238
        log.info("Scheduling task: {}", taskDoc.getString("action"));
18✔
1239
        tasksCollection.insertOne(mongoSession, taskDoc);
12✔
1240
    }
3✔
1241

1242
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc