• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-registry / 22896903711

10 Mar 2026 09:56AM UTC coverage: 26.992% (-0.2%) from 27.147%
22896903711

push

github

tkuhn
fix: improve resilience of peer sync

- Handle malformed nanopub timestamps gracefully instead of rejecting
  the nanopub; treat as no timestamp
- Skip individual nanopubs that fail during full/recent fetch instead of
  aborting the entire stream
- Catch all exceptions (not just IOException) in loadAllNanopubs so
  updatePeerState is always called
- Checkpoint fullFetchPosition every 1000 nanopubs so progress survives
  process restarts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

170 of 690 branches covered (24.64%)

Branch coverage included in aggregate %.

555 of 1996 relevant lines covered (27.81%)

4.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.0
src/main/java/com/knowledgepixels/registry/RegistryPeerConnector.java
1
package com.knowledgepixels.registry;
2

3
import com.mongodb.ErrorCategory;
4
import com.mongodb.MongoWriteException;
5
import com.mongodb.client.ClientSession;
6
import com.mongodb.client.MongoCursor;
7
import org.apache.http.HttpResponse;
8
import org.apache.http.client.methods.HttpGet;
9
import org.apache.http.client.methods.HttpHead;
10
import org.apache.http.util.EntityUtils;
11
import org.bson.Document;
12
import org.nanopub.Nanopub;
13
import org.nanopub.NanopubUtils;
14
import org.nanopub.jelly.NanopubStream;
15
import org.slf4j.Logger;
16
import org.slf4j.LoggerFactory;
17

18
import java.io.IOException;
19
import java.io.InputStream;
20
import java.util.ArrayList;
21
import java.util.Collections;
22
import java.util.List;
23
import java.util.concurrent.atomic.AtomicLong;
24

25
import static com.knowledgepixels.registry.RegistryDB.*;
26

27
/**
28
 * Checks peer Nanopub Registries for new nanopublications and loads them.
29
 */
30
public class RegistryPeerConnector {
31

32
    private RegistryPeerConnector() {}
33

34
    private static final Logger log = LoggerFactory.getLogger(RegistryPeerConnector.class);
12✔
35

36
    public static void checkPeers(ClientSession s) {
37
        List<String> peerUrls = new ArrayList<>(Utils.getPeerUrls());
×
38
        Collections.shuffle(peerUrls);
×
39

40
        for (String peerUrl : peerUrls) {
×
41
            try {
42
                checkPeer(s, peerUrl);
×
43
            } catch (Exception ex) {
×
44
                log.info("Error checking peer {}: {}", peerUrl, ex.getMessage());
×
45
            }
×
46
        }
×
47
    }
×
48

49
    static void checkPeer(ClientSession s, String peerUrl) throws IOException {
50
        log.info("Checking peer: {}", peerUrl);
×
51

52
        HttpResponse resp = NanopubUtils.getHttpClient().execute(new HttpHead(peerUrl));
×
53
        int httpStatus = resp.getStatusLine().getStatusCode();
×
54
        EntityUtils.consumeQuietly(resp.getEntity());
×
55
        if (httpStatus < 200 || httpStatus >= 300) {
×
56
            log.info("Failed to reach peer {}: {}", peerUrl, httpStatus);
×
57
            return;
×
58
        }
59

60
        String status = getHeader(resp, "Nanopub-Registry-Status");
×
61
        if (!"ready".equals(status) && !"updating".equals(status)) {
×
62
            log.info("Peer {} in non-ready state: {}", peerUrl, status);
×
63
            return;
×
64
        }
65

66
        Long peerSetupId = getHeaderLong(resp, "Nanopub-Registry-Setup-Id");
×
67
        Long peerLoadCounter = getHeaderLong(resp, "Nanopub-Registry-Load-Counter");
×
68
        if (peerSetupId == null || peerLoadCounter == null) {
×
69
            log.info("Peer {} missing setupId or loadCounter headers", peerUrl);
×
70
            return;
×
71
        }
72

73
        syncWithPeer(s, peerUrl, peerSetupId, peerLoadCounter);
×
74
    }
×
75

76
    static void syncWithPeer(ClientSession s, String peerUrl, long peerSetupId, long peerLoadCounter) {
77
        Document peerState = getPeerState(s, peerUrl);
12✔
78
        Long lastSetupId = peerState != null ? peerState.getLong("setupId") : null;
24✔
79
        Long lastLoadCounter = peerState != null ? peerState.getLong("loadCounter") : null;
24✔
80
        Boolean fullFetchDone = peerState != null ? peerState.getBoolean("fullFetchDone") : null;
24✔
81

82
        if (lastSetupId != null && !lastSetupId.equals(peerSetupId)) {
21✔
83
            log.info("Peer {} was reset (setupId changed), resetting tracking", peerUrl);
12✔
84
            deletePeerState(s, peerUrl);
9✔
85
            lastLoadCounter = null;
6✔
86
            fullFetchDone = null;
6✔
87
        }
88

89
        if (lastLoadCounter != null && lastLoadCounter.equals(peerLoadCounter)) {
21✔
90
            log.info("Peer {} has no new nanopubs (loadCounter unchanged: {})", peerUrl, peerLoadCounter);
21✔
91
        } else if (lastLoadCounter != null) {
6✔
92
            // Fetch all nanopubs added since our last known position.
93
            // This works for any delta size; the full fetch covers the first-sync case.
94
            // TODO Add per-pubkey afterCounter tracking for more targeted incremental sync
95
            long delta = peerLoadCounter - lastLoadCounter;
15✔
96
            log.info("Peer {} has {} new nanopubs, fetching recent", peerUrl, delta);
18✔
97
            loadRecentNanopubs(s, peerUrl, lastLoadCounter);
15✔
98
        } else {
3✔
99
            log.info("Peer {} is new, full fetch will handle initial sync", peerUrl);
12✔
100
        }
101

102
        // TODO Remove full fetch once incremental sync covers all nanopubs (including non-approved pubkeys)
103
        boolean fullFetchSucceeded = fullFetchDone != null && fullFetchDone;
27!
104
        if (!fullFetchSucceeded) {
6✔
105
            Long fullFetchPosition = peerState != null ? peerState.getLong("fullFetchPosition") : null;
24✔
106
            long afterCounter = fullFetchPosition != null ? fullFetchPosition : -1;
12!
107
            fullFetchSucceeded = loadAllNanopubs(s, peerUrl, afterCounter);
15✔
108
        }
109

110
        discoverPubkeys(s, peerUrl);
9✔
111
        updatePeerState(s, peerUrl, peerSetupId, peerLoadCounter, fullFetchSucceeded);
18✔
112
    }
3✔
113

114
    private static boolean loadAllNanopubs(ClientSession s, String peerUrl, long afterCounter) {
115
        String requestUrl = peerUrl + "nanopubs.jelly?afterCounter=" + afterCounter;
12✔
116
        log.info("Full fetch of all nanopubs from: {} (resuming after counter {})", requestUrl, afterCounter);
18✔
117
        AtomicLong lastCounter = new AtomicLong(afterCounter);
15✔
118
        AtomicLong processedCount = new AtomicLong(0);
15✔
119
        boolean completed = false;
6✔
120
        try {
121
            HttpResponse resp = NanopubUtils.getHttpClient().execute(new HttpGet(requestUrl));
×
122
            int httpStatus = resp.getStatusLine().getStatusCode();
×
123
            if (httpStatus < 200 || httpStatus >= 300) {
×
124
                EntityUtils.consumeQuietly(resp.getEntity());
×
125
                log.info("Request failed: {} {}", requestUrl, httpStatus);
×
126
                return false;
×
127
            }
128
            // Use a dedicated session outside any wrapping transaction to avoid
129
            // MongoDB transaction timeout on large streams.
130
            try (InputStream is = resp.getEntity().getContent();
×
131
                 ClientSession loadSession = RegistryDB.getClient().startSession()) {
×
132
                NanopubStream.fromByteStream(is).getAsNanopubs().forEach(m -> {
×
133
                    if (m.isSuccess()) {
×
134
                        Nanopub np = null;
×
135
                        try {
136
                            np = m.getNanopub();
×
137
                            RegistryDB.loadNanopub(loadSession, np);
×
138
                            NanopubLoader.simpleLoad(loadSession, np);
×
139
                        } catch (Exception ex) {
×
140
                            log.warn("Skipping nanopub {} during full fetch: {}",
×
141
                                    np != null ? np.getUri() : "unknown", ex.getMessage());
×
142
                        }
×
143
                    }
144
                    if (m.getCounter() > 0) {
×
145
                        lastCounter.set(m.getCounter());
×
146
                    }
147
                    long count = processedCount.incrementAndGet();
×
148
                    if (count % 1000 == 0) {
×
149
                        log.info("Full fetch progress: {} nanopubs processed (counter: {})", count, lastCounter.get());
×
150
                        saveFullFetchPosition(s, peerUrl, lastCounter.get());
×
151
                    }
152
                });
×
153
            }
154
            completed = true;
×
155
            return true;
×
156
        } catch (Exception ex) {
3✔
157
            log.info("Failed to fetch all nanopubs from {}: {}", peerUrl, ex.getMessage());
18✔
158
            return false;
12✔
159
        } finally {
160
            if (!completed && lastCounter.get() > afterCounter) {
21!
161
                log.info("Full fetch interrupted at counter {}; saving position for resume", lastCounter.get());
×
162
                saveFullFetchPosition(s, peerUrl, lastCounter.get());
×
163
            }
164
        }
165
    }
166

167
    private static void saveFullFetchPosition(ClientSession s, String peerUrl, long position) {
168
        collection(Collection.PEER_STATE.toString()).updateOne(s,
×
169
                new Document("_id", peerUrl),
170
                new Document("$set", new Document("fullFetchPosition", position)),
×
171
                new com.mongodb.client.model.UpdateOptions().upsert(true));
×
172
    }
×
173

174
    private static void loadRecentNanopubs(ClientSession s, String peerUrl, long afterCounter) {
175
        String requestUrl = peerUrl + "nanopubs.jelly?afterCounter=" + afterCounter;
12✔
176
        log.info("Fetching recent nanopubs from: {}", requestUrl);
12✔
177
        try {
178
            HttpResponse resp = NanopubUtils.getHttpClient().execute(new HttpGet(requestUrl));
×
179
            int httpStatus = resp.getStatusLine().getStatusCode();
×
180
            if (httpStatus < 200 || httpStatus >= 300) {
×
181
                EntityUtils.consumeQuietly(resp.getEntity());
×
182
                log.info("Request failed: {} {}", requestUrl, httpStatus);
×
183
                return;
×
184
            }
185
            try (InputStream is = resp.getEntity().getContent()) {
×
186
                NanopubStream.fromByteStream(is).getAsNanopubs().forEach(m -> {
×
187
                    if (m.isSuccess()) {
×
188
                        Nanopub np = null;
×
189
                        try {
190
                            np = m.getNanopub();
×
191
                            RegistryDB.loadNanopub(s, np);
×
192
                            NanopubLoader.simpleLoad(s, np);
×
193
                        } catch (Exception ex) {
×
194
                            log.warn("Skipping nanopub {} during recent fetch: {}",
×
195
                                    np != null ? np.getUri() : "unknown", ex.getMessage());
×
196
                        }
×
197
                    }
198
                });
×
199
            }
200
        } catch (IOException ex) {
3✔
201
            log.info("Failed to fetch recent nanopubs from {}: {}", peerUrl, ex.getMessage());
18✔
202
        }
×
203
    }
3✔
204

205
    static void discoverPubkeys(ClientSession s, String peerUrl) {
206
        log.info("Discovering pubkeys from peer: {}", peerUrl);
12✔
207
        try {
208
            List<String> peerPubkeys = Utils.retrieveListFromJsonUrl(peerUrl + "pubkeys.json");
×
209
            int discovered = 0;
×
210
            for (String pubkeyHash : peerPubkeys) {
×
211
                Document filter = new Document("pubkey", pubkeyHash).append("type", NanopubLoader.INTRO_TYPE_HASH);
×
212
                if (!has(s, "lists", filter)) {
×
213
                    try {
214
                        insert(s, "lists", new Document("pubkey", pubkeyHash)
×
215
                                .append("type", NanopubLoader.INTRO_TYPE_HASH)
×
216
                                .append("status", EntryStatus.encountered.getValue()));
×
217
                    } catch (MongoWriteException e) {
×
218
                        if (e.getError().getCategory() != ErrorCategory.DUPLICATE_KEY) throw e;
×
219
                    }
×
220
                    discovered++;
×
221
                } else if (!has(s, "lists", new Document(filter).append("status", EntryStatus.loaded.getValue()))) {
×
222
                    // Set status to encountered if not already loaded (fixes null-status entries from older code)
223
                    collection("lists").updateMany(s, filter,
×
224
                            new Document("$set", new Document("status", EntryStatus.encountered.getValue())));
×
225
                    discovered++;
×
226
                }
227
            }
×
228
            log.info("Discovered {} new pubkeys from peer {}", discovered, peerUrl);
×
229
        } catch (Exception ex) {
3✔
230
            log.info("Failed to discover pubkeys from {}: {}", peerUrl, ex.getMessage());
18✔
231
        }
×
232
    }
3✔
233

234
    static Document getPeerState(ClientSession s, String peerUrl) {
235
        try (MongoCursor<Document> cursor = collection(Collection.PEER_STATE.toString())
27✔
236
                .find(s, new Document("_id", peerUrl)).cursor()) {
9✔
237
            return cursor.hasNext() ? cursor.next() : null;
33✔
238
        }
239
    }
240

241
    static void updatePeerState(ClientSession s, String peerUrl, long setupId, long loadCounter, boolean fullFetchDone) {
242
        collection(Collection.PEER_STATE.toString()).updateOne(s,
63✔
243
                new Document("_id", peerUrl),
244
                new Document("$set", new Document("_id", peerUrl)
245
                        .append("setupId", setupId)
12✔
246
                        .append("loadCounter", loadCounter)
12✔
247
                        .append("fullFetchDone", fullFetchDone)
9✔
248
                        .append("lastChecked", System.currentTimeMillis())),
24✔
249
                new com.mongodb.client.model.UpdateOptions().upsert(true));
3✔
250
    }
3✔
251

252
    static void deletePeerState(ClientSession s, String peerUrl) {
253
        collection(Collection.PEER_STATE.toString()).deleteOne(s, new Document("_id", peerUrl));
33✔
254
    }
3✔
255

256
    static String getHeader(HttpResponse resp, String name) {
257
        return resp.getFirstHeader(name) != null ? resp.getFirstHeader(name).getValue() : null;
33✔
258
    }
259

260
    static Long getHeaderLong(HttpResponse resp, String name) {
261
        String value = getHeader(resp, name);
12✔
262
        if (value == null || "null".equals(value)) return null;
24✔
263
        try {
264
            return Long.parseLong(value);
12✔
265
        } catch (NumberFormatException ex) {
3✔
266
            return null;
6✔
267
        }
268
    }
269

270
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc