• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-registry / 23244613129

18 Mar 2026 12:26PM UTC coverage: 30.259% (+0.05%) from 30.21%
23244613129

push

github

web-flow
Merge pull request #85 from knowledgepixels/feature/remove-legacy-server-push

Stop pushing nanopubs to legacy server and skip test-instance peers

185 of 680 branches covered (27.21%)

Branch coverage included in aggregate %.

622 of 1987 relevant lines covered (31.3%)

5.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.93
src/main/java/com/knowledgepixels/registry/RegistryPeerConnector.java
1
package com.knowledgepixels.registry;
2

3
import com.mongodb.ErrorCategory;
4
import com.mongodb.MongoWriteException;
5
import com.mongodb.client.ClientSession;
6
import com.mongodb.client.MongoCursor;
7
import org.apache.http.HttpResponse;
8
import org.apache.http.client.methods.HttpGet;
9
import org.apache.http.client.methods.HttpHead;
10
import org.apache.http.util.EntityUtils;
11
import org.bson.Document;
12
import org.nanopub.Nanopub;
13
import org.nanopub.NanopubUtils;
14
import org.nanopub.jelly.NanopubStream;
15
import org.slf4j.Logger;
16
import org.slf4j.LoggerFactory;
17

18
import java.io.IOException;
19
import java.io.InputStream;
20
import java.util.ArrayList;
21
import java.util.Collections;
22
import java.util.List;
23

24
import static com.knowledgepixels.registry.RegistryDB.*;
25

26
/**
27
 * Checks peer Nanopub Registries for new nanopublications and loads them.
28
 */
29
public class RegistryPeerConnector {
30

31
    private RegistryPeerConnector() {}
32

33
    private static final Logger log = LoggerFactory.getLogger(RegistryPeerConnector.class);
12✔
34

35
    public static void checkPeers(ClientSession s) {
36
        List<String> peerUrls = new ArrayList<>(Utils.getPeerUrls());
×
37
        Collections.shuffle(peerUrls);
×
38

39
        for (String peerUrl : peerUrls) {
×
40
            try {
41
                checkPeer(s, peerUrl);
×
42
            } catch (Exception ex) {
×
43
                log.info("Error checking peer {}: {}", peerUrl, ex.getMessage());
×
44
            }
×
45
        }
×
46
    }
×
47

48
    static void checkPeer(ClientSession s, String peerUrl) throws IOException {
49
        log.info("Checking peer: {}", peerUrl);
×
50

51
        HttpResponse resp = NanopubUtils.getHttpClient().execute(new HttpHead(peerUrl));
×
52
        int httpStatus = resp.getStatusLine().getStatusCode();
×
53
        EntityUtils.consumeQuietly(resp.getEntity());
×
54
        if (httpStatus < 200 || httpStatus >= 300) {
×
55
            log.info("Failed to reach peer {}: {}", peerUrl, httpStatus);
×
56
            return;
×
57
        }
58

59
        if (isTestInstance(resp)) {
×
60
            log.info("Skipping peer {} because it is a test instance", peerUrl);
×
61
            return;
×
62
        }
63

64
        String status = getHeader(resp, "Nanopub-Registry-Status");
×
65
        if (!"ready".equals(status) && !"updating".equals(status)) {
×
66
            log.info("Peer {} in non-ready state: {}", peerUrl, status);
×
67
            return;
×
68
        }
69

70
        Long peerSetupId = getHeaderLong(resp, "Nanopub-Registry-Setup-Id");
×
71
        Long peerLoadCounter = getHeaderLong(resp, "Nanopub-Registry-Load-Counter");
×
72
        if (peerSetupId == null || peerLoadCounter == null) {
×
73
            log.info("Peer {} missing setupId or loadCounter headers", peerUrl);
×
74
            return;
×
75
        }
76

77
        syncWithPeer(s, peerUrl, peerSetupId, peerLoadCounter);
×
78
    }
×
79

80
    static void syncWithPeer(ClientSession s, String peerUrl, long peerSetupId, long peerLoadCounter) {
81
        Document peerState = getPeerState(s, peerUrl);
12✔
82
        Long lastSetupId = peerState != null ? peerState.getLong("setupId") : null;
24✔
83
        Long lastLoadCounter = peerState != null ? peerState.getLong("loadCounter") : null;
24✔
84

85
        if (lastSetupId != null && !lastSetupId.equals(peerSetupId)) {
21✔
86
            log.info("Peer {} was reset (setupId changed), resetting tracking", peerUrl);
12✔
87
            deletePeerState(s, peerUrl);
9✔
88
            lastLoadCounter = null;
6✔
89
        }
90

91
        if (lastLoadCounter != null && lastLoadCounter.equals(peerLoadCounter)) {
21!
92
            log.info("Peer {} has no new nanopubs (loadCounter unchanged: {})", peerUrl, peerLoadCounter);
21✔
93
        } else if (lastLoadCounter != null) {
6!
94
            // Fetch all nanopubs added since our last known position.
95
            // TODO Add per-pubkey afterCounter tracking for more targeted incremental sync
96
            long delta = peerLoadCounter - lastLoadCounter;
×
97
            log.info("Peer {} has {} new nanopubs, fetching recent", peerUrl, delta);
×
98
            loadRecentNanopubs(s, peerUrl, lastLoadCounter);
×
99
        } else {
×
100
            log.info("Peer {} is new, pubkey discovery will handle initial sync", peerUrl);
12✔
101
        }
102

103
        discoverPubkeys(s, peerUrl);
9✔
104
        updatePeerState(s, peerUrl, peerSetupId, peerLoadCounter);
15✔
105
    }
3✔
106

107
    private static void loadRecentNanopubs(ClientSession s, String peerUrl, long afterCounter) {
108
        String requestUrl = peerUrl + "nanopubs.jelly?afterCounter=" + afterCounter;
×
109
        log.info("Fetching recent nanopubs from: {}", requestUrl);
×
110
        try {
111
            HttpResponse resp = NanopubUtils.getHttpClient().execute(new HttpGet(requestUrl));
×
112
            int httpStatus = resp.getStatusLine().getStatusCode();
×
113
            if (httpStatus < 200 || httpStatus >= 300) {
×
114
                EntityUtils.consumeQuietly(resp.getEntity());
×
115
                log.info("Request failed: {} {}", requestUrl, httpStatus);
×
116
                return;
×
117
            }
118
            try (InputStream is = resp.getEntity().getContent()) {
×
119
                NanopubStream.fromByteStream(is).getAsNanopubs().forEach(m -> {
×
120
                    if (m.isSuccess()) {
×
121
                        Nanopub np = null;
×
122
                        try {
123
                            np = m.getNanopub();
×
124
                            RegistryDB.loadNanopub(s, np);
×
125
                            NanopubLoader.simpleLoad(s, np);
×
126
                        } catch (Exception ex) {
×
127
                            log.warn("Skipping nanopub {} during recent fetch: {}",
×
128
                                    np != null ? np.getUri() : "unknown", ex.getMessage());
×
129
                        }
×
130
                    }
131
                });
×
132
            }
133
        } catch (IOException ex) {
×
134
            log.info("Failed to fetch recent nanopubs from {}: {}", peerUrl, ex.getMessage());
×
135
        }
×
136
    }
×
137

138
    static void discoverPubkeys(ClientSession s, String peerUrl) {
139
        log.info("Discovering pubkeys from peer: {}", peerUrl);
12✔
140
        try {
141
            List<String> peerPubkeys = Utils.retrieveListFromJsonUrl(peerUrl + "pubkeys.json");
×
142
            int discovered = 0;
×
143
            for (String pubkeyHash : peerPubkeys) {
×
144
                Document filter = new Document("pubkey", pubkeyHash).append("type", NanopubLoader.INTRO_TYPE_HASH);
×
145
                if (!has(s, "lists", filter)) {
×
146
                    try {
147
                        insert(s, "lists", new Document("pubkey", pubkeyHash)
×
148
                                .append("type", NanopubLoader.INTRO_TYPE_HASH)
×
149
                                .append("status", EntryStatus.encountered.getValue()));
×
150
                    } catch (MongoWriteException e) {
×
151
                        if (e.getError().getCategory() != ErrorCategory.DUPLICATE_KEY) throw e;
×
152
                    }
×
153
                    discovered++;
×
154
                } else if (!has(s, "lists", new Document(filter).append("status", EntryStatus.loaded.getValue()))) {
×
155
                    // Set status to encountered if not already loaded (fixes null-status entries from older code)
156
                    collection("lists").updateMany(s, filter,
×
157
                            new Document("$set", new Document("status", EntryStatus.encountered.getValue())));
×
158
                    discovered++;
×
159
                }
160
            }
×
161
            log.info("Discovered {} new pubkeys from peer {}", discovered, peerUrl);
×
162
        } catch (Exception ex) {
3✔
163
            log.info("Failed to discover pubkeys from {}: {}", peerUrl, ex.getMessage());
18✔
164
        }
×
165
    }
3✔
166

167
    static Document getPeerState(ClientSession s, String peerUrl) {
168
        try (MongoCursor<Document> cursor = collection(Collection.PEER_STATE.toString())
27✔
169
                .find(s, new Document("_id", peerUrl)).cursor()) {
9✔
170
            return cursor.hasNext() ? cursor.next() : null;
33✔
171
        }
172
    }
173

174
    static void updatePeerState(ClientSession s, String peerUrl, long setupId, long loadCounter) {
175
        collection(Collection.PEER_STATE.toString()).updateOne(s,
63✔
176
                new Document("_id", peerUrl),
177
                new Document("$set", new Document("_id", peerUrl)
178
                        .append("setupId", setupId)
12✔
179
                        .append("loadCounter", loadCounter)
9✔
180
                        .append("lastChecked", System.currentTimeMillis())),
24✔
181
                new com.mongodb.client.model.UpdateOptions().upsert(true));
3✔
182
    }
3✔
183

184
    static void deletePeerState(ClientSession s, String peerUrl) {
185
        collection(Collection.PEER_STATE.toString()).deleteOne(s, new Document("_id", peerUrl));
33✔
186
    }
3✔
187

188
    static boolean isTestInstance(HttpResponse resp) {
189
        return "true".equals(getHeader(resp, "Nanopub-Registry-Test-Instance"));
18✔
190
    }
191

192
    static String getHeader(HttpResponse resp, String name) {
193
        return resp.getFirstHeader(name) != null ? resp.getFirstHeader(name).getValue() : null;
33✔
194
    }
195

196
    static Long getHeaderLong(HttpResponse resp, String name) {
197
        String value = getHeader(resp, name);
12✔
198
        if (value == null || "null".equals(value)) return null;
24✔
199
        try {
200
            return Long.parseLong(value);
12✔
201
        } catch (NumberFormatException ex) {
3✔
202
            return null;
6✔
203
        }
204
    }
205

206
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc