• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-registry / 24121799241

08 Apr 2026 06:42AM UTC coverage: 32.765% (-0.6%) from 33.374%
24121799241

push

github

tkuhn
Merge branch 'feat/agent-quota-enforcement'

263 of 888 branches covered (29.62%)

Branch coverage included in aggregate %.

793 of 2335 relevant lines covered (33.96%)

5.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.9
src/main/java/com/knowledgepixels/registry/RegistryPeerConnector.java
1
package com.knowledgepixels.registry;
2

3
import com.mongodb.ErrorCategory;
4
import com.mongodb.MongoWriteException;
5
import com.mongodb.client.ClientSession;
6
import com.mongodb.client.MongoCursor;
7
import org.apache.http.HttpResponse;
8
import org.apache.http.client.methods.HttpGet;
9
import org.apache.http.client.methods.HttpHead;
10
import org.apache.http.util.EntityUtils;
11
import org.bson.Document;
12
import org.nanopub.Nanopub;
13
import org.nanopub.NanopubUtils;
14
import org.nanopub.jelly.NanopubStream;
15
import org.slf4j.Logger;
16
import org.slf4j.LoggerFactory;
17

18
import java.io.IOException;
19
import java.io.InputStream;
20
import java.util.ArrayList;
21
import java.util.Collections;
22
import java.util.List;
23
import java.util.concurrent.atomic.AtomicLong;
24

25
import static com.knowledgepixels.registry.RegistryDB.*;
26

27
/**
28
 * Checks peer Nanopub Registries for new nanopublications and loads them.
29
 */
30
public class RegistryPeerConnector {
31

32
    // Private no-op constructor: the class only exposes static methods, so it is never instantiated.
    private RegistryPeerConnector() {}
33

34
    private static final Logger log = LoggerFactory.getLogger(RegistryPeerConnector.class);
12✔
35

36
    public static void checkPeers(ClientSession s) {
37
        List<String> peerUrls = new ArrayList<>(Utils.getPeerUrls());
×
38
        Collections.shuffle(peerUrls);
×
39

40
        for (String peerUrl : peerUrls) {
×
41
            try {
42
                checkPeer(s, peerUrl);
×
43
            } catch (Exception ex) {
×
44
                log.info("Error checking peer {}: {}", peerUrl, ex.getMessage());
×
45
            }
×
46
        }
×
47
    }
×
48

49
    static void checkPeer(ClientSession s, String peerUrl) throws IOException {
50
        log.info("Checking peer: {}", peerUrl);
×
51

52
        HttpResponse resp = NanopubUtils.getHttpClient().execute(new HttpHead(peerUrl));
×
53
        int httpStatus = resp.getStatusLine().getStatusCode();
×
54
        EntityUtils.consumeQuietly(resp.getEntity());
×
55
        if (httpStatus < 200 || httpStatus >= 300) {
×
56
            log.info("Failed to reach peer {}: {}", peerUrl, httpStatus);
×
57
            return;
×
58
        }
59

60
        if (isTestInstance(resp)) {
×
61
            log.info("Skipping peer {} because it is a test instance", peerUrl);
×
62
            return;
×
63
        }
64

65
        String status = getHeader(resp, "Nanopub-Registry-Status");
×
66
        if (!"ready".equals(status) && !"updating".equals(status)) {
×
67
            log.info("Peer {} in non-ready state: {}", peerUrl, status);
×
68
            return;
×
69
        }
70

71
        Long peerSetupId = getHeaderLong(resp, "Nanopub-Registry-Setup-Id");
×
72
        // TODO(transition): Remove Load-Counter fallback after all peers upgraded
73
        Long peerSeqNum = getHeaderLong(resp, "Nanopub-Registry-SeqNum");
×
74
        if (peerSeqNum == null) {
×
75
            peerSeqNum = getHeaderLong(resp, "Nanopub-Registry-Load-Counter");
×
76
        }
77
        if (peerSetupId == null || peerSeqNum == null) {
×
78
            log.info("Peer {} missing setupId or seqNum headers", peerUrl);
×
79
            return;
×
80
        }
81

82
        syncWithPeer(s, peerUrl, peerSetupId, peerSeqNum);
×
83
    }
×
84

85
    static void syncWithPeer(ClientSession s, String peerUrl, long peerSetupId, long peerSeqNum) {
86
        Document peerState = getPeerState(s, peerUrl);
12✔
87
        Long lastSetupId = peerState != null ? peerState.getLong("setupId") : null;
24✔
88
        // TODO(transition): Remove loadCounter fallback after all peers upgraded
89
        Long lastSeqNum = peerState != null ? peerState.getLong("seqNum") : null;
24✔
90
        if (lastSeqNum == null && peerState != null) {
12✔
91
            lastSeqNum = peerState.getLong("loadCounter");
12✔
92
        }
93

94
        if (lastSetupId != null && !lastSetupId.equals(peerSetupId)) {
21✔
95
            log.info("Peer {} was reset (setupId changed), resetting tracking", peerUrl);
12✔
96
            deletePeerState(s, peerUrl);
9✔
97
            lastSeqNum = null;
6✔
98
        }
99

100
        long effectiveSeqNum = lastSeqNum != null ? lastSeqNum : 0;
21✔
101

102
        if (lastSeqNum != null && lastSeqNum.equals(peerSeqNum)) {
21!
103
            log.info("Peer {} has no new nanopubs (seqNum unchanged: {})", peerUrl, peerSeqNum);
21✔
104
        } else if (lastSeqNum != null) {
6!
105
            // Fetch all nanopubs added since our last known position.
106
            log.info("Peer {} has new nanopubs (seqNum {} -> {}), fetching recent", peerUrl, lastSeqNum, peerSeqNum);
×
107
            long lastReceived = loadRecentNanopubs(s, peerUrl, lastSeqNum);
×
108
            if (lastReceived > 0) {
×
109
                effectiveSeqNum = lastReceived;
×
110
            }
111
            // Only discover new pubkeys when the peer has new data
112
            discoverPubkeys(s, peerUrl);
×
113
        } else {
×
114
            log.info("Peer {} is new, pubkey discovery will handle initial sync", peerUrl);
12✔
115
            discoverPubkeys(s, peerUrl);
9✔
116
        }
117
        updatePeerState(s, peerUrl, peerSetupId, effectiveSeqNum);
15✔
118
    }
3✔
119

120
    /**
121
     * Fetches nanopubs from a peer after the given seqNum.
122
     * @return the seqNum of the last successfully received nanopub, or -1 if none were received
123
     */
124
    private static long loadRecentNanopubs(ClientSession s, String peerUrl, long afterSeqNum) {
125
        // TODO(transition): Remove afterCounter param after all peers upgraded
126
        String requestUrl = peerUrl + "nanopubs.jelly?afterSeqNum=" + afterSeqNum + "&afterCounter=" + afterSeqNum;
×
127
        log.info("Fetching recent nanopubs from: {}", requestUrl);
×
128
        AtomicLong lastReceivedCounter = new AtomicLong(-1);
×
129
        try {
130
            HttpResponse resp = NanopubUtils.getHttpClient().execute(new HttpGet(requestUrl));
×
131
            int httpStatus = resp.getStatusLine().getStatusCode();
×
132
            if (httpStatus < 200 || httpStatus >= 300) {
×
133
                EntityUtils.consumeQuietly(resp.getEntity());
×
134
                log.info("Request failed: {} {}", requestUrl, httpStatus);
×
135
                return -1;
×
136
            }
137
            try (InputStream is = resp.getEntity().getContent()) {
×
138
                NanopubLoader.loadStreamInParallel(
×
139
                        NanopubStream.fromByteStream(is).getAsNanopubs().peek(m -> {
×
140
                            // Track counter in the main thread as items are consumed from the stream
141
                            if (m.isSuccess() && m.getCounter() > 0) {
×
142
                                lastReceivedCounter.set(m.getCounter());
×
143
                            }
144
                        }),
×
145
                        np -> {
146
                            if (!CoverageFilter.isCovered(np)) return;
×
147
                            try (ClientSession workerSession = RegistryDB.getClient().startSession()) {
×
148
                                String pubkey = RegistryDB.getPubkey(np);
×
149
                                if (pubkey != null) {
×
150
                                    NanopubLoader.simpleLoad(workerSession, np, pubkey);
×
151
                                }
152
                            }
153
                        });
×
154
            }
155
        } catch (IOException ex) {
×
156
            log.info("Failed to fetch recent nanopubs from {}: {}", peerUrl, ex.getMessage());
×
157
        }
×
158
        log.info("Last received counter from {}: {}", peerUrl, lastReceivedCounter.get());
×
159
        return lastReceivedCounter.get();
×
160
    }
161

162
    static void discoverPubkeys(ClientSession s, String peerUrl) {
163
        log.info("Discovering pubkeys from peer: {}", peerUrl);
12✔
164
        try {
165
            List<String> peerPubkeys = Utils.retrieveListFromJsonUrl(peerUrl + "pubkeys.json");
×
166
            int discovered = 0;
×
167
            for (String pubkeyHash : peerPubkeys) {
×
168
                Document filter = new Document("pubkey", pubkeyHash).append("type", NanopubLoader.INTRO_TYPE_HASH);
×
169
                if (!has(s, "lists", filter)) {
×
170
                    try {
171
                        insert(s, "lists", new Document("pubkey", pubkeyHash)
×
172
                                .append("type", NanopubLoader.INTRO_TYPE_HASH)
×
173
                                .append("status", EntryStatus.encountered.getValue()));
×
174
                    } catch (MongoWriteException e) {
×
175
                        if (e.getError().getCategory() != ErrorCategory.DUPLICATE_KEY) throw e;
×
176
                    }
×
177
                    discovered++;
×
178
                } else if (!has(s, "lists", new Document(filter).append("status", EntryStatus.loaded.getValue()))) {
×
179
                    // Set status to encountered if not already loaded (fixes null-status entries from older code)
180
                    collection("lists").updateMany(s, filter,
×
181
                            new Document("$set", new Document("status", EntryStatus.encountered.getValue())));
×
182
                    discovered++;
×
183
                }
184
            }
×
185
            log.info("Discovered {} new pubkeys from peer {}", discovered, peerUrl);
×
186
        } catch (Exception ex) {
3✔
187
            log.info("Failed to discover pubkeys from {}: {}", peerUrl, ex.getMessage());
18✔
188
        }
×
189
    }
3✔
190

191
    static Document getPeerState(ClientSession s, String peerUrl) {
192
        try (MongoCursor<Document> cursor = collection(Collection.PEER_STATE.toString())
27✔
193
                .find(s, new Document("_id", peerUrl)).cursor()) {
9✔
194
            return cursor.hasNext() ? cursor.next() : null;
33✔
195
        }
196
    }
197

198
    static void updatePeerState(ClientSession s, String peerUrl, long setupId, long seqNum) {
199
        collection(Collection.PEER_STATE.toString()).updateOne(s,
63✔
200
                new Document("_id", peerUrl),
201
                new Document("$set", new Document("_id", peerUrl)
202
                        .append("setupId", setupId)
12✔
203
                        .append("seqNum", seqNum)
12✔
204
                        // TODO(transition): Remove loadCounter after all peers upgraded
205
                        .append("loadCounter", seqNum)
9✔
206
                        .append("lastChecked", System.currentTimeMillis())),
24✔
207
                new com.mongodb.client.model.UpdateOptions().upsert(true));
3✔
208
    }
3✔
209

210
    static void deletePeerState(ClientSession s, String peerUrl) {
211
        collection(Collection.PEER_STATE.toString()).deleteOne(s, new Document("_id", peerUrl));
33✔
212
    }
3✔
213

214
    static boolean isTestInstance(HttpResponse resp) {
215
        return "true".equals(getHeader(resp, "Nanopub-Registry-Test-Instance"));
18✔
216
    }
217

218
    static String getHeader(HttpResponse resp, String name) {
219
        return resp.getFirstHeader(name) != null ? resp.getFirstHeader(name).getValue() : null;
33✔
220
    }
221

222
    static Long getHeaderLong(HttpResponse resp, String name) {
223
        String value = getHeader(resp, name);
12✔
224
        if (value == null || "null".equals(value)) return null;
24✔
225
        try {
226
            return Long.parseLong(value);
12✔
227
        } catch (NumberFormatException ex) {
3✔
228
            return null;
6✔
229
        }
230
    }
231

232
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc