• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 25001159634

27 Apr 2026 02:32PM UTC coverage: 56.911% (+0.3%) from 56.577%
25001159634

Pull #90

github

web-flow
Merge bf9d14c84 into 8f22e9f64
Pull Request #90: feat: materialize canonical foaf:name per agent into trust + spaces graphs (#62)

425 of 842 branches covered (50.48%)

Branch coverage included in aggregate %.

1189 of 1994 relevant lines covered (59.63%)

8.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

13.05
src/main/java/com/knowledgepixels/query/AuthorityResolver.java
1
package com.knowledgepixels.query;
2

3
import java.util.ArrayList;
4
import java.util.List;
5
import java.util.Optional;
6
import java.util.Set;
7

8
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
9
import org.eclipse.rdf4j.model.IRI;
10
import org.eclipse.rdf4j.model.Statement;
11
import org.eclipse.rdf4j.model.Value;
12
import org.eclipse.rdf4j.model.ValueFactory;
13
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
14
import org.eclipse.rdf4j.model.vocabulary.FOAF;
15
import org.eclipse.rdf4j.model.vocabulary.RDF;
16
import org.eclipse.rdf4j.query.BindingSet;
17
import org.eclipse.rdf4j.query.QueryLanguage;
18
import org.eclipse.rdf4j.query.TupleQueryResult;
19
import org.eclipse.rdf4j.repository.RepositoryConnection;
20
import org.eclipse.rdf4j.repository.RepositoryResult;
21
import org.nanopub.vocabulary.NPA;
22
import org.slf4j.Logger;
23
import org.slf4j.LoggerFactory;
24

25
import com.knowledgepixels.query.vocabulary.GEN;
26
import com.knowledgepixels.query.vocabulary.NPAT;
27
import com.knowledgepixels.query.vocabulary.SpacesVocab;
28

29
/**
30
 * Drives the space-state materialization pipeline. Three entry points scheduled
31
 * by {@code MainVerticle}:
32
 * <ul>
33
 *   <li>{@link #tick()} — detects trust-state flips (full build) and otherwise
34
 *       advances the current space-state graph by an {@link #runIncrementalCycle
35
 *       incremental cycle} bounded by {@code (processedUpTo, currentLoadCounter]}.</li>
36
 *   <li>{@link #periodicRebuildTick()} — checks the {@code npa:needsFullRebuild}
37
 *       flag set by structural invalidations and re-runs the full build into a
38
 *       fresh graph, atomically flips the pointer, drops the old graph.</li>
39
 *   <li>{@link #cleanOrphans()} — startup cleanup of {@code npass:*} graphs the
40
 *       pointer isn't referencing.</li>
41
 * </ul>
42
 *
43
 * <p>Incremental cycle order: invalidation DELETEs (admin RI / RoleAssignment /
44
 * non-admin RI) → mirror-step delta is implicit (rebuilt only on full build) →
45
 * per-tier INSERTs (admin → attachment → maintainer → member → observer) →
46
 * late-arrival sweep (re-run downstream tiers without the load-number filter
47
 * iff this cycle added any structural rows). Sets {@code npa:needsFullRebuild}
48
 * when an admin RI / RoleAssignment / RoleDeclaration was invalidated; periodic
49
 * worker turns the flag into a from-scratch rebuild.
50
 *
51
 * <p>See {@code doc/design-space-repositories.md} — this implements the "Full
52
 * build", "Incremental cycle", and "Periodic full rebuild" procedures.
53
 */
54
public final class AuthorityResolver {
55

56
    private static final Logger log = LoggerFactory.getLogger(AuthorityResolver.class);
9✔
57

58
    private static final ValueFactory vf = SimpleValueFactory.getInstance();
6✔
59

60
    private static final String SPACES_REPO = "spaces";
61
    private static final String TRUST_REPO = "trust";
62

63
    /** NPA constants pulled in locally (trust-side). */
64
    private static final IRI NPA_HAS_CURRENT_TRUST_STATE =
9✔
65
            vf.createIRI(NPA.NAMESPACE, "hasCurrentTrustState");
6✔
66
    private static final IRI NPA_ACCOUNT_STATE = vf.createIRI(NPA.NAMESPACE, "AccountState");
15✔
67
    private static final IRI NPA_AGENT = vf.createIRI(NPA.NAMESPACE, "agent");
15✔
68
    private static final IRI NPA_PUBKEY = vf.createIRI(NPA.NAMESPACE, "pubkey");
15✔
69
    private static final IRI NPA_TRUST_STATUS = vf.createIRI(NPA.NAMESPACE, "trustStatus");
15✔
70
    private static final IRI NPA_LOADED = vf.createIRI(NPA.NAMESPACE, "loaded");
15✔
71
    private static final IRI NPA_TO_LOAD = vf.createIRI(NPA.NAMESPACE, "toLoad");
15✔
72

73
    /**
74
     * Trust-approved set: rows with one of these {@code npa:trustStatus} values
75
     * are mirrored into the space-state graph. Per
76
     * {@code doc/design-trust-state-repos.md}, these are the two "authority-
77
     * approving" statuses; {@code npa:contested}, {@code npa:skipped}, and the
78
     * transient statuses are distinct values of the same predicate and are
79
     * excluded automatically by this positive-list filter.
80
     */
81
    private static final Set<IRI> APPROVED_SET = Set.of(NPA_LOADED, NPA_TO_LOAD);
15✔
82

83
    private static AuthorityResolver instance;
84

85
    /** Returns the singleton. */
86
    public static synchronized AuthorityResolver get() {
87
        if (instance == null) {
6✔
88
            instance = new AuthorityResolver();
12✔
89
        }
90
        return instance;
6✔
91
    }
92

93
    private AuthorityResolver() {
6✔
94
    }
3✔
95

96
    // ---------------- Operational metrics snapshot ----------------
97
    //
98
    // Updated at the end of each runFullBuild / runIncrementalCycle, read by
99
    // MetricsCollector via the get*() accessors below. volatile is enough —
100
    // writers serialise via the synchronized methods, and readers (Prometheus
101
    // scrapes) only need most-recent visibility, not transactional consistency
102
    // across the snapshot. Defaults to zero values so a scrape that races a
103
    // boot before the first cycle returns 0, not NaN.
104

105
    private volatile TierSubjectTotals lastSubjectTotals = new TierSubjectTotals(0L, 0L, 0L);
24✔
106
    private volatile long lastInsertedTriplesTotal;
107
    private volatile long lastFullBuildDurationMs;
108
    private volatile long lastIncrementalCycleDurationMs;
109
    private volatile long lastProcessedUpToLag;
110

111
    public TierSubjectTotals getLastSubjectTotals() { return lastSubjectTotals; }
9✔
112
    public long getLastInsertedTriplesTotal() { return lastInsertedTriplesTotal; }
9✔
113
    public long getLastFullBuildDurationMs() { return lastFullBuildDurationMs; }
9✔
114
    public long getLastIncrementalCycleDurationMs() { return lastIncrementalCycleDurationMs; }
9✔
115
    public long getLastProcessedUpToLag() { return lastProcessedUpToLag; }
9✔
116

117
    // ---------------- Public entry points ----------------
118

119
    /**
120
     * Poll entry point. Behaviour:
121
     * <ul>
122
     *   <li>If no current space-state graph or the trust state has flipped → full build.</li>
123
     *   <li>Otherwise → {@link #runIncrementalCycle incremental cycle} on the load-number
124
     *       delta {@code (processedUpTo, currentLoadCounter]}. No-op if {@code
125
     *       processedUpTo == currentLoadCounter}.</li>
126
     * </ul>
127
     * Safe to call repeatedly on a schedule. Gated by {@link FeatureFlags#spacesEnabled()}.
128
     */
129
    public void tick() {
130
        if (!FeatureFlags.spacesEnabled()) return;
6!
131
        String trustStateHash = TrustStateRegistry.get().getCurrentHash().orElse(null);
18✔
132
        if (trustStateHash == null) {
6!
133
            log.debug("AuthorityResolver.tick: no current trust state yet — skipping");
9✔
134
            return;
3✔
135
        }
136
        IRI currentGraph = getCurrentSpaceStateGraph();
×
137
        String currentGraphName = (currentGraph == null) ? null
×
138
                : currentGraph.stringValue().substring(SpacesVocab.NPASS_NAMESPACE.length());
×
139
        if (currentGraphName == null || !currentGraphName.startsWith(trustStateHash + "_")) {
×
140
            log.info("AuthorityResolver.tick: trust-state flip detected (now {}); running full build",
×
141
                    abbrev(trustStateHash));
×
142
            runFullBuild(trustStateHash);
×
143
            return;
×
144
        }
145
        runIncrementalCycle(currentGraph);
×
146
    }
×
147

148
    /**
149
     * Periodic worker. If {@code npa:needsFullRebuild} was raised by an
150
     * incremental cycle's structural DELETE, runs a from-scratch rebuild into
151
     * a fresh space-state graph (using the current trust-state hash and load
152
     * counter) and clears the flag. No-op when the flag is not set. Safe to
153
     * call concurrently with {@link #tick()} when both are scheduled on the
154
     * same single-threaded executor.
155
     */
156
    public void periodicRebuildTick() {
157
        if (!FeatureFlags.spacesEnabled()) return;
×
158
        if (!readNeedsFullRebuild()) return;
×
159
        String trustStateHash = TrustStateRegistry.get().getCurrentHash().orElse(null);
×
160
        if (trustStateHash == null) {
×
161
            log.debug("AuthorityResolver.periodicRebuildTick: no current trust state — deferring");
×
162
            return;
×
163
        }
164
        log.info("AuthorityResolver.periodicRebuildTick: needsFullRebuild flag set; rebuilding");
×
165
        runFullBuild(trustStateHash);
×
166
        clearNeedsFullRebuild();
×
167
    }
×
168

169
    /**
170
     * Startup cleanup: drop any {@code npass:*} graph that the
171
     * {@code npa:hasCurrentSpaceState} pointer isn't pointing at. Orphans come
172
     * from crashes mid-build. Safe to call at any time; idempotent.
173
     */
174
    public void cleanOrphans() {
175
        if (!FeatureFlags.spacesEnabled()) return;
×
176
        IRI current = getCurrentSpaceStateGraph();
×
177
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
178
            int dropped = 0;
×
179
            try (RepositoryResult<org.eclipse.rdf4j.model.Resource> ctxs = conn.getContextIDs()) {
×
180
                List<IRI> toDrop = new ArrayList<>();
×
181
                while (ctxs.hasNext()) {
×
182
                    org.eclipse.rdf4j.model.Resource ctx = ctxs.next();
×
183
                    if (!(ctx instanceof IRI iri)) continue;
×
184
                    if (!iri.stringValue().startsWith(SpacesVocab.NPASS_NAMESPACE)) continue;
×
185
                    if (iri.equals(current)) continue;
×
186
                    toDrop.add(iri);
×
187
                }
×
188
                for (IRI iri : toDrop) {
×
189
                    conn.begin(IsolationLevels.SERIALIZABLE);
×
190
                    conn.clear(iri);
×
191
                    conn.commit();
×
192
                    dropped++;
×
193
                    log.info("AuthorityResolver.cleanOrphans: dropped orphan graph {}", iri);
×
194
                }
×
195
            }
196
            if (dropped == 0) {
×
197
                log.debug("AuthorityResolver.cleanOrphans: no orphan space-state graphs");
×
198
            }
199
        } catch (Exception ex) {
×
200
            log.info("AuthorityResolver.cleanOrphans: failed: {}", ex.toString());
×
201
        }
×
202
    }
×
203

204
    // ---------------- Full build ----------------
205

206
    /**
207
     * Mutex-protected full build of the space-state graph for the given trust
208
     * state. Captures {@code M = currentLoadCounter}, mirrors trust-approved
209
     * rows, (PR 2b: runs per-tier UPDATE loops from scratch), stamps
210
     * {@code processedUpTo = M}, flips the pointer, drops the previous graph.
211
     */
212
    synchronized void runFullBuild(String trustStateHash) {
213
        long startNanos = System.nanoTime();
×
214
        long loadCounter = getCurrentLoadCounter();
×
215
        IRI newGraph = SpacesVocab.forSpaceState(trustStateHash, loadCounter);
×
216
        IRI oldGraph = getCurrentSpaceStateGraph();
×
217
        if (newGraph.equals(oldGraph)) {
×
218
            log.debug("AuthorityResolver.runFullBuild: already current at {}", newGraph);
×
219
            return;
×
220
        }
221

222
        // 1. Mirror trust-approved rows into the new graph.
223
        int mirrored = mirrorTrustState(trustStateHash, newGraph);
×
224

225
        // 2. Per-tier UPDATE loops (from scratch: lastProcessed = -1 so the
226
        //    delta filter FILTER(?ln > ?lastProcessed) includes everything).
227
        TierInsertedTriples counts = runAllTierLoops(newGraph, -1);
×
228

229
        // 3. Stamp processedUpTo inside the new graph.
230
        writeProcessedUpTo(newGraph, loadCounter);
×
231

232
        // 4. Flip the current-space-state pointer.
233
        flipPointer(newGraph);
×
234

235
        // 5. Drop the old graph if one existed.
236
        if (oldGraph != null) {
×
237
            dropGraph(oldGraph);
×
238
        }
239

240
        TierSubjectTotals totals = computeTierSubjectTotals(newGraph);
×
241
        long durationMs = (System.nanoTime() - startNanos) / 1_000_000L;
×
242
        lastSubjectTotals = totals;
×
243
        lastInsertedTriplesTotal = (long) counts.admin + counts.attachment
×
244
                + counts.maintainer + counts.member + counts.observer;
245
        lastFullBuildDurationMs = durationMs;
×
246
        lastProcessedUpToLag = 0L;
×
247
        log.info("AuthorityResolver: full build complete — graph={} mirrored={} rows loadCounter={} "
×
248
                        + "subjects: adminRIs={} attachmentRAs={} nonAdminRIs={} "
249
                        + "(inserted-triples: admin={} attachment={} maintainer={} member={} observer={}) "
250
                        + "durationMs={}",
251
                newGraph, mirrored, loadCounter,
×
252
                totals.adminRIs(), totals.attachmentRAs(), totals.nonAdminRIs(),
×
253
                counts.admin, counts.attachment, counts.maintainer, counts.member, counts.observer,
×
254
                durationMs);
×
255
    }
×
256

257
    // ---------------- Incremental cycle ----------------
258

259
    /**
260
     * Single delta cycle on the current space-state graph. Bounded by
261
     * {@code (processedUpTo, currentLoadCounter]}; no-op if the range is empty.
262
     *
263
     * <p>Order:
264
     * <ol>
265
     *   <li>Apply invalidation DELETEs (admin RI, RoleAssignment, non-admin RI)
266
     *       and the RoleDeclaration ASK. Any DELETE on a structural kind sets
267
     *       {@code npa:needsFullRebuild} to bound the staleness from sticky
268
     *       downstream entries; the periodic worker turns that into a from-scratch
269
     *       rebuild on its next pass.</li>
270
     *   <li>Run per-tier INSERTs in the same order as the full build.</li>
271
     *   <li>Late-arrival sweep: if any structural row was added, re-run downstream
272
     *       tier INSERTs with {@code lastProcessed = -1} to catch candidates whose
273
     *       enabling event landed in this same cycle. Dedup filters protect
274
     *       against double-insert.</li>
275
     *   <li>Bump {@code processedUpTo} to {@code currentLoadCounter}.</li>
276
     * </ol>
277
     */
278
    synchronized void runIncrementalCycle(IRI graph) {
279
        long startNanos = System.nanoTime();
×
280
        long currentLoadCounter = getCurrentLoadCounter();
×
281
        long lastProcessed = readProcessedUpTo(graph);
×
282
        if (lastProcessed < 0) {
×
283
            log.warn("AuthorityResolver.runIncrementalCycle: missing processedUpTo on {}; skipping",
×
284
                    graph);
285
            return;
×
286
        }
287
        lastProcessedUpToLag = currentLoadCounter - lastProcessed;
×
288
        if (currentLoadCounter <= lastProcessed) {
×
289
            log.debug("AuthorityResolver.runIncrementalCycle: caught up at load {} on {}",
×
290
                    currentLoadCounter, graph);
×
291
            return;
×
292
        }
293

294
        boolean structuralInvalidation = applyInvalidations(graph, lastProcessed);
×
295
        TierInsertedTriples counts = runAllTierLoops(graph, lastProcessed);
×
296
        boolean structuralAdds = (counts.admin > 0)
×
297
                || (counts.attachment > 0)
298
                || newRoleDeclarationsArrived(lastProcessed);
×
299
        if (structuralAdds) {
×
300
            // Late-arrival sweep: only the leaf tiers (attachment/maintainer/member/observer)
301
            // can promote candidates whose enabling event arrived in this same cycle. Skip
302
            // the admin tier — its only enabling event is the admin grant itself, already
303
            // handled by the regular pass.
304
            TierInsertedTriples lateCounts = runDownstreamWithoutLoadFilter(graph);
×
305
            counts.attachment += lateCounts.attachment;
×
306
            counts.maintainer += lateCounts.maintainer;
×
307
            counts.member     += lateCounts.member;
×
308
            counts.observer   += lateCounts.observer;
×
309
        }
310

311
        writeProcessedUpTo(graph, currentLoadCounter);
×
312

313
        TierSubjectTotals totals = computeTierSubjectTotals(graph);
×
314
        long durationMs = (System.nanoTime() - startNanos) / 1_000_000L;
×
315
        lastSubjectTotals = totals;
×
316
        lastInsertedTriplesTotal = (long) counts.admin + counts.attachment
×
317
                + counts.maintainer + counts.member + counts.observer;
318
        lastIncrementalCycleDurationMs = durationMs;
×
319
        log.info("AuthorityResolver: incremental cycle complete — graph={} delta=({}, {}] "
×
320
                        + "subjects: adminRIs={} attachmentRAs={} nonAdminRIs={} "
321
                        + "(inserted-triples: admin={} attachment={} maintainer={} member={} observer={}) "
322
                        + "structuralInvalidation={} structuralAdds={} durationMs={}",
323
                graph, lastProcessed, currentLoadCounter,
×
324
                totals.adminRIs(), totals.attachmentRAs(), totals.nonAdminRIs(),
×
325
                counts.admin, counts.attachment, counts.maintainer, counts.member, counts.observer,
×
326
                structuralInvalidation, structuralAdds, durationMs);
×
327
    }
×
328

329
    /**
330
     * Runs the four invalidation-DELETE / ASK steps. Sets {@code npa:needsFullRebuild}
331
     * when admin-RI, RoleAssignment, or RoleDeclaration invalidations matched (the
332
     * three structural kinds). Leaf-tier RI deletes don't set the flag.
333
     *
334
     * @return true iff at least one structural kind was invalidated
335
     */
336
    boolean applyInvalidations(IRI graph, long lastProcessed) {
337
        boolean structural = false;
×
338
        if (wouldInvalidate(graph, lastProcessed, /*adminPinned=*/ true,
×
339
                            adminInvalidationCheckWhere(graph, lastProcessed))) {
×
340
            executeUpdate(adminInvalidationDelete(graph, lastProcessed));
×
341
            structural = true;
×
342
        }
343
        if (wouldInvalidate(graph, lastProcessed, /*adminPinned=*/ false,
×
344
                            roleAssignmentInvalidationCheckWhere(graph, lastProcessed))) {
×
345
            executeUpdate(roleAssignmentInvalidationDelete(graph, lastProcessed));
×
346
            structural = true;
×
347
        }
348
        // RoleDeclaration ASK only — RDs aren't materialized into the space-state
349
        // graph, so there's nothing to DELETE here. The flag still flips because
350
        // sticky downstream RIs derived from the now-invalidated RD need a
351
        // from-scratch recompute.
352
        if (wouldInvalidate(graph, lastProcessed, /*adminPinned=*/ false,
×
353
                            roleDeclarationInvalidationCheckWhere(lastProcessed))) {
×
354
            structural = true;
×
355
        }
356
        // Leaf-tier RI deletes — no flag.
357
        executeUpdate(leafTierInvalidationDelete(graph, lastProcessed));
×
358
        if (structural) setNeedsFullRebuild();
×
359
        return structural;
×
360
    }
361

362
    /**
363
     * Runs the four leaf tiers (attachment/maintainer/member/observer) with
364
     * {@code lastProcessed = -1} so the load-number filter on the candidate
365
     * side admits everything. Dedup filters in the tier templates prevent
366
     * double-insert. Used by the late-arrival sweep.
367
     */
368
    TierInsertedTriples runDownstreamWithoutLoadFilter(IRI graph) {
369
        TierInsertedTriples c = new TierInsertedTriples();
×
370
        c.attachment = runTierLabeled("attachment(late)", graph,
×
371
                attachmentValidationUpdate(graph, -1));
×
372
        c.maintainer = runTierLabeled("maintainer(late)", graph,
×
373
                nonAdminTierUpdate(graph, -1, GEN.MAINTAINER_ROLE, PUBLISHER_IS_ADMIN));
×
374
        c.member = runTierLabeled("member(admin-pub,late)", graph,
×
375
                nonAdminTierUpdate(graph, -1, GEN.MEMBER_ROLE, PUBLISHER_IS_ADMIN));
×
376
        c.member += runTierLabeled("member(maint-pub,late)", graph,
×
377
                nonAdminTierUpdate(graph, -1,
×
378
                        GEN.MEMBER_ROLE, publisherIsTieredRole(GEN.MAINTAINER_ROLE)));
×
379
        c.observer = runTierLabeled("observer(self,late)", graph,
×
380
                nonAdminTierUpdate(graph, -1, GEN.OBSERVER_ROLE, PUBLISHER_IS_SELF));
×
381
        return c;
×
382
    }
383

384
    /**
385
     * Cheap ASK: did any new {@code npa:RoleDeclaration} extraction land in the
386
     * load-number delta {@code (lastProcessed, ∞)}? Used by the late-arrival
387
     * trigger so an RD that arrives in the same cycle as a matching candidate
388
     * still gets validated.
389
     */
390
    boolean newRoleDeclarationsArrived(long lastProcessed) {
391
        String ask = String.format("""
×
392
                PREFIX npa: <%1$s>
393
                ASK {
394
                  GRAPH <%2$s> {
395
                    ?rd a npa:RoleDeclaration ;
396
                        npa:viaNanopub ?np .
397
                  }
398
                  GRAPH <%3$s> {
399
                    ?np npa:hasLoadNumber ?ln .
400
                    FILTER (?ln > %4$d)
401
                  }
402
                }
403
                """, NPA.NAMESPACE, SpacesVocab.SPACES_GRAPH, NPA.GRAPH, lastProcessed);
×
404
        return runAsk(ask);
×
405
    }
406

407
    // ---------------- Tier UPDATE loops ----------------
408

409
    /**
410
     * Per-tier inserted-triple tallies for one build or cycle. Counts the sum
411
     * of {@code (graphSize_after - graphSize_before)} across all iterations of
412
     * each tier's fixed-point INSERT loop — i.e. inserted *triples*, not
413
     * distinct subjects (a single RoleInstantiation insert writes 4–5 triples).
414
     *
415
     * <p>Used internally by the {@link #runIncrementalCycle structuralAdds}
416
     * boolean check (we only care whether any tier inserted at all).
417
     * Not what the log lines report: see {@link TierSubjectTotals} +
418
     * {@link #computeTierSubjectTotals} for the distinct-subject totals
419
     * surfaced to operators.
420
     */
421
    static final class TierInsertedTriples {
×
422
        int admin;
423
        int attachment;
424
        int maintainer;
425
        int member;
426
        int observer;
427
    }
428

429
    /**
430
     * Snapshot of distinct-subject totals in a space-state graph at a moment
431
     * in time. Independent of which tier-loop added each subject.
432
     */
433
    record TierSubjectTotals(long adminRIs, long attachmentRAs, long nonAdminRIs) {}
36✔
434

435
    /**
436
     * Runs the five tier loops in order: admin → {@code gen:hasRole} attachment
437
     * validation → maintainer → member → observer. Each loop iterates a SPARQL
438
     * INSERT to fixed point (no new triples added). Returns per-tier counts.
439
     *
440
     * @param graph         target space-state graph
441
     * @param lastProcessed load-number horizon; use {@code -1} for full build
442
     */
443
    TierInsertedTriples runAllTierLoops(IRI graph, long lastProcessed) {
444
        TierInsertedTriples c = new TierInsertedTriples();
×
445
        c.admin = runTierLabeled("admin", graph, adminTierUpdate(graph, lastProcessed));
×
446
        c.attachment = runTierLabeled("attachment", graph,
×
447
                attachmentValidationUpdate(graph, lastProcessed));
×
448
        c.maintainer = runTierLabeled("maintainer", graph, nonAdminTierUpdate(graph, lastProcessed,
×
449
                GEN.MAINTAINER_ROLE, PUBLISHER_IS_ADMIN));
450
        // Member tier: admin OR maintainer publisher — split into two simpler updates
451
        // so the query planner doesn't struggle with the UNION.
452
        c.member = runTierLabeled("member(admin-pub)", graph, nonAdminTierUpdate(graph, lastProcessed,
×
453
                GEN.MEMBER_ROLE, PUBLISHER_IS_ADMIN));
454
        c.member += runTierLabeled("member(maint-pub)", graph, nonAdminTierUpdate(graph, lastProcessed,
×
455
                GEN.MEMBER_ROLE, publisherIsTieredRole(GEN.MAINTAINER_ROLE)));
×
456
        // Observer tier: self-evidence only per the plan's policy table
457
        // (gen:ObserverRole = self). Authority-publisher sub-tiers were overreach;
458
        // the three of them have been removed, so an observer instantiation is
459
        // validated iff the assignee's own pubkey signed it.
460
        c.observer = runTierLabeled("observer(self)", graph, nonAdminTierUpdate(graph, lastProcessed,
×
461
                GEN.OBSERVER_ROLE, PUBLISHER_IS_SELF));
462
        return c;
×
463
    }
464

465
    /**
466
     * Builds a publisher constraint requiring the publisher to be a validated holder
467
     * of the given tier's role (maintainer or member) in the target space.
468
     * Owns its own AccountState resolution so ?publisher is bound through the
469
     * targeted (pkh → agent) lookup rather than enumerated.
470
     */
471
    private static String publisherIsTieredRole(IRI tierClass) {
472
        return """
×
473
                ?acct a npa:AccountState ;
474
                      npa:pubkey ?pkh ;
475
                      npa:agent  ?publisher .
476
                ?tierRI a gen:RoleInstantiation ;
477
                        npa:forSpace ?space ;
478
                        npa:forAgent ?publisher .
479
                ?rdT a npa:RoleDeclaration ;
480
                     npa:hasRoleType <%1$s> .
481
                { ?tierRI npa:regularProperty ?predT . ?rdT gen:hasRegularProperty ?predT . }
482
                UNION
483
                { ?tierRI npa:inverseProperty ?predT . ?rdT gen:hasInverseProperty ?predT . }
484
                """.formatted(tierClass);
×
485
    }
486

487
    /** Wraps {@link #runTierLoop} with tier-name context for logs/exceptions. */
488
    private int runTierLabeled(String tier, IRI graph, String sparqlUpdate) {
489
        try {
490
            return runTierLoop(graph, sparqlUpdate);
×
491
        } catch (RuntimeException ex) {
×
492
            log.error("AuthorityResolver: tier={} failed with SPARQL UPDATE:\n{}\n", tier, sparqlUpdate, ex);
×
493
            throw ex;
×
494
        }
495
    }
496

497
    /**
498
     * Runs a single tier's INSERT to fixed point. Counts rows by probing
499
     * graph size before/after each INSERT; stops when the size doesn't change.
500
     *
501
     * @return total number of triples inserted by this tier across all iterations
502
     */
503
    int runTierLoop(IRI graph, String sparqlUpdate) {
504
        int total = 0;
×
505
        long before = graphSize(graph);
×
506
        while (true) {
507
            // Note: no explicit transaction wrapping here. In tests we observed that
508
            // HTTPRepository's RDF4J-transaction protocol silently no-op'd cross-graph
509
            // SPARQL UPDATEs with UNION sub-patterns inside conn.begin()/commit(),
510
            // while the same UPDATE POSTed directly to /statements applied correctly.
511
            // A bare prepareUpdate().execute() takes the direct /statements path and
512
            // runs the UPDATE atomically per SPARQL 1.1 semantics — which is all we
513
            // need; there's nothing else to commit atomically alongside the UPDATE.
514
            try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
515
                conn.prepareUpdate(QueryLanguage.SPARQL, sparqlUpdate).execute();
×
516
            }
517
            long after = graphSize(graph);
×
518
            long added = after - before;
×
519
            if (added <= 0) break;
×
520
            total += added;
×
521
            before = after;
×
522
        }
×
523
        return total;
×
524
    }
525

526
    private long graphSize(IRI graph) {
527
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
528
            return conn.size(graph);
×
529
        }
530
    }
531

532
    /**
533
     * Distinct-subject totals in the given space-state graph, broken down by
534
     * RoleInstantiation kind (admin-pinned vs not) and RoleAssignment.
535
     * Three SELECT-COUNT queries — cheap, called once per build/cycle for
536
     * the user-facing log line. Returns zeros on failure (logged) so a flaky
537
     * count read can't wedge the cycle.
538
     */
539
    TierSubjectTotals computeTierSubjectTotals(IRI graph) {
540
        long adminRIs       = countDistinctSubjects(graph, """
×
541
                ?ri a gen:RoleInstantiation ; npa:inverseProperty gen:hasAdmin .
542
                """, "ri");
543
        long attachmentRAs  = countDistinctSubjects(graph, """
×
544
                ?ra a gen:RoleAssignment .
545
                """, "ra");
546
        long nonAdminRIs    = countDistinctSubjects(graph, """
×
547
                ?ri a gen:RoleInstantiation .
548
                FILTER NOT EXISTS { ?ri npa:inverseProperty gen:hasAdmin }
549
                """, "ri");
550
        return new TierSubjectTotals(adminRIs, attachmentRAs, nonAdminRIs);
×
551
    }
552

553
    private long countDistinctSubjects(IRI graph, String wherePattern, String varName) {
554
        String query = String.format("""
×
555
                PREFIX npa: <%1$s>
556
                PREFIX gen: <%2$s>
557
                SELECT (COUNT(DISTINCT ?%3$s) AS ?n) WHERE {
558
                  GRAPH <%4$s> {
559
                    %5$s
560
                  }
561
                }
562
                """, NPA.NAMESPACE, GEN.NAMESPACE, varName, graph, wherePattern);
563
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO);
×
564
             TupleQueryResult r = conn.prepareTupleQuery(QueryLanguage.SPARQL, query).evaluate()) {
×
565
            if (!r.hasNext()) return 0;
×
566
            return Long.parseLong(r.next().getBinding("n").getValue().stringValue());
×
567
        } catch (Exception ex) {
×
568
            log.warn("AuthorityResolver: countDistinctSubjects on {} failed: {}",
×
569
                    graph, ex.toString());
×
570
            return 0;
×
571
        }
572
    }
573

574
    // ---------------- SPARQL templates ----------------
575

576
    /**
577
     * Reusable invalidation filter on a bound nanopub-IRI variable. Pass the bare
578
     * variable name (no leading {@code ?}); e.g. {@code invalidationFilter("np")}
579
     * produces an outer-scoped {@code FILTER NOT EXISTS { GRAPH npa:spacesGraph
580
     * { ?_inv_np a npa:Invalidation ; npa:invalidates ?np . } }}.
581
     *
582
     * <p>Important: this filter must be placed OUTSIDE the surrounding
583
     * {@code GRAPH npa:spacesGraph { ... }} block, not nested inside it. When
584
     * nested, RDF4J's planner couples the FILTER NOT EXISTS evaluation into the
585
     * join order (per-row scan of {@code ?_inv a npa:Invalidation} multiplied by
586
     * the candidate set), which we measured turning a 39ms query into a 60s+
587
     * timeout on the live observer-tier data. Outside the GRAPH block, the
588
     * planner defers the filter until {@code ?np}/{@code ?rdNp} are bound and
589
     * does a targeted index lookup.
590
     *
591
     * <p>Variable names must match {@code [A-Za-z0-9_]+} per SPARQL grammar —
592
     * embedding a {@code ?} inside {@code ?_inv_?np} would yield a parse error.
593
     */
594
    private static String invalidationFilter(String bareVarName) {
595
        return "FILTER NOT EXISTS { GRAPH <" + SpacesVocab.SPACES_GRAPH + "> {"
30✔
596
                + " ?_inv_" + bareVarName
597
                + " a <" + SpacesVocab.INVALIDATION + "> ; "
598
                + "<" + SpacesVocab.INVALIDATES + "> ?" + bareVarName + " . } }";
599
    }
600

601
    /**
602
     * Admin tier: seed from {@code npadef:...hasRootAdmin} (trusted by construction)
603
     * plus closed-over admin grants; insert any {@code gen:RoleInstantiation} with
604
     * {@code npa:inverseProperty gen:hasAdmin} whose publisher (resolved via mirrored
605
     * trust-approved AccountState) is already in the admin set.
606
     */
607
    static String adminTierUpdate(IRI graph, long lastProcessed) {
608
        // Order tuned for RDF4J's evaluator:
609
        //   1. Anchor on the small (seed UNION closed-over) set to bind ?publisher
610
        //      and ?space cheaply.
611
        //   2. Resolve ?pkh from the mirrored AccountState row (?publisher bound).
612
        //   3. Probe instantiations using the now-bound (?space, ?pkh) — targeted
613
        //      lookup, not a full RoleInstantiation scan.
614
        //   4. Load-number filter on bound ?np.
615
        //   5. Dedup at the end.
616
        return """
69✔
617
                PREFIX npa:  <%1$s>
618
                PREFIX gen:  <%2$s>
619
                INSERT { GRAPH <%3$s> {
620
                  ?ri a gen:RoleInstantiation ;
621
                      npa:forSpace ?space ;
622
                      npa:inverseProperty gen:hasAdmin ;
623
                      npa:forAgent ?agent ;
624
                      npa:viaNanopub ?np .
625
                } }
626
                WHERE {
627
                  # 1. Anchor: who is already an admin of which space?
628
                  {
629
                    # Seed branch: root-admin in a non-invalidated SpaceDefinition.
630
                    GRAPH <%4$s> {
631
                      ?def a npa:SpaceDefinition ;
632
                           npa:forSpaceRef  ?spaceRef ;
633
                           npa:hasRootAdmin ?publisher ;
634
                           npa:viaNanopub   ?defNp .
635
                      ?spaceRef npa:spaceIri ?space .
636
                    }
637
                    %7$s
638
                  }
639
                  UNION
640
                  {
641
                    # Closed-over branch: an existing admin in this space-state graph.
642
                    GRAPH <%3$s> {
643
                      ?prev a gen:RoleInstantiation ;
644
                            npa:forSpace        ?space ;
645
                            npa:inverseProperty gen:hasAdmin ;
646
                            npa:forAgent        ?publisher .
647
                    }
648
                  }
649
                  # 2. Mirror: resolve ?publisher → ?pkh via the trust-approved row.
650
                  GRAPH <%3$s> {
651
                    ?acct a npa:AccountState ;
652
                          npa:agent  ?publisher ;
653
                          npa:pubkey ?pkh .
654
                  }
655
                  # 3. Targeted instantiation lookup by space + pubkey.
656
                  GRAPH <%4$s> {
657
                    ?ri a gen:RoleInstantiation ;
658
                        npa:forSpace        ?space ;
659
                        npa:inverseProperty gen:hasAdmin ;
660
                        npa:forAgent        ?agent ;
661
                        npa:pubkeyHash      ?pkh ;
662
                        npa:viaNanopub      ?np .
663
                  }
664
                  %6$s
665
                  # 4. Load-number filter on bound ?np.
666
                  GRAPH <%8$s> {
667
                    ?np npa:hasLoadNumber ?ln .
668
                    FILTER (?ln > %5$d)
669
                  }
670
                  # 5. Dedup last.
671
                  FILTER NOT EXISTS { GRAPH <%3$s> {
672
                    ?existing a gen:RoleInstantiation ;
673
                              npa:forSpace ?space ;
674
                              npa:forAgent ?agent ;
675
                              npa:inverseProperty gen:hasAdmin .
676
                  } }
677
                }
678
                """.formatted(
3✔
679
                NPA.NAMESPACE,
680
                GEN.NAMESPACE,
681
                graph,
682
                SpacesVocab.SPACES_GRAPH,
683
                lastProcessed,
15✔
684
                invalidationFilter("np"),
15✔
685
                invalidationFilter("defNp"),
18✔
686
                NPA.GRAPH);
687
    }
688

689
    /**
690
     * {@code gen:hasRole} attachment validation: an attachment is validated iff its
691
     * publisher is already a validated admin of the target space. Adds
692
     * {@code gen:RoleAssignment} rows to the space-state graph.
693
     */
694
    static String attachmentValidationUpdate(IRI graph, long lastProcessed) {
695
        return """
69✔
696
                PREFIX npa:  <%1$s>
697
                PREFIX gen:  <%2$s>
698
                INSERT { GRAPH <%3$s> {
699
                  ?ra a gen:RoleAssignment ;
700
                      npa:forSpace ?space ;
701
                      gen:hasRole  ?role ;
702
                      npa:viaNanopub ?np .
703
                } }
704
                WHERE {
705
                  GRAPH <%4$s> {
706
                    ?ra a gen:RoleAssignment ;
707
                        npa:forSpace ?space ;
708
                        gen:hasRole  ?role ;
709
                        npa:pubkeyHash ?pkh ;
710
                        npa:viaNanopub ?np .
711
                  }
712
                  GRAPH <%7$s> {
713
                    ?np npa:hasLoadNumber ?ln .
714
                    FILTER (?ln > %5$d)
715
                  }
716
                  GRAPH <%3$s> {
717
                    ?acct a npa:AccountState ;
718
                          npa:agent  ?publisher ;
719
                          npa:pubkey ?pkh .
720
                    ?adminRI a gen:RoleInstantiation ;
721
                             npa:forSpace ?space ;
722
                             npa:inverseProperty gen:hasAdmin ;
723
                             npa:forAgent ?publisher .
724
                  }
725
                  %6$s
726
                  FILTER NOT EXISTS { GRAPH <%3$s> {
727
                    ?existing a gen:RoleAssignment ;
728
                              npa:forSpace ?space ;
729
                              gen:hasRole  ?role .
730
                  } }
731
                }
732
                """.formatted(
3✔
733
                NPA.NAMESPACE,
734
                GEN.NAMESPACE,
735
                graph,
736
                SpacesVocab.SPACES_GRAPH,
737
                lastProcessed,
15✔
738
                invalidationFilter("np"),
18✔
739
                NPA.GRAPH);
740
    }
741

742
    /**
743
     * Non-admin tier publisher constraints (inserted as a SPARQL sub-pattern).
744
     * Each constraint owns the AccountState (pkh → agent) lookup so the join
745
     * variable is bound through a targeted pattern. The observer-self variant
746
     * binds {@code npa:agent ?agent} directly — no separate {@code ?publisher}
747
     * variable, no post-join equality filter — which lets the planner anchor
748
     * the AccountState lookup on the already-bound {@code ?agent} instead of
749
     * enumerating all approved publishers and filtering at the end.
750
     */
751
    static final String PUBLISHER_IS_ADMIN = """
752
            ?acct a npa:AccountState ;
753
                  npa:pubkey ?pkh ;
754
                  npa:agent  ?publisher .
755
            ?adminRI a gen:RoleInstantiation ;
756
                     npa:forSpace ?space ;
757
                     npa:inverseProperty gen:hasAdmin ;
758
                     npa:forAgent ?publisher .
759
            """;
760

761
    /** Observer self-evidence: the assignee's own pubkey signed the instantiation. */
762
    static final String PUBLISHER_IS_SELF = """
763
            ?acct a npa:AccountState ;
764
                  npa:pubkey ?pkh ;
765
                  npa:agent  ?agent .
766
            """;
767

768
    /**
769
     * Maintainer / Member / Observer tier INSERT. Same shape: find an instantiation
770
     * whose predicate matches a RoleDeclaration of the given tier attached to the
771
     * target space, and whose publisher passes the tier-specific constraint.
772
     */
773
    static String nonAdminTierUpdate(IRI graph, long lastProcessed,
774
                                     IRI tierClass, String publisherConstraint) {
775
        // Order tuned for RDF4J's evaluator (which executes BGPs roughly in order).
776
        // The crucial choice is the *anchor*: instantiation-first plans send the
777
        // planner exploring the full ~thousands of candidate RIs and only filter
778
        // by tier at the very end. Attachment-first anchors on the small set of
779
        // gen:RoleAssignment rows already validated in this space-state graph
780
        // (~hundreds, often zero) and walks outward by bound (?role, ?space).
781
        //
782
        //   1. Anchor on RoleAssignments in this space-state graph (small).
783
        //   2. Match the tier-pinned RoleDeclaration by ?role.
784
        //   3. Pair role-decl direction to instantiation direction in one UNION
785
        //      so only (reg, reg)/(inv, inv) combos are explored.
786
        //   4. Targeted instantiation lookup — (?space, ?pred) are bound.
787
        //   5. Publisher constraint (incl. AccountState resolution).
788
        //   6. Load-number filter on bound ?np.
789
        //   7. Dedup at the end.
790
        return """
69✔
791
                PREFIX npa:  <%1$s>
792
                PREFIX gen:  <%2$s>
793
                INSERT { GRAPH <%3$s> {
794
                  ?ri a gen:RoleInstantiation ;
795
                      npa:forSpace ?space ;
796
                      npa:forAgent ?agent ;
797
                      npa:viaNanopub ?np .
798
                } }
799
                WHERE {
800
                  # 1. Anchor: validated attachments in this space-state graph.
801
                  GRAPH <%3$s> {
802
                    ?ra a gen:RoleAssignment ;
803
                        gen:hasRole  ?role ;
804
                        npa:forSpace ?space .
805
                  }
806
                  # 2. Tier-pinned RoleDeclaration (?role bound from the attachment).
807
                  GRAPH <%4$s> {
808
                    ?rd a npa:RoleDeclaration ;
809
                        npa:hasRoleType <%7$s> ;
810
                        npa:role        ?role ;
811
                        npa:viaNanopub  ?rdNp .
812
                    # 3. Pair direction so only matching combos are explored.
813
                    {
814
                      ?rd gen:hasRegularProperty ?pred .
815
                      ?ri npa:regularProperty    ?pred .
816
                    }
817
                    UNION
818
                    {
819
                      ?rd gen:hasInverseProperty ?pred .
820
                      ?ri npa:inverseProperty    ?pred .
821
                    }
822
                    # 4. Targeted instantiation lookup — (?space, ?pred) bound.
823
                    ?ri a gen:RoleInstantiation ;
824
                        npa:forSpace   ?space ;
825
                        npa:forAgent   ?agent ;
826
                        npa:pubkeyHash ?pkh ;
827
                        npa:viaNanopub ?np .
828
                  }
829
                  # 5. Publisher constraint (incl. AccountState resolution).
830
                  GRAPH <%3$s> {
831
                    %9$s
832
                  }
833
                  # 6. Load-number filter on bound ?np.
834
                  GRAPH <%10$s> {
835
                    ?np npa:hasLoadNumber ?ln .
836
                    FILTER (?ln > %5$d)
837
                  }
838
                  # 7. Invalidation filters — outside the GRAPH block so the
839
                  #    planner defers them until ?rdNp/?np are bound.
840
                  %8$s
841
                  %6$s
842
                  # 8. Dedup last.
843
                  FILTER NOT EXISTS { GRAPH <%3$s> {
844
                    ?existing a gen:RoleInstantiation ;
845
                              npa:forSpace ?space ;
846
                              npa:forAgent ?agent ;
847
                              npa:viaNanopub ?np .
848
                  } }
849
                }
850
                """.formatted(
3✔
851
                NPA.NAMESPACE,
852
                GEN.NAMESPACE,
853
                graph,
854
                SpacesVocab.SPACES_GRAPH,
855
                lastProcessed,
15✔
856
                invalidationFilter("np"),
27✔
857
                tierClass,
858
                invalidationFilter("rdNp"),
30✔
859
                publisherConstraint,
860
                NPA.GRAPH);
861
    }
862

863
    // ---------------- Invalidation templates (incremental cycle) ----------------
864

865
    /**
866
     * WHERE clause shared by the admin-RI invalidation ASK precheck and the
867
     * matching DELETE. Identifies admin-tier {@code gen:RoleInstantiation} rows
868
     * in the space-state graph whose {@code npa:viaNanopub} equals the target
869
     * of an {@code npa:Invalidation} that landed in {@code (lastProcessed, ∞)}.
870
     */
871
    static String adminInvalidationCheckWhere(IRI graph, long lastProcessed) {
872
        return String.format("""
60✔
873
                  GRAPH <%1$s> {
874
                    ?ri a gen:RoleInstantiation ;
875
                        npa:inverseProperty gen:hasAdmin ;
876
                        npa:viaNanopub ?np .
877
                  }
878
                  GRAPH <%2$s> {
879
                    ?inv a npa:Invalidation ;
880
                         npa:invalidates ?np ;
881
                         npa:viaNanopub  ?invNp .
882
                  }
883
                  GRAPH <%3$s> {
884
                    ?invNp npa:hasLoadNumber ?ln .
885
                    FILTER (?ln > %4$d)
886
                  }
887
                """, graph, SpacesVocab.SPACES_GRAPH, NPA.GRAPH, lastProcessed);
6✔
888
    }
889

890
    /** DELETE template for admin-tier RoleInstantiations whose source nanopub was invalidated. */
891
    static String adminInvalidationDelete(IRI graph, long lastProcessed) {
892
        return String.format("""
63✔
893
                PREFIX npa: <%1$s>
894
                PREFIX gen: <%2$s>
895
                DELETE { GRAPH <%3$s> {
896
                  ?ri ?p ?o .
897
                } }
898
                WHERE {
899
                  GRAPH <%3$s> { ?ri ?p ?o . }
900
                %4$s
901
                }
902
                """, NPA.NAMESPACE, GEN.NAMESPACE, graph,
903
                adminInvalidationCheckWhere(graph, lastProcessed));
6✔
904
    }
905

906
    /** WHERE clause for RoleAssignment invalidation. */
907
    static String roleAssignmentInvalidationCheckWhere(IRI graph, long lastProcessed) {
908
        return String.format("""
60✔
909
                  GRAPH <%1$s> {
910
                    ?ra a gen:RoleAssignment ;
911
                        npa:viaNanopub ?np .
912
                  }
913
                  GRAPH <%2$s> {
914
                    ?inv a npa:Invalidation ;
915
                         npa:invalidates ?np ;
916
                         npa:viaNanopub  ?invNp .
917
                  }
918
                  GRAPH <%3$s> {
919
                    ?invNp npa:hasLoadNumber ?ln .
920
                    FILTER (?ln > %4$d)
921
                  }
922
                """, graph, SpacesVocab.SPACES_GRAPH, NPA.GRAPH, lastProcessed);
6✔
923
    }
924

925
    /** DELETE template for RoleAssignments whose source nanopub was invalidated. */
926
    static String roleAssignmentInvalidationDelete(IRI graph, long lastProcessed) {
927
        return String.format("""
63✔
928
                PREFIX npa: <%1$s>
929
                PREFIX gen: <%2$s>
930
                DELETE { GRAPH <%3$s> {
931
                  ?ra ?p ?o .
932
                } }
933
                WHERE {
934
                  GRAPH <%3$s> { ?ra ?p ?o . }
935
                %4$s
936
                }
937
                """, NPA.NAMESPACE, GEN.NAMESPACE, graph,
938
                roleAssignmentInvalidationCheckWhere(graph, lastProcessed));
6✔
939
    }
940

941
    /**
942
     * WHERE clause for RoleDeclaration invalidation. ASK-only (no DELETE):
943
     * RoleDeclarations live in {@code npa:spacesGraph} and aren't materialized
944
     * into the space-state graph, so there's nothing to remove from the
945
     * space-state. The ASK still flips {@code npa:needsFullRebuild} because
946
     * sticky downstream RIs that were derived under the now-invalidated RD
947
     * need a from-scratch recompute.
948
     */
949
    static String roleDeclarationInvalidationCheckWhere(long lastProcessed) {
950
        return String.format("""
48✔
951
                  GRAPH <%1$s> {
952
                    ?rd a npa:RoleDeclaration ;
953
                        npa:viaNanopub ?np .
954
                    ?inv a npa:Invalidation ;
955
                         npa:invalidates ?np ;
956
                         npa:viaNanopub  ?invNp .
957
                  }
958
                  GRAPH <%2$s> {
959
                    ?invNp npa:hasLoadNumber ?ln .
960
                    FILTER (?ln > %3$d)
961
                  }
962
                """, SpacesVocab.SPACES_GRAPH, NPA.GRAPH, lastProcessed);
6✔
963
    }
964

965
    /**
966
     * DELETE template for non-admin (leaf-tier) RoleInstantiations whose source
967
     * nanopub was invalidated. Identified as {@code gen:RoleInstantiation} rows
968
     * lacking the admin-pinning {@code npa:inverseProperty gen:hasAdmin} triple.
969
     * No flag is set; leaf-tier removals are recoverable on the next cycle.
970
     */
971
    static String leafTierInvalidationDelete(IRI graph, long lastProcessed) {
972
        return String.format("""
84✔
973
                PREFIX npa: <%1$s>
974
                PREFIX gen: <%2$s>
975
                DELETE { GRAPH <%3$s> {
976
                  ?ri ?p ?o .
977
                } }
978
                WHERE {
979
                  GRAPH <%3$s> {
980
                    ?ri a gen:RoleInstantiation ;
981
                        npa:viaNanopub ?np .
982
                    FILTER NOT EXISTS { ?ri npa:inverseProperty gen:hasAdmin }
983
                    ?ri ?p ?o .
984
                  }
985
                  GRAPH <%4$s> {
986
                    ?inv a npa:Invalidation ;
987
                         npa:invalidates ?np ;
988
                         npa:viaNanopub  ?invNp .
989
                  }
990
                  GRAPH <%5$s> {
991
                    ?invNp npa:hasLoadNumber ?ln .
992
                    FILTER (?ln > %6$d)
993
                  }
994
                }
995
                """, NPA.NAMESPACE, GEN.NAMESPACE, graph,
996
                SpacesVocab.SPACES_GRAPH, NPA.GRAPH, lastProcessed);
6✔
997
    }
998

999
    /** Wraps an ASK by joining the shared prefixes. */
1000
    private boolean wouldInvalidate(IRI graph, long lastProcessed,
1001
                                    boolean adminPinned, String whereClause) {
1002
        // adminPinned is informational only — kept to make call sites read clearly;
1003
        // the WHERE clause already encodes the kind via its own type predicates.
1004
        String ask = String.format("""
×
1005
                PREFIX npa: <%1$s>
1006
                PREFIX gen: <%2$s>
1007
                ASK { %3$s }
1008
                """, NPA.NAMESPACE, GEN.NAMESPACE, whereClause);
1009
        return runAsk(ask);
×
1010
    }
1011

1012
    private boolean runAsk(String sparql) {
1013
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1014
            return conn.prepareBooleanQuery(QueryLanguage.SPARQL, sparql).evaluate();
×
1015
        }
1016
    }
1017

1018
    private void executeUpdate(String sparqlUpdate) {
1019
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1020
            conn.prepareUpdate(QueryLanguage.SPARQL, sparqlUpdate).execute();
×
1021
        }
1022
    }
×
1023

1024
    // ---------------- Mirror step ----------------
1025

1026
    /**
1027
     * Copies trust-approved {@code npa:AccountState} rows from {@code npat:<T>}
1028
     * in the {@code trust} repo into {@code newGraph} in the {@code spaces} repo,
1029
     * inside one spaces-side serializable transaction.
1030
     *
1031
     * @return number of rows mirrored (useful for metrics / logging)
1032
     */
1033
    int mirrorTrustState(String trustStateHash, IRI newGraph) {
1034
        IRI trustStateIri = NPAT.forHash(trustStateHash);
×
1035
        int count = 0;
×
1036
        try (RepositoryConnection trustConn = TripleStore.get().getRepoConnection(TRUST_REPO);
×
1037
             RepositoryConnection spacesConn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1038
            trustConn.begin(IsolationLevels.READ_COMMITTED);
×
1039
            spacesConn.begin(IsolationLevels.SERIALIZABLE);
×
1040
            // Walk rdf:type triples in the trust state's graph; for each AccountState,
1041
            // check status and copy the approved ones verbatim (minus status-specific
1042
            // detail triples, which we don't need for validation).
1043
            try (RepositoryResult<Statement> typeRows = trustConn.getStatements(
×
1044
                    null, RDF.TYPE, NPA_ACCOUNT_STATE, trustStateIri)) {
1045
                while (typeRows.hasNext()) {
×
1046
                    Statement st = typeRows.next();
×
1047
                    if (!(st.getSubject() instanceof IRI accountStateIri)) continue;
×
1048
                    Value status = trustConn.getStatements(accountStateIri, NPA_TRUST_STATUS, null, trustStateIri)
×
1049
                            .stream().findFirst().map(Statement::getObject).orElse(null);
×
1050
                    if (!(status instanceof IRI statusIri) || !APPROVED_SET.contains(statusIri)) continue;
×
1051
                    Value agent = trustConn.getStatements(accountStateIri, NPA_AGENT, null, trustStateIri)
×
1052
                            .stream().findFirst().map(Statement::getObject).orElse(null);
×
1053
                    Value pubkey = trustConn.getStatements(accountStateIri, NPA_PUBKEY, null, trustStateIri)
×
1054
                            .stream().findFirst().map(Statement::getObject).orElse(null);
×
1055
                    if (agent == null || pubkey == null) {
×
1056
                        log.warn("AuthorityResolver.mirror: account {} missing agent or pubkey; skipping",
×
1057
                                accountStateIri);
1058
                        continue;
×
1059
                    }
1060
                    spacesConn.add(accountStateIri, RDF.TYPE, NPA_ACCOUNT_STATE, newGraph);
×
1061
                    spacesConn.add(accountStateIri, NPA_AGENT, agent, newGraph);
×
1062
                    spacesConn.add(accountStateIri, NPA_PUBKEY, pubkey, newGraph);
×
1063
                    spacesConn.add(accountStateIri, NPA_TRUST_STATUS, statusIri, newGraph);
×
1064
                    count++;
×
1065
                }
×
1066
            }
1067
            // Mirror canonical foaf:name triples for approved agents. The trust
1068
            // loader emits one per agent (across approved keys, MAX(ratio) wins).
1069
            // Copying them into the space-state graph means consumers reading
1070
            // ?agent foaf:name ?n inside the state graph hit local data, with no
1071
            // cross-repo SERVICE.
1072
            try (RepositoryResult<Statement> nameRows = trustConn.getStatements(
×
1073
                    null, FOAF.NAME, null, trustStateIri)) {
1074
                while (nameRows.hasNext()) {
×
1075
                    Statement st = nameRows.next();
×
1076
                    spacesConn.add(st.getSubject(), st.getPredicate(), st.getObject(), newGraph);
×
1077
                }
×
1078
            }
1079
            spacesConn.commit();
×
1080
            trustConn.commit();
×
1081
        }
1082
        return count;
×
1083
    }
1084

1085
    // ---------------- Pointer + counter helpers ----------------
1086

1087
    /**
1088
     * Reads the current {@code npa:hasCurrentSpaceState} pointer from the
1089
     * {@code npa:graph} admin graph of the {@code spaces} repo. Returns
1090
     * {@code null} if no pointer exists yet.
1091
     */
1092
    IRI getCurrentSpaceStateGraph() {
1093
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1094
            Value v = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO,
×
1095
                    SpacesVocab.HAS_CURRENT_SPACE_STATE);
1096
            return (v instanceof IRI iri) ? iri : null;
×
1097
        } catch (Exception ex) {
×
1098
            log.warn("AuthorityResolver: failed to read hasCurrentSpaceState pointer: {}", ex.toString());
×
1099
            return null;
×
1100
        }
1101
    }
1102

1103
    long getCurrentLoadCounter() {
1104
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1105
            Value v = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO,
×
1106
                    SpacesVocab.CURRENT_LOAD_COUNTER);
1107
            if (v == null) return 0;
×
1108
            try {
1109
                return Long.parseLong(v.stringValue());
×
1110
            } catch (NumberFormatException ex) {
×
1111
                log.warn("AuthorityResolver: non-numeric currentLoadCounter: {}", v);
×
1112
                return 0;
×
1113
            }
1114
        } catch (Exception ex) {
×
1115
            log.warn("AuthorityResolver: failed to read currentLoadCounter: {}", ex.toString());
×
1116
            return 0;
×
1117
        }
1118
    }
1119

1120
    /**
1121
     * Atomic pointer flip: a single SPARQL {@code DELETE … INSERT … WHERE}
1122
     * replaces the old pointer with the new one in one statement, so readers
1123
     * never see a zero-pointer window.
1124
     */
1125
    void flipPointer(IRI newGraph) {
1126
        String update = String.format("""
×
1127
                DELETE { GRAPH <%s> { <%s> <%s> ?old } }
1128
                INSERT { GRAPH <%s> { <%s> <%s> <%s> } }
1129
                WHERE  { OPTIONAL { GRAPH <%s> { <%s> <%s> ?old } } }
1130
                """,
1131
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.HAS_CURRENT_SPACE_STATE,
1132
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.HAS_CURRENT_SPACE_STATE, newGraph,
1133
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.HAS_CURRENT_SPACE_STATE);
1134
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1135
            conn.begin(IsolationLevels.SERIALIZABLE);
×
1136
            conn.prepareUpdate(QueryLanguage.SPARQL, update).execute();
×
1137
            conn.commit();
×
1138
        }
1139
    }
×
1140

1141
    void writeProcessedUpTo(IRI graph, long loadCounter) {
1142
        String update = String.format("""
×
1143
                DELETE { GRAPH <%s> { <%s> <%s> ?old } }
1144
                INSERT { GRAPH <%s> { <%s> <%s> "%d"^^<http://www.w3.org/2001/XMLSchema#long> } }
1145
                WHERE  { OPTIONAL { GRAPH <%s> { <%s> <%s> ?old } } }
1146
                """,
1147
                graph, graph, SpacesVocab.PROCESSED_UP_TO,
1148
                graph, graph, SpacesVocab.PROCESSED_UP_TO, loadCounter,
×
1149
                graph, graph, SpacesVocab.PROCESSED_UP_TO);
1150
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1151
            conn.begin(IsolationLevels.SERIALIZABLE);
×
1152
            conn.prepareUpdate(QueryLanguage.SPARQL, update).execute();
×
1153
            conn.commit();
×
1154
        }
1155
    }
×
1156

1157
    /**
1158
     * Reads {@code processedUpTo} from the given space-state graph.
1159
     * Returns {@code -1} if absent (graph not fully built yet).
1160
     */
1161
    long readProcessedUpTo(IRI graph) {
1162
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1163
            String query = String.format(
×
1164
                    "SELECT ?n WHERE { GRAPH <%s> { <%s> <%s> ?n } }",
1165
                    graph, graph, SpacesVocab.PROCESSED_UP_TO);
1166
            try (TupleQueryResult r = conn.prepareTupleQuery(QueryLanguage.SPARQL, query).evaluate()) {
×
1167
                if (!r.hasNext()) return -1;
×
1168
                BindingSet b = r.next();
×
1169
                return Long.parseLong(b.getBinding("n").getValue().stringValue());
×
1170
            }
×
1171
        } catch (Exception ex) {
×
1172
            log.warn("AuthorityResolver: failed to read processedUpTo for {}: {}", graph, ex.toString());
×
1173
            return -1;
×
1174
        }
1175
    }
1176

1177
    /**
1178
     * Reads the {@code npa:needsFullRebuild} flag (boolean literal) from
1179
     * {@code npa:graph} in the {@code spaces} repo. Defaults to {@code false}
1180
     * when the triple is absent.
1181
     */
1182
    boolean readNeedsFullRebuild() {
1183
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1184
            Value v = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO,
×
1185
                    SpacesVocab.NEEDS_FULL_REBUILD);
1186
            return v != null && Boolean.parseBoolean(v.stringValue());
×
1187
        } catch (Exception ex) {
×
1188
            log.warn("AuthorityResolver: failed to read needsFullRebuild: {}", ex.toString());
×
1189
            return false;
×
1190
        }
1191
    }
1192

1193
    void setNeedsFullRebuild() {
1194
        writeNeedsFullRebuild(true);
×
1195
    }
×
1196

1197
    void clearNeedsFullRebuild() {
1198
        writeNeedsFullRebuild(false);
×
1199
    }
×
1200

1201
    private void writeNeedsFullRebuild(boolean value) {
1202
        String update = String.format("""
×
1203
                DELETE { GRAPH <%s> { <%s> <%s> ?old } }
1204
                INSERT { GRAPH <%s> { <%s> <%s> "%s"^^<http://www.w3.org/2001/XMLSchema#boolean> } }
1205
                WHERE  { OPTIONAL { GRAPH <%s> { <%s> <%s> ?old } } }
1206
                """,
1207
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.NEEDS_FULL_REBUILD,
1208
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.NEEDS_FULL_REBUILD, value,
×
1209
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.NEEDS_FULL_REBUILD);
1210
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1211
            conn.begin(IsolationLevels.SERIALIZABLE);
×
1212
            conn.prepareUpdate(QueryLanguage.SPARQL, update).execute();
×
1213
            conn.commit();
×
1214
        }
1215
    }
×
1216

1217
    void dropGraph(IRI graph) {
1218
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1219
            conn.begin(IsolationLevels.SERIALIZABLE);
×
1220
            conn.clear(graph);
×
1221
            conn.commit();
×
1222
            log.info("AuthorityResolver: dropped old space-state graph {}", graph);
×
1223
        }
1224
    }
×
1225

1226
    // ---------------- Trust-repo pointer lookup (used by TrustStateRegistry's bootstrap) ----------------
1227

1228
    /**
1229
     * Queries the {@code trust} repo directly for the current trust-state hash.
1230
     * Prefer {@link TrustStateRegistry#getCurrentHash()} in normal operation —
1231
     * this helper exists for tests and diagnostics.
1232
     *
1233
     * @return the current trust-state hash, or empty if none is set
1234
     */
1235
    Optional<String> readTrustRepoCurrentHash() {
1236
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(TRUST_REPO)) {
×
1237
            Value v = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO,
×
1238
                    NPA_HAS_CURRENT_TRUST_STATE);
1239
            if (!(v instanceof IRI iri)) return Optional.empty();
×
1240
            String s = iri.stringValue();
×
1241
            if (!s.startsWith(NPAT.NAMESPACE)) return Optional.empty();
×
1242
            return Optional.of(s.substring(NPAT.NAMESPACE.length()));
×
1243
        } catch (Exception ex) {
×
1244
            log.warn("AuthorityResolver: failed to read trust-repo current pointer: {}", ex.toString());
×
1245
            return Optional.empty();
×
1246
        }
1247
    }
1248

1249
    private static String abbrev(String hash) {
1250
        return hash.length() > 12 ? hash.substring(0, 12) + "…" : hash;
×
1251
    }
1252

1253
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc