• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-registry / 24600266273

18 Apr 2026 07:55AM UTC coverage: 31.774% (-0.04%) from 31.816%
24600266273

push

github

web-flow
Merge pull request #111 from knowledgepixels/perf/trust-calculation-efficiency

perf: batch trust-state tasks and shorten post-core-load delays

278 of 984 branches covered (28.25%)

Branch coverage included in aggregate %.

836 of 2522 relevant lines covered (33.15%)

5.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

11.54
src/main/java/com/knowledgepixels/registry/Task.java
1
package com.knowledgepixels.registry;
2

3
import com.knowledgepixels.registry.db.IndexInitializer;
4
import com.mongodb.client.ClientSession;
5
import com.mongodb.client.FindIterable;
6
import com.mongodb.client.MongoCollection;
7
import com.mongodb.client.MongoCursor;
8
import com.mongodb.client.model.ReplaceOptions;
9
import net.trustyuri.TrustyUriUtils;
10
import org.apache.commons.lang.Validate;
11
import org.bson.Document;
12
import org.eclipse.rdf4j.model.IRI;
13
import org.eclipse.rdf4j.model.Statement;
14
import org.nanopub.Nanopub;
15
import org.nanopub.extra.index.IndexUtils;
16
import org.nanopub.extra.index.NanopubIndex;
17
import org.nanopub.extra.security.KeyDeclaration;
18
import org.nanopub.extra.setting.IntroNanopub;
19
import org.nanopub.extra.setting.NanopubSetting;
20
import org.slf4j.Logger;
21
import org.slf4j.LoggerFactory;
22

23
import java.io.Serializable;
24
import java.time.ZonedDateTime;
25
import java.util.*;
26
import java.util.concurrent.atomic.AtomicLong;
27

28
import static com.knowledgepixels.registry.EntryStatus.*;
29
import static com.knowledgepixels.registry.NanopubLoader.*;
30
import static com.knowledgepixels.registry.RegistryDB.*;
31
import static com.knowledgepixels.registry.ServerStatus.*;
32
import static com.mongodb.client.model.Filters.eq;
33
import static com.mongodb.client.model.Sorts.*;
34

35
public enum Task implements Serializable {
6✔
36

37
    INIT_DB {
33✔
38
        public void run(ClientSession s, Document taskDoc) {
39
            setServerStatus(s, launching);
9✔
40

41
            increaseStateCounter(s);
6✔
42
            if (RegistryDB.isInitialized(s)) {
9!
43
                throw new RuntimeException("DB already initialized");
×
44
            }
45
            setValue(s, Collection.SERVER_INFO.toString(), "setupId", Math.abs(Utils.getRandom().nextLong()));
27✔
46
            setValue(s, Collection.SERVER_INFO.toString(), "testInstance", "true".equals(System.getenv("REGISTRY_TEST_INSTANCE")));
30✔
47
            schedule(s, LOAD_CONFIG);
9✔
48
        }
3✔
49

50
    },
51

52
    LOAD_CONFIG {
33✔
53
        public void run(ClientSession s, Document taskDoc) {
54
            if (getServerStatus(s) != launching) {
12!
55
                throw new IllegalTaskStatusException("Illegal status for this task: " + getServerStatus(s));
×
56
            }
57

58
            if (System.getenv("REGISTRY_COVERAGE_TYPES") != null) {
9!
59
                setValue(s, Collection.SERVER_INFO.toString(), "coverageTypes", System.getenv("REGISTRY_COVERAGE_TYPES"));
×
60
            }
61
            if (System.getenv("REGISTRY_COVERAGE_AGENTS") != null) {
9!
62
                setValue(s, Collection.SERVER_INFO.toString(), "coverageAgents", System.getenv("REGISTRY_COVERAGE_AGENTS"));
×
63
            }
64
            schedule(s, LOAD_SETTING);
9✔
65
        }
3✔
66

67
    },
68

69
    LOAD_SETTING {
33✔
70
        public void run(ClientSession s, Document taskDoc) throws Exception {
71
            if (getServerStatus(s) != launching) {
12!
72
                throw new IllegalTaskStatusException("Illegal status for this task: " + getServerStatus(s));
×
73
            }
74

75
            NanopubSetting settingNp = Utils.getSetting();
6✔
76
            String settingId = TrustyUriUtils.getArtifactCode(settingNp.getNanopub().getUri().stringValue());
18✔
77
            setValue(s, Collection.SETTING.toString(), "original", settingId);
18✔
78
            setValue(s, Collection.SETTING.toString(), "current", settingId);
18✔
79
            loadNanopub(s, settingNp.getNanopub());
15✔
80
            List<Document> bootstrapServices = new ArrayList<>();
12✔
81
            for (IRI i : settingNp.getBootstrapServices()) {
33✔
82
                bootstrapServices.add(new Document("_id", i.stringValue()));
27✔
83
            }
3✔
84
            // potentially currently hardcoded in the nanopub lib
85
            setValue(s, Collection.SETTING.toString(), "bootstrap-services", bootstrapServices);
18✔
86

87
            if (!"false".equals(System.getenv("REGISTRY_PERFORM_FULL_LOAD"))) {
15!
88
                schedule(s, LOAD_FULL);
9✔
89
            }
90

91
            setServerStatus(s, coreLoading);
9✔
92
            schedule(s, INIT_COLLECTIONS);
9✔
93
        }
3✔
94

95
    },
96

97
    INIT_COLLECTIONS {
33✔
98

99
        // DB read from:
100
        // DB write to:  trustPaths, endorsements, accounts
101
        // This state is periodically executed
102

103
        public void run(ClientSession s, Document taskDoc) throws Exception {
104
            if (getServerStatus(s) != coreLoading && getServerStatus(s) != updating) {
×
105
                throw new IllegalTaskStatusException("Illegal status for this task: " + getServerStatus(s));
×
106
            }
107

108
            IndexInitializer.initLoadingCollections(s);
×
109

110
            if ("false".equals(System.getenv("REGISTRY_ENABLE_TRUST_CALCULATION"))) {
×
111
                log.info("Trust calculation disabled; skipping to FINALIZE_TRUST_STATE");
×
112
                for (Map.Entry<String, Integer> entry : AgentFilter.getExplicitPubkeys().entrySet()) {
×
113
                    String pubkeyHash = entry.getKey();
×
114
                    int quota = entry.getValue();
×
115
                    Document account = new Document("agent", "")
×
116
                            .append("pubkey", pubkeyHash)
×
117
                            .append("status", toLoad.getValue())
×
118
                            .append("depth", 0)
×
119
                            .append("quota", quota);
×
120
                    if (!has(s, "accounts_loading", new Document("pubkey", pubkeyHash))) {
×
121
                        insert(s, "accounts_loading", account);
×
122
                        log.info("Seeded explicit pubkey as account: {}", pubkeyHash);
×
123
                    }
124
                }
×
125
                schedule(s, FINALIZE_TRUST_STATE);
×
126
                return;
×
127
            }
128

129
            // since this may take long, we start with postfix "_loading"
130
            // and only at completion it's changed to trustPath, endorsements, accounts
131
            insert(s, "trustPaths_loading",
×
132
                    new Document("_id", "$")
133
                            .append("sorthash", "")
×
134
                            .append("agent", "$")
×
135
                            .append("pubkey", "$")
×
136
                            .append("depth", 0)
×
137
                            .append("ratio", 1.0d)
×
138
                            .append("type", "extended")
×
139
            );
140

141
            NanopubIndex agentIndex = IndexUtils.castToIndex(NanopubLoader.retrieveNanopub(s, Utils.getSetting().getAgentIntroCollection().stringValue()));
×
142
            loadNanopub(s, agentIndex);
×
143
            for (IRI el : agentIndex.getElements()) {
×
144
                String declarationAc = TrustyUriUtils.getArtifactCode(el.stringValue());
×
145
                Validate.notNull(declarationAc);
×
146

147
                insert(s, "endorsements_loading",
×
148
                        new Document("agent", "$")
149
                                .append("pubkey", "$")
×
150
                                .append("endorsedNanopub", declarationAc)
×
151
                                .append("source", getValue(s, Collection.SETTING.toString(), "current").toString())
×
152
                                .append("status", toRetrieve.getValue())
×
153

154
                );
155

156
            }
×
157
            insert(s, "accounts_loading",
×
158
                    new Document("agent", "$")
159
                            .append("pubkey", "$")
×
160
                            .append("status", visited.getValue())
×
161
                            .append("depth", 0)
×
162
            );
163

164
            log.info("Starting iteration at depth 0");
×
165
            schedule(s, LOAD_DECLARATIONS.with("depth", 1));
×
166
        }
×
167

168
        // At the end of this task, the base agent is initialized:
169
        // ------------------------------------------------------------
170
        //
171
        //              $$$$ ----endorses----> [intro]
172
        //              base                (to-retrieve)
173
        //              $$$$
174
        //            (visited)
175
        //
176
        //              [0] trust path
177
        //
178
        // ------------------------------------------------------------
179
        // Only one endorses-link to an introduction is shown here,
180
        // but there are typically several.
181

182
    },
183

184
    LOAD_DECLARATIONS {
33✔
185

186
        // In general, we have at this point accounts with
187
        // endorsement links to unvisited agent introductions:
188
        // ------------------------------------------------------------
189
        //
190
        //         o      ----endorses----> [intro]
191
        //    --> /#\  /o\___            (to-retrieve)
192
        //        / \  \_/^^^
193
        //         (visited)
194
        //
195
        //    ========[X] trust path
196
        //
197
        // ------------------------------------------------------------
198

199
        // DB read from: endorsements, trustEdges, accounts
200
        // DB write to:  endorsements, trustEdges, accounts
201

202
        public void run(ClientSession s, Document taskDoc) {
203

204
            int depth = taskDoc.getInteger("depth");
×
205

206
            while (true) {
207
                Document d = getOne(s, "endorsements_loading",
×
208
                        new DbEntryWrapper(toRetrieve).getDocument());
×
209
                if (d == null) break;
×
210

211
                IntroNanopub agentIntro = getAgentIntro(s, d.getString("endorsedNanopub"));
×
212
                if (agentIntro != null) {
×
213
                    String agentId = agentIntro.getUser().stringValue();
×
214

215
                    for (KeyDeclaration kd : agentIntro.getKeyDeclarations()) {
×
216
                        String sourceAgent = d.getString("agent");
×
217
                        Validate.notNull(sourceAgent);
×
218
                        String sourcePubkey = d.getString("pubkey");
×
219
                        Validate.notNull(sourcePubkey);
×
220
                        String sourceAc = d.getString("source");
×
221
                        Validate.notNull(sourceAc);
×
222
                        String agentPubkey = Utils.getHash(kd.getPublicKeyString());
×
223
                        Validate.notNull(agentPubkey);
×
224
                        Document trustEdge = new Document("fromAgent", sourceAgent)
×
225
                                .append("fromPubkey", sourcePubkey)
×
226
                                .append("toAgent", agentId)
×
227
                                .append("toPubkey", agentPubkey)
×
228
                                .append("source", sourceAc);
×
229
                        if (!has(s, "trustEdges", trustEdge)) {
×
230
                            boolean invalidated = has(s, "invalidations", new Document("invalidatedNp", sourceAc).append("invalidatingPubkey", sourcePubkey));
×
231
                            insert(s, "trustEdges", trustEdge.append("invalidated", invalidated));
×
232
                        }
233

234
                        Document agent = new Document("agent", agentId).append("pubkey", agentPubkey);
×
235
                        if (!has(s, "accounts_loading", agent)) {
×
236
                            insert(s, "accounts_loading", agent.append("status", seen.getValue()).append("depth", depth));
×
237
                        }
238
                    }
×
239

240
                    set(s, "endorsements_loading", d.append("status", retrieved.getValue()));
×
241
                } else {
×
242
                    set(s, "endorsements_loading", d.append("status", discarded.getValue()));
×
243
                }
244
            }
×
245
            schedule(s, EXPAND_TRUST_PATHS.with("depth", depth));
×
246
        }
×
247

248
        // At the end of this step, the key declarations in the agent
249
        // introductions are loaded and the corresponding trust edges
250
        // established:
251
        // ------------------------------------------------------------
252
        //
253
        //        o      ----endorses----> [intro]
254
        //   --> /#\  /o\___                o
255
        //       / \  \_/^^^ ---trusts---> /#\  /o\___
256
        //        (visited)                / \  \_/^^^
257
        //                                   (seen)
258
        //
259
        //   ========[X] trust path
260
        //
261
        // ------------------------------------------------------------
262
        // Only one trust edge per introduction is shown here, but
263
        // there can be several.
264

265
    },
266

267
    EXPAND_TRUST_PATHS {
33✔
268

269
        // DB read from: accounts, trustPaths, trustEdges
270
        // DB write to:  accounts, trustPaths
271

272
        public void run(ClientSession s, Document taskDoc) {
273

274
            int depth = taskDoc.getInteger("depth");
×
275

276
            while (true) {
277
                Document d = getOne(s, "accounts_loading",
×
278
                        new Document("status", visited.getValue())
×
279
                                .append("depth", depth - 1)
×
280
                );
281
                if (d == null) break;
×
282

283
                String agentId = d.getString("agent");
×
284
                Validate.notNull(agentId);
×
285
                String pubkeyHash = d.getString("pubkey");
×
286
                Validate.notNull(pubkeyHash);
×
287

288
                Document trustPath = collection("trustPaths_loading").find(s,
×
289
                        new Document("agent", agentId).append("pubkey", pubkeyHash).append("type", "extended").append("depth", depth - 1)
×
290
                ).sort(orderBy(descending("ratio"), ascending("sorthash"))).first();
×
291

292
                if (trustPath == null) {
×
293
                    // Check it again in next iteration:
294
                    set(s, "accounts_loading", d.append("depth", depth));
×
295
                } else {
296
                    // Only first matching trust path is considered
297

298
                    Map<String, Document> newPaths = new HashMap<>();
×
299
                    Map<String, Set<String>> pubkeySets = new HashMap<>();
×
300
                    String currentSetting = getValue(s, Collection.SETTING.toString(), "current").toString();
×
301

302
                    try (MongoCursor<Document> edgeCursor = get(s, "trustEdges",
×
303
                            new Document("fromAgent", agentId)
304
                                    .append("fromPubkey", pubkeyHash)
×
305
                                    .append("invalidated", false)
×
306
                    )) {
307
                        while (edgeCursor.hasNext()) {
×
308
                            Document e = edgeCursor.next();
×
309

310
                            String agent = e.getString("toAgent");
×
311
                            Validate.notNull(agent);
×
312
                            String pubkey = e.getString("toPubkey");
×
313
                            Validate.notNull(pubkey);
×
314
                            String pathId = trustPath.getString("_id") + " " + agent + "|" + pubkey;
×
315
                            newPaths.put(pathId,
×
316
                                    new Document("_id", pathId)
317
                                            .append("sorthash", Utils.getHash(currentSetting + " " + pathId))
×
318
                                            .append("agent", agent)
×
319
                                            .append("pubkey", pubkey)
×
320
                                            .append("depth", depth)
×
321
                                            .append("type", "extended")
×
322
                            );
323
                            if (!pubkeySets.containsKey(agent)) pubkeySets.put(agent, new HashSet<>());
×
324
                            pubkeySets.get(agent).add(pubkey);
×
325
                        }
×
326
                    }
327
                    for (String pathId : newPaths.keySet()) {
×
328
                        Document pd = newPaths.get(pathId);
×
329
                        // first divide by agents; then for each agent, divide by number of pubkeys:
330
                        double newRatio = (trustPath.getDouble("ratio") * 0.9) / pubkeySets.size() / pubkeySets.get(pd.getString("agent")).size();
×
331
                        insert(s, "trustPaths_loading", pd.append("ratio", newRatio));
×
332
                    }
×
333
                    // Retain only 10% of the ratio — the other 90% was distributed to children
334
                    double retainedRatio = trustPath.getDouble("ratio") * 0.1;
×
335
                    set(s, "trustPaths_loading", trustPath.append("type", "primary").append("ratio", retainedRatio));
×
336
                    set(s, "accounts_loading", d.append("status", expanded.getValue()));
×
337
                }
338
            }
×
339
            schedule(s, LOAD_CORE.with("depth", depth).append("load-count", 0));
×
340

341
        }
×
342

343
        // At the end of this step, trust paths are updated to include
344
        // the new accounts:
345
        // ------------------------------------------------------------
346
        //
347
        //         o      ----endorses----> [intro]
348
        //    --> /#\  /o\___                o
349
        //        / \  \_/^^^ ---trusts---> /#\  /o\___
350
        //        (expanded)                / \  \_/^^^
351
        //                                    (seen)
352
        //
353
        //    ========[X]=====================[X+1] trust path
354
        //
355
        // ------------------------------------------------------------
356
        // Only one trust path is shown here, but they branch out if
357
        // several trust edges are present.
358

359
    },
360

361
    LOAD_CORE {
33✔
362

363
        // From here on, we refocus on the head of the trust paths:
364
        // ------------------------------------------------------------
365
        //
366
        //         o
367
        //    --> /#\  /o\___
368
        //        / \  \_/^^^
369
        //          (seen)
370
        //
371
        //    ========[X] trust path
372
        //
373
        // ------------------------------------------------------------
374

375
        // DB read from: accounts, trustPaths, endorsements, lists
376
        // DB write to:  accounts, endorsements, lists
377

378
        public void run(ClientSession s, Document taskDoc) {
379

380
            int depth = taskDoc.getInteger("depth");
×
381
            int loadCount = taskDoc.getInteger("load-count");
×
382

383
            Document agentAccount = getOne(s, "accounts_loading",
×
384
                    new Document("depth", depth).append("status", seen.getValue()));
×
385
            final String agentId;
386
            final String pubkeyHash;
387
            final Document trustPath;
388
            if (agentAccount != null) {
×
389
                agentId = agentAccount.getString("agent");
×
390
                Validate.notNull(agentId);
×
391
                pubkeyHash = agentAccount.getString("pubkey");
×
392
                Validate.notNull(pubkeyHash);
×
393
                trustPath = getOne(s, "trustPaths_loading",
×
394
                        new Document("depth", depth)
×
395
                                .append("agent", agentId)
×
396
                                .append("pubkey", pubkeyHash)
×
397
                );
398
            } else {
399
                agentId = null;
×
400
                pubkeyHash = null;
×
401
                trustPath = null;
×
402
            }
403

404
            if (agentAccount == null) {
×
405
                schedule(s, FINISH_ITERATION.with("depth", depth).append("load-count", loadCount));
×
406
            } else if (trustPath == null) {
×
407
                // Account was seen but has no trust path at this depth; skip it
408
                set(s, "accounts_loading", agentAccount.append("status", skipped.getValue()));
×
409
                schedule(s, LOAD_CORE.with("depth", depth).append("load-count", loadCount));
×
410
            } else if (trustPath.getDouble("ratio") < MIN_TRUST_PATH_RATIO) {
×
411
                set(s, "accounts_loading", agentAccount.append("status", skipped.getValue()));
×
412
                Document d = new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH);
×
413
                if (!has(s, "lists", d)) {
×
414
                    insert(s, "lists", d.append("status", encountered.getValue()));
×
415
                }
416
                schedule(s, LOAD_CORE.with("depth", depth).append("load-count", loadCount + 1));
×
417
            } else {
×
418
                // TODO check intro limit
419
                Document introList = new Document()
×
420
                        .append("pubkey", pubkeyHash)
×
421
                        .append("type", INTRO_TYPE_HASH)
×
422
                        .append("status", loading.getValue());
×
423
                if (!has(s, "lists", new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH))) {
×
424
                    insert(s, "lists", introList);
×
425
                }
426

427
                // No checksum skip in LOAD_CORE: the endorsement extraction logic (below) needs to
428
                // see every nanopub to populate endorsements_loading, which is rebuilt from scratch each UPDATE.
429
                try (var stream = NanopubLoader.retrieveNanopubsFromPeers(INTRO_TYPE_HASH, pubkeyHash)) {
×
430
                    NanopubLoader.loadStreamInParallel(stream, np -> {
×
431
                        try (ClientSession ws = RegistryDB.getClient().startSession()) {
×
432
                            loadNanopub(ws, np, pubkeyHash, INTRO_TYPE);
×
433
                        }
434
                    });
×
435
                }
436

437
                set(s, "lists", introList.append("status", loaded.getValue()));
×
438

439
                // TODO check endorsement limit
440
                Document endorseList = new Document()
×
441
                        .append("pubkey", pubkeyHash)
×
442
                        .append("type", ENDORSE_TYPE_HASH)
×
443
                        .append("status", loading.getValue());
×
444
                if (!has(s, "lists", new Document("pubkey", pubkeyHash).append("type", ENDORSE_TYPE_HASH))) {
×
445
                    insert(s, "lists", endorseList);
×
446
                }
447

448
                try (var stream = NanopubLoader.retrieveNanopubsFromPeers(ENDORSE_TYPE_HASH, pubkeyHash)) {
×
449
                    stream.forEach(m -> {
×
450
                        if (!m.isSuccess())
×
451
                            throw new AbortingTaskException("Failed to download nanopub; aborting task...");
×
452
                        Nanopub nanopub = m.getNanopub();
×
453
                        loadNanopub(s, nanopub, pubkeyHash, ENDORSE_TYPE);
×
454
                        String sourceNpId = TrustyUriUtils.getArtifactCode(nanopub.getUri().stringValue());
×
455
                        Validate.notNull(sourceNpId);
×
456
                        for (Statement st : nanopub.getAssertion()) {
×
457
                            if (!st.getPredicate().equals(Utils.APPROVES_OF)) continue;
×
458
                            if (!(st.getObject() instanceof IRI)) continue;
×
459
                            if (!agentId.equals(st.getSubject().stringValue())) continue;
×
460
                            String objStr = st.getObject().stringValue();
×
461
                            if (!TrustyUriUtils.isPotentialTrustyUri(objStr)) continue;
×
462
                            String endorsedNpId = TrustyUriUtils.getArtifactCode(objStr);
×
463
                            Validate.notNull(endorsedNpId);
×
464
                            Document endorsement = new Document("agent", agentId)
×
465
                                    .append("pubkey", pubkeyHash)
×
466
                                    .append("endorsedNanopub", endorsedNpId)
×
467
                                    .append("source", sourceNpId);
×
468
                            if (!has(s, "endorsements_loading", endorsement)) {
×
469
                                insert(s, "endorsements_loading",
×
470
                                        endorsement.append("status", toRetrieve.getValue()));
×
471
                            }
472
                        }
×
473
                    });
×
474
                }
475

476
                set(s, "lists", endorseList.append("status", loaded.getValue()));
×
477

478
                Document df = new Document("pubkey", pubkeyHash).append("type", "$");
×
479
                if (!has(s, "lists", df)) insert(s, "lists",
×
480
                        df.append("status", encountered.getValue()));
×
481

482
                set(s, "accounts_loading", agentAccount.append("status", visited.getValue()));
×
483

484
                schedule(s, LOAD_CORE.with("depth", depth).append("load-count", loadCount + 1));
×
485
            }
486

487
        }
×
488

489
        // At the end of this step, we have added new endorsement
490
        // links to yet-to-retrieve agent introductions:
491
        // ------------------------------------------------------------
492
        //
493
        //         o      ----endorses----> [intro]
494
        //    --> /#\  /o\___            (to-retrieve)
495
        //        / \  \_/^^^
496
        //         (visited)
497
        //
498
        //    ========[X] trust path
499
        //
500
        // ------------------------------------------------------------
501
        // Only one endorsement is shown here, but there are typically
502
        // several.
503

504
    },
505

506
    FINISH_ITERATION {
33✔
507
        public void run(ClientSession s, Document taskDoc) {
508

509
            int depth = taskDoc.getInteger("depth");
×
510
            int loadCount = taskDoc.getInteger("load-count");
×
511

512
            if (loadCount == 0) {
×
513
                log.info("No new cores loaded; finishing iteration");
×
514
                schedule(s, CALCULATE_TRUST_SCORES);
×
515
            } else if (depth == MAX_TRUST_PATH_DEPTH) {
×
516
                log.info("Maximum depth reached: {}", depth);
×
517
                schedule(s, CALCULATE_TRUST_SCORES);
×
518
            } else {
519
                log.info("Progressing iteration at depth {}", depth + 1);
×
520
                schedule(s, LOAD_DECLARATIONS.with("depth", depth + 1));
×
521
            }
522

523
        }
×
524

525
    },
526

527
    CALCULATE_TRUST_SCORES {
33✔
528

529
        // DB read from: accounts, trustPaths
530
        // DB write to:  accounts
531

532
        public void run(ClientSession s, Document taskDoc) {
533

534
            while (true) {
535
                Document d = getOne(s, "accounts_loading", new Document("status", expanded.getValue()));
×
536
                if (d == null) break;
×
537

538
                double ratio = 0.0;
×
539
                Map<String, Boolean> seenPathElements = new HashMap<>();
×
540
                int pathCount = 0;
×
541
                try (MongoCursor<Document> trustPaths = collection("trustPaths_loading").find(s,
×
542
                        new Document("agent", d.get("agent").toString()).append("pubkey", d.get("pubkey").toString())
×
543
                ).sort(orderBy(ascending("depth"), descending("ratio"), ascending("sorthash"))).cursor()) {
×
544
                    while (trustPaths.hasNext()) {
×
545
                        Document trustPath = trustPaths.next();
×
546
                        ratio += trustPath.getDouble("ratio");
×
547
                        boolean independentPath = true;
×
548
                        String[] pathElements = trustPath.getString("_id").split(" ");
×
549
                        // Iterate over path elements, ignoring first (root) and last (this agent/pubkey):
550
                        for (int i = 1; i < pathElements.length - 1; i++) {
×
551
                            String p = pathElements[i];
×
552
                            if (seenPathElements.containsKey(p)) {
×
553
                                independentPath = false;
×
554
                                break;
×
555
                            }
556
                            seenPathElements.put(p, true);
×
557
                        }
558
                        if (independentPath) pathCount += 1;
×
559
                    }
×
560
                }
561
                double rawQuota = GLOBAL_QUOTA * ratio;
×
562
                int quota = (int) rawQuota;
×
563
                if (rawQuota < MIN_USER_QUOTA) {
×
564
                    quota = MIN_USER_QUOTA;
×
565
                } else if (rawQuota > MAX_USER_QUOTA) {
×
566
                    quota = MAX_USER_QUOTA;
×
567
                }
568
                set(s, "accounts_loading",
×
569
                        d.append("status", processed.getValue())
×
570
                                .append("ratio", ratio)
×
571
                                .append("pathCount", pathCount)
×
572
                                .append("quota", quota)
×
573
                );
574
            }
×
575
            schedule(s, AGGREGATE_AGENTS);
×
576

577
        }
×
578

579
    },
580

581
    AGGREGATE_AGENTS {
33✔
582

583
        // DB read from: accounts, agents
584
        // DB write to:  accounts, agents
585

586
        public void run(ClientSession s, Document taskDoc) {
587

588
            while (true) {
589
                Document a = getOne(s, "accounts_loading", new Document("status", processed.getValue()));
×
590
                if (a == null) break;
×
591

592
                Document agentId = new Document("agent", a.get("agent").toString()).append("status", processed.getValue());
×
593
                int count = 0;
×
594
                int pathCountSum = 0;
×
595
                double totalRatio = 0.0d;
×
596
                try (MongoCursor<Document> agentAccounts = collection("accounts_loading").find(s, agentId).cursor()) {
×
597
                    while (agentAccounts.hasNext()) {
×
598
                        Document d = agentAccounts.next();
×
599
                        count++;
×
600
                        pathCountSum += d.getInteger("pathCount");
×
601
                        totalRatio += d.getDouble("ratio");
×
602
                    }
×
603
                }
604
                collection("accounts_loading").updateMany(s, agentId, new Document("$set",
×
605
                        new DbEntryWrapper(aggregated).getDocument()));
×
606
                insert(s, "agents_loading",
×
607
                        agentId.append("accountCount", count)
×
608
                                .append("avgPathCount", (double) pathCountSum / count)
×
609
                                .append("totalRatio", totalRatio)
×
610
                );
611
            }
×
612
            schedule(s, ASSIGN_PUBKEYS);
×
613

614
        }
×
615

616
    },
617

618
    ASSIGN_PUBKEYS {
33✔
619

620
        // DB read from: accounts
621
        // DB write to:  accounts
622

623
        public void run(ClientSession s, Document taskDoc) {
624

625
            while (true) {
626
                Document a = getOne(s, "accounts_loading", new DbEntryWrapper(aggregated).getDocument());
×
627
                if (a == null) break;
×
628

629
                Document pubkeyId = new Document("pubkey", a.get("pubkey").toString());
×
630
                if (collection("accounts_loading").countDocuments(s, pubkeyId) == 1) {
×
631
                    collection("accounts_loading").updateMany(s, pubkeyId,
×
632
                            new Document("$set", new DbEntryWrapper(approved).getDocument()));
×
633
                } else {
634
                    // TODO At the moment all get marked as 'contested'; implement more nuanced algorithm
635
                    collection("accounts_loading").updateMany(s, pubkeyId, new Document("$set",
×
636
                            new DbEntryWrapper(contested).getDocument()));
×
637
                }
638
            }
×
639
            schedule(s, DETERMINE_UPDATES);
×
640

641
        }
×
642

643
    },
644

645
    DETERMINE_UPDATES {
33✔
646

647
        // DB read from: accounts
648
        // DB write to:  accounts
649

650
        public void run(ClientSession s, Document taskDoc) {
651

652
            // TODO Handle contested accounts properly:
653
            for (Document d : collection("accounts_loading").find(
×
654
                    new DbEntryWrapper(approved).getDocument())) {
×
655
                // TODO Consider quota too:
656
                Document accountId = new Document("agent", d.get("agent").toString()).append("pubkey", d.get("pubkey").toString());
×
657
                if (collection(Collection.ACCOUNTS.toString()) == null || !has(s, Collection.ACCOUNTS.toString(),
×
658
                        accountId.append("status", loaded.getValue()))) {
×
659
                    set(s, "accounts_loading", d.append("status", toLoad.getValue()));
×
660
                } else {
661
                    set(s, "accounts_loading", d.append("status", loaded.getValue()));
×
662
                }
663
            }
×
664
            schedule(s, FINALIZE_TRUST_STATE);
×
665

666
        }
×
667

668
    },
669

670
    FINALIZE_TRUST_STATE {
33✔
671
        // We do this is a separate task/transaction, because if we do it at the beginning of RELEASE_DATA, that task hangs and cannot
672
        // properly re-run (as some renaming outside of transactions will have taken place).
673
        public void run(ClientSession s, Document taskDoc) {
674
            String newTrustStateHash = RegistryDB.calculateTrustStateHash(s);
×
675
            String previousTrustStateHash = (String) getValue(s, Collection.SERVER_INFO.toString(), "trustStateHash");  // may be null
×
676
            setValue(s, Collection.SERVER_INFO.toString(), "lastTrustStateUpdate", ZonedDateTime.now().toString());
×
677

678
            schedule(s, RELEASE_DATA.with("newTrustStateHash", newTrustStateHash).append("previousTrustStateHash", previousTrustStateHash));
×
679
        }
×
680

681
    },
682

683
    RELEASE_DATA {
33✔
684

685
        private static final int TRUST_STATE_SNAPSHOT_RETENTION = 100;
686

687
        public void run(ClientSession s, Document taskDoc) {
688
            ServerStatus status = getServerStatus(s);
×
689

690
            String newTrustStateHash = taskDoc.get("newTrustStateHash").toString();
×
691
            String previousTrustStateHash = taskDoc.getString("previousTrustStateHash");  // may be null
×
692

693
            // Renaming collections is run outside of a transaction, but is idempotent operation, so can safely be retried if task fails:
694
            rename("accounts_loading", Collection.ACCOUNTS.toString());
×
695
            rename("trustPaths_loading", "trustPaths");
×
696
            rename("agents_loading", Collection.AGENTS.toString());
×
697
            rename("endorsements_loading", "endorsements");
×
698

699
            if (previousTrustStateHash == null || !previousTrustStateHash.equals(newTrustStateHash)) {
×
700
                increaseStateCounter(s);
×
701
                setValue(s, Collection.SERVER_INFO.toString(), "trustStateHash", newTrustStateHash);
×
702
                Object trustStateCounter = getValue(s, Collection.SERVER_INFO.toString(), "trustStateCounter");
×
703
                insert(s, "debug_trustPaths", new Document()
×
704
                        .append("trustStateTxt", DebugPage.getTrustPathsTxt(s))
×
705
                        .append("trustStateHash", newTrustStateHash)
×
706
                        .append("trustStateCounter", trustStateCounter)
×
707
                );
708

709
                // Structured hash-keyed snapshot for consumer mirroring (#107).
710
                // Reads the accounts collection just renamed from accounts_loading above (:697).
711
                List<Document> snapshotAccounts = new ArrayList<>();
×
712
                for (Document a : collection(Collection.ACCOUNTS.toString()).find(s)) {
×
713
                    String pubkey = a.getString("pubkey");
×
714
                    if ("$".equals(pubkey)) continue;
×
715
                    snapshotAccounts.add(new Document()
×
716
                            .append("pubkey", pubkey)
×
717
                            .append("agent", a.getString("agent"))
×
718
                            .append("status", a.getString("status"))
×
719
                            .append("depth", a.get("depth"))
×
720
                            .append("pathCount", a.get("pathCount"))
×
721
                            .append("ratio", a.get("ratio"))
×
722
                            .append("quota", a.get("quota")));
×
723
                }
×
724
                Document snapshot = new Document()
×
725
                        .append("_id", newTrustStateHash)
×
726
                        .append("trustStateCounter", trustStateCounter)
×
727
                        .append("createdAt", ZonedDateTime.now().toString())
×
728
                        .append("accounts", snapshotAccounts);
×
729
                collection(Collection.TRUST_STATE_SNAPSHOTS.toString()).replaceOne(
×
730
                        s,
731
                        new Document("_id", newTrustStateHash),
732
                        snapshot,
733
                        new ReplaceOptions().upsert(true));
×
734

735
                // Prune beyond retention: collect _ids of snapshots past the Nth most recent, delete them.
736
                // trustStateCounter is monotonically increasing (see increaseStateCounter above), so ordering is well-defined.
737
                List<Object> toPrune = new ArrayList<>();
×
738
                try (MongoCursor<Document> stale = collection(Collection.TRUST_STATE_SNAPSHOTS.toString())
×
739
                        .find(s)
×
740
                        .sort(descending("trustStateCounter"))
×
741
                        .skip(TRUST_STATE_SNAPSHOT_RETENTION)
×
742
                        .projection(new Document("_id", 1))
×
743
                        .cursor()) {
×
744
                    while (stale.hasNext()) {
×
745
                        toPrune.add(stale.next().get("_id"));
×
746
                    }
747
                }
748
                if (!toPrune.isEmpty()) {
×
749
                    collection(Collection.TRUST_STATE_SNAPSHOTS.toString()).deleteMany(
×
750
                            s, new Document("_id", new Document("$in", toPrune)));
751
                }
752
            }
753

754
            if (status == coreLoading) {
×
755
                setServerStatus(s, coreReady);
×
756
            } else {
757
                setServerStatus(s, ready);
×
758
            }
759

760
            // Run update after 1h:
761
            schedule(s, UPDATE.withDelay(60 * 60 * 1000));
×
762
        }
×
763

764
    },
765

766
    UPDATE {
33✔
767
        public void run(ClientSession s, Document taskDoc) {
768
            ServerStatus status = getServerStatus(s);
×
769
            if (status == ready || status == coreReady) {
×
770
                setServerStatus(s, updating);
×
771
                schedule(s, INIT_COLLECTIONS);
×
772
            } else {
773
                log.info("Postponing update; currently in status {}", status);
×
774
                schedule(s, UPDATE.withDelay(10 * 60 * 1000));
×
775
            }
776

777
        }
×
778

779
    },
780

781
    LOAD_FULL {
33✔
782
        public void run(ClientSession s, Document taskDoc) {
783
            if ("false".equals(System.getenv("REGISTRY_PERFORM_FULL_LOAD"))) return;
15!
784

785
            ServerStatus status = getServerStatus(s);
9✔
786
            if (status != coreReady && status != ready && status != updating) {
27!
787
                log.info("Server currently not ready; checking again later");
9✔
788
                schedule(s, LOAD_FULL.withDelay(1000));
15✔
789
                return;
3✔
790
            }
791

792
            Document a = getOne(s, Collection.ACCOUNTS.toString(), new DbEntryWrapper(toLoad).getDocument());
×
793
            if (a == null) {
×
794
                log.info("Nothing to load");
×
795
                if (status == coreReady) {
×
796
                    log.info("Full load finished");
×
797
                    setServerStatus(s, ready);
×
798
                }
799
                log.info("Scheduling optional loading checks");
×
800
                schedule(s, RUN_OPTIONAL_LOAD.withDelay(100));
×
801
            } else {
802
                final String ph = a.getString("pubkey");
×
803
                boolean quotaReached = false;
×
804
                if (!ph.equals("$")) {
×
805
                    if (!AgentFilter.isAllowed(s, ph)) {
×
806
                        log.info("Skipping pubkey {} (not covered by agent filter)", ph);
×
807
                        set(s, Collection.ACCOUNTS.toString(), a.append("status", skipped.getValue()));
×
808
                        schedule(s, LOAD_FULL.withDelay(100));
×
809
                        return;
×
810
                    }
811
                    if (AgentFilter.isOverQuota(s, ph)) {
×
812
                        log.info("Skipping pubkey {} (quota exceeded)", ph);
×
813
                        quotaReached = true;
×
814
                    } else {
815
                        long startTime = System.nanoTime();
×
816
                        AtomicLong totalLoaded = new AtomicLong(0);
×
817

818
                        // Load per covered type (or "$" if no restriction) with checksum skip-ahead
819
                        for (String typeHash : getLoadTypeHashes(s, ph)) {
×
820
                            String checksums = buildChecksumFallbacks(s, ph, typeHash);
×
821
                            try (var stream = NanopubLoader.retrieveNanopubsFromPeers(typeHash, ph, checksums)) {
×
822
                                NanopubLoader.loadStreamInParallel(stream, np -> {
×
823
                                    if (!CoverageFilter.isCovered(np)) return;
×
824
                                    try (ClientSession ws = RegistryDB.getClient().startSession()) {
×
825
                                        if (!AgentFilter.isOverQuota(ws, ph)) {
×
826
                                            loadNanopub(ws, np, ph, "$");
×
827
                                            totalLoaded.incrementAndGet();
×
828
                                        }
829
                                    }
830
                                });
×
831
                            }
832
                        }
×
833

834
                        double timeSeconds = (System.nanoTime() - startTime) * 1e-9;
×
835
                        log.info("Loaded {} nanopubs in {}s, {} np/s",
×
836
                                totalLoaded.get(), timeSeconds, String.format("%.2f", totalLoaded.get() / timeSeconds));
×
837

838
                        if (AgentFilter.isOverQuota(s, ph)) {
×
839
                            quotaReached = true;
×
840
                        }
841
                    }
842
                }
843

844
                Document l = getOne(s, "lists", new Document().append("pubkey", ph).append("type", "$"));
×
845
                if (l != null) set(s, "lists", l.append("status", loaded.getValue()));
×
846
                EntryStatus accountStatus = quotaReached ? capped : loaded;
×
847
                int effectiveQuota = AgentFilter.getQuota(s, ph);
×
848
                if (effectiveQuota >= 0) {
×
849
                    a.append("quota", effectiveQuota);
×
850
                }
851
                set(s, Collection.ACCOUNTS.toString(), a.append("status", accountStatus.getValue()));
×
852

853
                schedule(s, LOAD_FULL.withDelay(100));
×
854
            }
855
        }
×
856

857
        @Override
858
        public boolean runAsTransaction() {
859
            // TODO Make this a transaction once we connect to other Nanopub Registry instances:
860
            return false;
×
861
        }
862

863
    },
864

865
    RUN_OPTIONAL_LOAD {
33✔
866

867
        private static final int BATCH_SIZE = Integer.parseInt(
15✔
868
                Utils.getEnv("REGISTRY_OPTIONAL_LOAD_BATCH_SIZE", "100"));
3✔
869

870
        public void run(ClientSession s, Document taskDoc) {
871
            if ("false".equals(System.getenv("REGISTRY_ENABLE_OPTIONAL_LOAD"))) {
×
872
                schedule(s, CHECK_NEW.withDelay(500));
×
873
                return;
×
874
            }
875

876
            AtomicLong totalLoaded = new AtomicLong(0);
×
877

878
            // Phase 1: Process encountered intro lists (core loading)
879
            while (totalLoaded.get() < BATCH_SIZE) {
×
880
                Document di = getOne(s, "lists", new Document("type", INTRO_TYPE_HASH).append("status", encountered.getValue()));
×
881
                if (di == null) break;
×
882

883
                final String pubkeyHash = di.getString("pubkey");
×
884
                Validate.notNull(pubkeyHash);
×
885
                log.info("Optional core loading: {}", pubkeyHash);
×
886

887
                String introChecksums = buildChecksumFallbacks(s, pubkeyHash, INTRO_TYPE_HASH);
×
888
                try (var stream = NanopubLoader.retrieveNanopubsFromPeers(INTRO_TYPE_HASH, pubkeyHash, introChecksums)) {
×
889
                    NanopubLoader.loadStreamInParallel(stream, np -> {
×
890
                        try (ClientSession ws = RegistryDB.getClient().startSession()) {
×
891
                            loadNanopub(ws, np, pubkeyHash, INTRO_TYPE);
×
892
                            totalLoaded.incrementAndGet();
×
893
                        }
894
                    });
×
895
                }
896
                set(s, "lists", di.append("status", loaded.getValue()));
×
897

898
                String endorseChecksums = buildChecksumFallbacks(s, pubkeyHash, ENDORSE_TYPE_HASH);
×
899
                try (var stream = NanopubLoader.retrieveNanopubsFromPeers(ENDORSE_TYPE_HASH, pubkeyHash, endorseChecksums)) {
×
900
                    NanopubLoader.loadStreamInParallel(stream, np -> {
×
901
                        try (ClientSession ws = RegistryDB.getClient().startSession()) {
×
902
                            loadNanopub(ws, np, pubkeyHash, ENDORSE_TYPE);
×
903
                            totalLoaded.incrementAndGet();
×
904
                        }
905
                    });
×
906
                }
907

908
                Document de = new Document("pubkey", pubkeyHash).append("type", ENDORSE_TYPE_HASH);
×
909
                if (has(s, "lists", de)) {
×
910
                    set(s, "lists", de.append("status", loaded.getValue()));
×
911
                } else {
912
                    insert(s, "lists", de.append("status", loaded.getValue()));
×
913
                }
914

915
                Document df = new Document("pubkey", pubkeyHash).append("type", "$");
×
916
                if (!has(s, "lists", df)) insert(s, "lists", df.append("status", encountered.getValue()));
×
917
            }
×
918

919
            // Phase 2: Process encountered full lists (if budget remains)
920
            while (totalLoaded.get() < BATCH_SIZE) {
×
921
                Document df = getOne(s, "lists", new Document("type", "$").append("status", encountered.getValue()));
×
922
                if (df == null) break;
×
923

924
                final String pubkeyHash = df.getString("pubkey");
×
925
                log.info("Optional full loading: {}", pubkeyHash);
×
926

927
                // Load per covered type (or "$" if no restriction) with checksum skip-ahead
928
                for (String typeHash : getLoadTypeHashes(s, pubkeyHash)) {
×
929
                    String checksums = buildChecksumFallbacks(s, pubkeyHash, typeHash);
×
930
                    try (var stream = NanopubLoader.retrieveNanopubsFromPeers(typeHash, pubkeyHash, checksums)) {
×
931
                        NanopubLoader.loadStreamInParallel(stream, np -> {
×
932
                            if (!CoverageFilter.isCovered(np)) return;
×
933
                            try (ClientSession ws = RegistryDB.getClient().startSession()) {
×
934
                                loadNanopub(ws, np, pubkeyHash, "$");
×
935
                                totalLoaded.incrementAndGet();
×
936
                            }
937
                        });
×
938
                    }
939
                }
×
940

941
                set(s, "lists", df.append("status", loaded.getValue()));
×
942

943
                // Backfill nanopubs stored locally during the transitional period (i.e. before
944
                // the $ list was loaded). Such nanopubs were stored in the nanopubs collection by
945
                // simpleLoad() but never added to listEntries; add them to the $ list now.
946
                log.info("Backfilling locally stored nanopubs for pubkey: {}", pubkeyHash);
×
947
                try (MongoCursor<Document> npCursor = collection(Collection.NANOPUBS.toString())
×
948
                        .find(s, new Document("pubkey", pubkeyHash)).cursor()) {
×
949
                    while (npCursor.hasNext()) {
×
950
                        String fullId = npCursor.next().getString("fullId");
×
951
                        if (fullId == null) continue;
×
952
                        try (ClientSession ws = RegistryDB.getClient().startSession()) {
×
953
                            Nanopub np = NanopubLoader.retrieveLocalNanopub(ws, fullId);
×
954
                            if (np != null && CoverageFilter.isCovered(np)) {
×
955
                                loadNanopub(ws, np, pubkeyHash, "$");
×
956
                                totalLoaded.incrementAndGet();
×
957
                            }
958
                        } catch (Exception ex) {
×
959
                            log.info("Error backfilling nanopub {}: {}", fullId, ex.getMessage());
×
960
                        }
×
961
                    }
×
962
                }
963
            }
×
964

965
            if (totalLoaded.get() > 0) {
×
966
                log.info("Optional load batch completed: {} nanopubs across multiple pubkeys", totalLoaded.get());
×
967
            }
968

969
            if (prioritizeAllPubkeys()) {
×
970
                // Check if there are more pubkeys waiting to be processed
971
                boolean moreWork = has(s, "lists", new Document("type", INTRO_TYPE_HASH).append("status", encountered.getValue()))
×
972
                        || has(s, "lists", new Document("type", "$").append("status", encountered.getValue()));
×
973
                if (moreWork) {
×
974
                    // Continue processing without a full CHECK_NEW cycle in between.
975
                    // CHECK_NEW will run naturally once all encountered lists are processed.
976
                    schedule(s, RUN_OPTIONAL_LOAD.withDelay(10));
×
977
                } else {
978
                    schedule(s, CHECK_NEW.withDelay(500));
×
979
                }
980
            } else {
×
981
                // Throttled: yield to CHECK_NEW after each batch to prioritize approved pubkeys
982
                schedule(s, CHECK_NEW.withDelay(500));
×
983
            }
984
        }
×
985

986
    },
987

988
    CHECK_NEW {
33✔
989
        public void run(ClientSession s, Document taskDoc) {
990
            RegistryPeerConnector.checkPeers(s);
×
991
            // Keep legacy connection during transition period:
992
            LegacyConnector.checkForNewNanopubs(s);
×
993
            // TODO Somehow throttle the loading of such potentially non-approved nanopubs
994

995
            schedule(s, LOAD_FULL.withDelay(100));
×
996
        }
×
997

998
        @Override
999
        public boolean runAsTransaction() {
1000
            // Peer sync includes long-running streaming fetches that would exceed
1001
            // MongoDB's transaction timeout; each operation is individually safe.
1002
            return false;
×
1003
        }
1004

1005
    };
1006

1007
    private static final Logger log = LoggerFactory.getLogger(Task.class);
9✔
1008

1009
    public abstract void run(ClientSession s, Document taskDoc) throws Exception;
1010

1011
    public boolean runAsTransaction() {
1012
        return true;
×
1013
    }
1014

1015
    Document asDocument() {
1016
        return withDelay(0L);
12✔
1017
    }
1018

1019
    private Document withDelay(long delay) {
1020
        // TODO Rename "not-before" to "notBefore" for consistency with other field names
1021
        return new Document()
15✔
1022
                .append("not-before", System.currentTimeMillis() + delay)
21✔
1023
                .append("action", name());
6✔
1024
    }
1025

1026
    private Document with(String key, Object value) {
1027
        return asDocument().append(key, value);
×
1028
    }
1029

1030
    private static boolean prioritizeAllPubkeys() {
1031
        return "true".equals(System.getenv("REGISTRY_PRIORITIZE_ALL_PUBKEYS"));
×
1032
    }
1033

1034
    /**
1035
     * Returns the type hashes to load for a given pubkey. When coverage is unrestricted,
1036
     * returns just "$" (all types in one request). When restricted, returns each covered
1037
     * type hash for per-type fetching with checksum skip-ahead.
1038
     *
1039
     * TODO: Fetching "$" from peers with type restrictions will only return their covered
1040
     * types, not all types. To get full coverage, we'd need to fetch per-type from such peers.
1041
     * Additionally, checksum-based skip-ahead won't work correctly against such peers, because
1042
     * their "$" list has different checksums due to the differing type subset. This means full
1043
     * re-downloads on every cycle. Per-type fetching would solve both issues.
1044
     */
1045
    private static java.util.List<String> getLoadTypeHashes(ClientSession s, String pubkeyHash) {
1046
        if (CoverageFilter.coversAllTypes()) {
×
1047
            return java.util.List.of("$");
×
1048
        }
1049
        return java.util.List.copyOf(CoverageFilter.getCoveredTypeHashes());
×
1050
    }
1051

1052
    // TODO Move these to setting:
1053
    private static final int MAX_TRUST_PATH_DEPTH = 10;
1054
    private static final double MIN_TRUST_PATH_RATIO = 0.00000001;
1055
    //private static final double MIN_TRUST_PATH_RATIO = 0.01; // For testing
1056
    private static final int GLOBAL_QUOTA = Integer.parseInt(
12✔
1057
            Utils.getEnv("REGISTRY_GLOBAL_QUOTA", "1000000000"));
3✔
1058
    private static final int MIN_USER_QUOTA = Integer.parseInt(
12✔
1059
            Utils.getEnv("REGISTRY_MIN_USER_QUOTA", "1000"));
3✔
1060
    private static final int MAX_USER_QUOTA = Integer.parseInt(
12✔
1061
            Utils.getEnv("REGISTRY_MAX_USER_QUOTA", "100000"));
3✔
1062

1063
    private static MongoCollection<Document> tasksCollection = collection(Collection.TASKS.toString());
15✔
1064

1065
    private static volatile String currentTaskName;
1066
    private static volatile long currentTaskStartTime;
1067

1068
    public static String getCurrentTaskName() {
1069
        return currentTaskName;
×
1070
    }
1071

1072
    public static long getCurrentTaskStartTime() {
1073
        return currentTaskStartTime;
×
1074
    }
1075

1076
    /**
1077
     * The super important base entry point!
1078
     */
1079
    static void runTasks() {
1080
        try (ClientSession s = RegistryDB.getClient().startSession()) {
×
1081
            if (!RegistryDB.isInitialized(s)) {
×
1082
                schedule(s, INIT_DB); // does not yet execute, only schedules
×
1083
            }
1084

1085
            while (true) {
1086
                FindIterable<Document> taskResult = tasksCollection.find(s).sort(ascending("not-before"));
×
1087
                Document taskDoc = taskResult.first();
×
1088
                long sleepTime = 10;
×
1089
                if (taskDoc != null && taskDoc.getLong("not-before") < System.currentTimeMillis()) {
×
1090
                    Task task = valueOf(taskDoc.getString("action"));
×
1091
                    log.info("Running task: {}", task.name());
×
1092
                    if (task.runAsTransaction()) {
×
1093
                        try {
1094
                            s.startTransaction();
×
1095
                            log.info("Transaction started");
×
1096
                            runTask(task, taskDoc);
×
1097
                            s.commitTransaction();
×
1098
                            log.info("Transaction committed");
×
1099
                        } catch (Exception ex) {
×
1100
                            log.info("Aborting transaction", ex);
×
1101
                            abortTransaction(s, ex.getMessage());
×
1102
                            log.info("Transaction aborted");
×
1103
                            sleepTime = 1000;
×
1104
                        } finally {
1105
                            cleanTransactionWithRetry(s);
×
1106
                        }
×
1107
                    } else {
1108
                        try {
1109
                            runTask(task, taskDoc);
×
1110
                        } catch (Exception ex) {
×
1111
                            log.info("Transaction failed", ex);
×
1112
                        }
×
1113
                    }
1114
                }
1115
                try {
1116
                    Thread.sleep(sleepTime);
×
1117
                } catch (InterruptedException ex) {
×
1118
                    // ignore
1119
                }
×
1120
            }
×
1121
        }
1122
    }
1123

1124
    static void runTask(Task task, Document taskDoc) throws Exception {
1125
        try (ClientSession s = RegistryDB.getClient().startSession()) {
9✔
1126
            log.info("Executing task: {}", task.name());
15✔
1127
            currentTaskName = task.name();
9✔
1128
            currentTaskStartTime = System.currentTimeMillis();
6✔
1129
            task.run(s, taskDoc);
12✔
1130
            tasksCollection.deleteOne(s, eq("_id", taskDoc.get("_id")));
27✔
1131
            log.info("Task {} completed and removed from queue.", task.name());
15✔
1132
        } finally {
1133
            currentTaskName = null;
6✔
1134
        }
1135
    }
3✔
1136

1137
    public static void abortTransaction(ClientSession mongoSession, String message) {
1138
        boolean successful = false;
×
1139
        while (!successful) {
×
1140
            try {
1141
                if (mongoSession.hasActiveTransaction()) {
×
1142
                    mongoSession.abortTransaction();
×
1143
                }
1144
                successful = true;
×
1145
            } catch (Exception ex) {
×
1146
                log.info("Aborting transaction failed. ", ex);
×
1147
                try {
1148
                    Thread.sleep(1000);
×
1149
                } catch (InterruptedException iex) {
×
1150
                    // ignore
1151
                }
×
1152
            }
×
1153
        }
1154
    }
×
1155

1156
    public synchronized static void cleanTransactionWithRetry(ClientSession mongoSession) {
1157
        boolean successful = false;
×
1158
        while (!successful) {
×
1159
            try {
1160
                if (mongoSession.hasActiveTransaction()) {
×
1161
                    mongoSession.abortTransaction();
×
1162
                }
1163
                successful = true;
×
1164
            } catch (Exception ex) {
×
1165
                log.info("Cleaning transaction failed. ", ex);
×
1166
                try {
1167
                    Thread.sleep(1000);
×
1168
                } catch (InterruptedException iex) {
×
1169
                    // ignore
1170
                }
×
1171
            }
×
1172
        }
1173
    }
×
1174

1175
    private static IntroNanopub getAgentIntro(ClientSession mongoSession, String nanopubId) {
1176
        IntroNanopub agentIntro = new IntroNanopub(NanopubLoader.retrieveNanopub(mongoSession, nanopubId));
×
1177
        if (agentIntro.getUser() == null) return null;
×
1178
        loadNanopub(mongoSession, agentIntro.getNanopub());
×
1179
        return agentIntro;
×
1180
    }
1181

1182
    private static void setServerStatus(ClientSession mongoSession, ServerStatus status) {
1183
        setValue(mongoSession, Collection.SERVER_INFO.toString(), "status", status.toString());
21✔
1184
    }
3✔
1185

1186
    private static ServerStatus getServerStatus(ClientSession mongoSession) {
1187
        Object status = getValue(mongoSession, Collection.SERVER_INFO.toString(), "status");
18✔
1188
        if (status == null) {
6!
1189
            throw new RuntimeException("Illegal DB state: serverInfo status unavailable");
×
1190
        }
1191
        return ServerStatus.valueOf(status.toString());
12✔
1192
    }
1193

1194
    private static void schedule(ClientSession mongoSession, Task task) {
1195
        schedule(mongoSession, task.asDocument());
12✔
1196
    }
3✔
1197

1198
    private static void schedule(ClientSession mongoSession, Document taskDoc) {
1199
        log.info("Scheduling task: {}", taskDoc.getString("action"));
18✔
1200
        tasksCollection.insertOne(mongoSession, taskDoc);
12✔
1201
    }
3✔
1202

1203
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc