• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 16017934618

02 Jul 2025 06:38AM UTC coverage: 71.89% (+0.3%) from 71.633%
16017934618

push

github

web-flow
feat!: expand gRPC readiness status to contain current processed block info (#7262)

Description
---
Fixes: #7222 
Continues previous PR #7240 to contain more granular data including
newest processed block.

The response now contains current progress of migration as current
processed block and total blocks and also current database version and
the latest db version:

```json
{
  "metadata": null,
  "initialSyncAchieved": false,
  "baseNodeState": "START_UP",
  "failedCheckpoints": false,
  "reward": "0",
  "sha3xEstimatedHashRate": "0",
  "moneroRandomxEstimatedHashRate": "0",
  "numConnections": "0",
  "livenessResults": [],
  "tariRandomxEstimatedHashRate": "0",
  "readinessStatus": {
    "migration": {
      "currentBlock": "2800",
      "totalBlocks": "3934",
      "progressPercentage": 71.17437722419929,
      "currentDbVersion": "1",
      "targetDbVersion": "3"
    },
    "timestamp": "1751223176570"
  }
}
```

Motivation and Context
---
This should help gRPC consumers like Tari Universe to get a better
feedback of overall `minotari_node` health status.

How Has This Been Tested?
---
Cleared whole database and launched older version of `minotari_node` in
my case it was 4.1.0. This should initialize blockchain database with
version `1`. After syncing launch newer version - in my case it was
4.5.0 and let `minotari_node` run migration on the database.
Periodically fetch `GetNetworkStatus` from `readiness_grpc_server` to
watch migration progress with command like this:
```sh
watch -n 1 grpcurl -plaintext --emit-defaults -import-path applications/minotari_app_grpc/proto/ -proto applications/minotari_app_grpc/proto/base_node.proto 127.0.0.1:33277 tari.rpc.BaseNode/GetNetworkState
```

What process can a PR reviewer use to test or verify this change?
---
Same as above. Preferably run older version of `minotari_node` like
4.0.0 or 4.1.0.

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excl... (continued)

342 of 399 new or added lines in 5 files covered. (85.71%)

24 existing lines in 7 files now uncovered.

83410 of 116025 relevant lines covered (71.89%)

238897.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.28
/comms/core/src/peer_manager/peer_storage_sql.rs
1
//  Copyright 2019 The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{cmp::min, time::Duration};
24

25
use log::*;
26
use multiaddr::Multiaddr;
27

28
use crate::{
29
    net_address::PeerAddressSource,
30
    peer_manager::{
31
        database::{PeerDatabaseSql, ThisPeerIdentity},
32
        peer::Peer,
33
        peer_id::PeerId,
34
        NodeDistance,
35
        NodeId,
36
        PeerFeatures,
37
        PeerManagerError,
38
    },
39
    types::{CommsDatabase, CommsPublicKey},
40
};
41

42
const LOG_TARGET: &str = "comms::peer_manager::peer_storage_sql";
43
// The maximum number of peers to return in peer manager
44
const PEER_MANAGER_SYNC_PEERS: usize = 100;
45
// The maximum amount of time a peer can be inactive before being considered stale:
46
// ((5 days, 24h, 60m, 60s)/2 = 2.5 days)
47
pub const STALE_PEER_THRESHOLD_DURATION: Duration = Duration::from_secs(5 * 24 * 60 * 60 / 2);
48

49
/// PeerStorageSql provides a mechanism to keep a datastore and a local copy of all peers in sync and allow fast
50
/// searches using the node_id, public key or net_address of a peer.
51
#[derive(Clone)]
52
pub struct PeerStorageSql {
53
    peer_db: PeerDatabaseSql,
54
}
55

56
impl PeerStorageSql {
57
    /// Constructs a new PeerStorageSql, with indexes populated from the given datastore
58
    pub fn new_indexed(database: PeerDatabaseSql) -> Result<PeerStorageSql, PeerManagerError> {
242✔
59
        trace!(
242✔
60
            target: LOG_TARGET,
×
61
            "Peer storage is initialized. {} total entries.",
×
62
            database.size(),
×
63
        );
64

65
        Ok(PeerStorageSql { peer_db: database })
242✔
66
    }
242✔
67

68
    /// Get this peer's identity
69
    pub fn this_peer_identity(&self) -> ThisPeerIdentity {
1,517✔
70
        self.peer_db.this_peer_identity()
1,517✔
71
    }
1,517✔
72

73
    /// Get the size of the database
74
    pub fn count(&self) -> usize {
51,354✔
75
        self.peer_db.size()
51,354✔
76
    }
51,354✔
77

78
    /// Adds or updates a peer and sets the last connection as successful.
79
    /// If the peer is marked as offline, it will be unmarked.
80
    pub fn add_or_update_peer(&self, peer: Peer) -> Result<PeerId, PeerManagerError> {
49,877✔
81
        Ok(self.peer_db.add_or_update_peer(peer)?)
49,877✔
82
    }
49,877✔
83

84
    /// Adds a peer an online peer if the peer does not already exist. When a peer already
85
    /// exists, the stored version will be replaced with the newly provided peer.
86
    pub fn add_or_update_online_peer(
1✔
87
        &self,
1✔
88
        pubkey: &CommsPublicKey,
1✔
89
        node_id: &NodeId,
1✔
90
        addresses: &[Multiaddr],
1✔
91
        peer_features: &PeerFeatures,
1✔
92
        source: &PeerAddressSource,
1✔
93
    ) -> Result<Peer, PeerManagerError> {
1✔
94
        Ok(self
1✔
95
            .peer_db
1✔
96
            .add_or_update_online_peer(pubkey, node_id, addresses, peer_features, source)?)
1✔
97
    }
1✔
98

99
    /// The peer with the specified node id will be soft deleted (marked as deleted)
100
    pub fn soft_delete_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
1✔
101
        self.peer_db.soft_delete_peer(node_id)?;
1✔
102
        Ok(())
1✔
103
    }
1✔
104

105
    /// Find the peer with the provided NodeID
106
    pub fn get_peer_by_node_id(&self, node_id: &NodeId) -> Result<Option<Peer>, PeerManagerError> {
140,768✔
107
        Ok(self.peer_db.get_peer_by_node_id(node_id)?)
140,768✔
108
    }
140,768✔
109

110
    /// Get all peers based on a list of their node_ids
111
    pub fn get_peers_by_node_ids(&self, node_ids: &[NodeId]) -> Result<Vec<Peer>, PeerManagerError> {
1✔
112
        Ok(self.peer_db.get_peers_by_node_ids(node_ids)?)
1✔
113
    }
1✔
114

115
    /// Get all peers based on a list of their node_ids
116
    pub fn get_peer_public_keys_by_node_ids(
×
117
        &self,
×
118
        node_ids: &[NodeId],
×
119
    ) -> Result<Vec<CommsPublicKey>, PeerManagerError> {
×
120
        Ok(self.peer_db.get_peer_public_keys_by_node_ids(node_ids)?)
×
121
    }
×
122

123
    /// Get all banned peers
124
    pub fn get_banned_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
×
125
        Ok(self.peer_db.get_banned_peers()?)
×
126
    }
×
127

128
    pub fn find_all_starts_with(&self, partial: &[u8]) -> Result<Vec<Peer>, PeerManagerError> {
×
129
        Ok(self.peer_db.find_all_peers_match_partial_key(partial)?)
×
130
    }
×
131

132
    /// Find the peer with the provided PublicKey
133
    pub fn find_by_public_key(&self, public_key: &CommsPublicKey) -> Result<Option<Peer>, PeerManagerError> {
46,300✔
134
        Ok(self.peer_db.get_peer_by_public_key(public_key)?)
46,300✔
135
    }
46,300✔
136

137
    /// Check if a peer exist using the specified public_key
138
    pub fn exists_public_key(&self, public_key: &CommsPublicKey) -> Result<bool, PeerManagerError> {
17✔
139
        if let Ok(val) = self.peer_db.peer_exists_by_public_key(public_key) {
17✔
140
            Ok(val.is_some())
17✔
141
        } else {
142
            Ok(false)
×
143
        }
144
    }
17✔
145

146
    /// Check if a peer exist using the specified node_id
147
    pub fn exists_node_id(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
×
148
        if let Ok(val) = self.peer_db.peer_exists_by_node_id(node_id) {
×
149
            Ok(val.is_some())
×
150
        } else {
151
            Ok(false)
×
152
        }
153
    }
×
154

155
    /// Return the peer by corresponding to the provided NodeId if it is not banned
156
    pub fn direct_identity_node_id(&self, node_id: &NodeId) -> Result<Peer, PeerManagerError> {
45,590✔
157
        let peer = self
45,590✔
158
            .get_peer_by_node_id(node_id)?
45,590✔
159
            .ok_or(PeerManagerError::peer_not_found(node_id))?;
45,590✔
160

161
        if peer.is_banned() {
45,589✔
UNCOV
162
            Err(PeerManagerError::BannedPeer)
×
163
        } else {
164
            Ok(peer)
45,589✔
165
        }
166
    }
45,590✔
167

168
    /// Return the peer by corresponding to the provided public key if it is not banned
169
    pub fn direct_identity_public_key(&self, public_key: &CommsPublicKey) -> Result<Peer, PeerManagerError> {
45,613✔
170
        let peer = self
45,613✔
171
            .find_by_public_key(public_key)?
45,613✔
172
            .ok_or(PeerManagerError::peer_not_found(&NodeId::from_public_key(public_key)))?;
45,613✔
173

174
        if peer.is_banned() {
45,607✔
UNCOV
175
            Err(PeerManagerError::BannedPeer)
×
176
        } else {
177
            Ok(peer)
45,607✔
178
        }
179
    }
45,613✔
180

181
    /// Return all peers, optionally filtering on supplied feature
182
    pub fn all(&self, features: Option<PeerFeatures>) -> Result<Vec<Peer>, PeerManagerError> {
1✔
183
        Ok(self.peer_db.get_all_peers(features)?)
1✔
184
    }
1✔
185

186
    /// Return "good" peers for syncing
187
    /// Criteria:
188
    ///  - Peer is not banned
189
    ///  - Peer has been seen within a defined time span (within the threshold)
190
    ///  - Only returns a maximum number of syncable peers (corresponds with the max possible number of requestable
191
    ///    peers to sync)
192
    ///  - Uses 0 as max PEER_MANAGER_SYNC_PEERS
193
    pub fn discovery_syncing(
5✔
194
        &self,
5✔
195
        mut n: usize,
5✔
196
        excluded_peers: &[NodeId],
5✔
197
        features: Option<PeerFeatures>,
5✔
198
    ) -> Result<Vec<Peer>, PeerManagerError> {
5✔
199
        if n == 0 {
5✔
200
            n = PEER_MANAGER_SYNC_PEERS;
×
201
        } else {
5✔
202
            n = min(n, PEER_MANAGER_SYNC_PEERS);
5✔
203
        }
5✔
204

205
        Ok(self
5✔
206
            .peer_db
5✔
207
            .get_n_random_active_peers(n, excluded_peers, features, Some(STALE_PEER_THRESHOLD_DURATION))?)
5✔
208
    }
5✔
209

210
    /// Compile a list of all known peers
211
    pub fn get_not_banned_or_deleted_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
1✔
212
        Ok(self
1✔
213
            .peer_db
1✔
214
            .get_n_not_banned_or_deleted_peers(PEER_MANAGER_SYNC_PEERS)?)
1✔
215
    }
1✔
216

217
    /// Get available dial candidates that are communication nodes, not banned, not deleted,
218
    /// and not in the excluded node IDs list
219
    pub fn get_available_dial_candidates(
615✔
220
        &self,
615✔
221
        exclude_node_ids: &[NodeId],
615✔
222
        limit: Option<usize>,
615✔
223
    ) -> Result<Vec<Peer>, PeerManagerError> {
615✔
224
        Ok(self.peer_db.get_available_dial_candidates(exclude_node_ids, limit)?)
615✔
225
    }
615✔
226

227
    /// Compile a list of closest `n` active peers
228
    pub fn closest_n_active_peers(
3,240✔
229
        &self,
3,240✔
230
        region_node_id: &NodeId,
3,240✔
231
        n: usize,
3,240✔
232
        excluded_peers: &[NodeId],
3,240✔
233
        features: Option<PeerFeatures>,
3,240✔
234
        stale_peer_threshold: Option<Duration>,
3,240✔
235
        exclude_if_all_address_failed: bool,
3,240✔
236
        exclusion_distance: Option<NodeDistance>,
3,240✔
237
    ) -> Result<Vec<Peer>, PeerManagerError> {
3,240✔
238
        Ok(self.peer_db.get_closest_n_active_peers(
3,240✔
239
            region_node_id,
3,240✔
240
            n,
3,240✔
241
            excluded_peers,
3,240✔
242
            features,
3,240✔
243
            stale_peer_threshold,
3,240✔
244
            exclude_if_all_address_failed,
3,240✔
245
            exclusion_distance,
3,240✔
246
        )?)
3,240✔
247
    }
3,240✔
248

249
    pub fn get_seed_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
1✔
250
        Ok(self.peer_db.get_seed_peers()?)
1✔
251
    }
1✔
252

253
    /// Compile a random list of communication node peers of size _n_ that are not banned or offline
254
    pub fn random_peers(&self, n: usize, exclude_peers: &[NodeId]) -> Result<Vec<Peer>, PeerManagerError> {
1,033✔
255
        Ok(self.peer_db.get_n_random_peers(n, exclude_peers)?)
1,033✔
256
    }
1,033✔
257

258
    /// Get the closest `n` not failed, banned or deleted peers, ordered by their distance to the given node ID.
259
    pub fn get_closest_n_good_standing_peers(
2✔
260
        &self,
2✔
261
        n: usize,
2✔
262
        features: PeerFeatures,
2✔
263
    ) -> Result<Vec<Peer>, PeerManagerError> {
2✔
264
        Ok(self.peer_db.get_closest_n_good_standing_peers(n, features)?)
2✔
265
    }
2✔
266

267
    /// Check if a specific node_id is in the network region of the N nearest neighbours of the region specified by
268
    /// region_node_id. If there are less than N known peers, this will _always_ return true
269
    pub fn in_network_region(&self, node_id: &NodeId, n: usize) -> Result<bool, PeerManagerError> {
4✔
270
        let region_node_id = self.this_peer_identity().node_id;
4✔
271
        let region_node_distance = region_node_id.distance(node_id);
4✔
272
        let node_threshold = self.calc_region_threshold(n, PeerFeatures::COMMUNICATION_NODE)?;
4✔
273
        // Is node ID in the base node threshold?
274
        if region_node_distance <= node_threshold {
4✔
275
            return Ok(true);
3✔
276
        }
1✔
277
        let client_threshold = self.calc_region_threshold(n, PeerFeatures::COMMUNICATION_CLIENT)?; // Is node ID in the base client threshold?
1✔
278
        Ok(region_node_distance <= client_threshold)
1✔
279
    }
4✔
280

281
    /// Calculate the threshold for the region specified by region_node_id.
282
    pub fn calc_region_threshold(&self, n: usize, features: PeerFeatures) -> Result<NodeDistance, PeerManagerError> {
9✔
283
        let region_node_id = self.this_peer_identity().node_id;
9✔
284
        if n == 0 {
9✔
285
            return Ok(NodeDistance::max_distance());
×
286
        }
9✔
287

288
        let closest_peers = self.peer_db.get_closest_n_good_standing_peer_node_ids(n, features)?;
9✔
289
        let mut dists = Vec::new();
9✔
290
        for node_id in closest_peers {
42✔
291
            dists.push(region_node_id.distance(&node_id));
33✔
292
        }
33✔
293

294
        if dists.is_empty() {
9✔
295
            return Ok(NodeDistance::max_distance());
×
296
        }
9✔
297

9✔
298
        // If we have less than `n` matching peers in our threshold group, the threshold should be max
9✔
299
        if dists.len() < n {
9✔
300
            return Ok(NodeDistance::max_distance());
1✔
301
        }
8✔
302

8✔
303
        Ok(dists.pop().expect("dists cannot be empty at this point"))
8✔
304
    }
9✔
305

306
    /// Unban the peer
307
    pub fn unban_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
×
308
        let _node_id = self.peer_db.reset_banned(node_id)?;
×
309
        Ok(())
×
310
    }
×
311

312
    /// Unban the peer
313
    pub fn unban_all_peers(&self) -> Result<usize, PeerManagerError> {
×
314
        let number_unbanned = self.peer_db.reset_all_banned()?;
×
315
        Ok(number_unbanned)
×
316
    }
×
317

318
    pub fn reset_offline_non_wallet_peers(&self) -> Result<usize, PeerManagerError> {
×
319
        let number_offline = self.peer_db.reset_offline_non_wallet_peers()?;
×
320
        Ok(number_offline)
×
321
    }
×
322

323
    /// Ban the peer for the given duration
324
    pub fn ban_peer(
×
325
        &self,
×
326
        public_key: &CommsPublicKey,
×
327
        duration: Duration,
×
328
        reason: String,
×
329
    ) -> Result<NodeId, PeerManagerError> {
×
330
        let node_id = NodeId::from_key(public_key);
×
331
        self.peer_db
×
332
            .set_banned(&node_id, duration, reason)?
×
333
            .ok_or(PeerManagerError::peer_not_found(&NodeId::from_public_key(public_key)))
×
334
    }
×
335

336
    /// Ban the peer for the given duration
337
    pub fn ban_peer_by_node_id(
28✔
338
        &self,
28✔
339
        node_id: &NodeId,
28✔
340
        duration: Duration,
28✔
341
        reason: String,
28✔
342
    ) -> Result<NodeId, PeerManagerError> {
28✔
343
        self.peer_db
28✔
344
            .set_banned(node_id, duration, reason)?
28✔
345
            .ok_or(PeerManagerError::peer_not_found(node_id))
28✔
346
    }
28✔
347

348
    pub fn is_peer_banned(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
3,135✔
349
        let peer = self
3,135✔
350
            .get_peer_by_node_id(node_id)?
3,135✔
351
            .ok_or(PeerManagerError::peer_not_found(node_id))?;
3,133✔
352
        Ok(peer.is_banned())
3,133✔
353
    }
3,135✔
354

355
    /// This will store metadata inside of the metadata field in the peer provided by the nodeID.
356
    /// It will return None if the value was empty and the old value if the value was updated
357
    pub fn set_peer_metadata(
2,441✔
358
        &self,
2,441✔
359
        node_id: &NodeId,
2,441✔
360
        key: u8,
2,441✔
361
        data: Vec<u8>,
2,441✔
362
    ) -> Result<Option<Vec<u8>>, PeerManagerError> {
2,441✔
363
        Ok(self.peer_db.set_metadata(node_id, key, data)?)
2,441✔
364
    }
2,441✔
365
}
366

367
#[allow(clippy::from_over_into)]
368
impl Into<CommsDatabase> for PeerStorageSql {
369
    fn into(self) -> CommsDatabase {
×
370
        self.peer_db
×
371
    }
×
372
}
373

374
#[cfg(test)]
375
mod test {
376
    use std::{borrow::BorrowMut, iter::repeat_with};
377

378
    use chrono::{DateTime, Utc};
379
    use multiaddr::Multiaddr;
380
    use rand::Rng;
381
    use tari_common_sqlite::connection::DbConnection;
382

383
    use super::*;
384
    use crate::{
385
        net_address::{MultiaddrWithStats, MultiaddressesWithStats, PeerAddressSource},
386
        peer_manager::{database::MIGRATIONS, peer::PeerFlags},
387
    };
388

389
    fn get_peer_db_sql_test_db() -> Result<PeerDatabaseSql, PeerManagerError> {
5✔
390
        let db_connection = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
5✔
391
        Ok(PeerDatabaseSql::new(
5✔
392
            db_connection,
5✔
393
            &create_test_peer(PeerFeatures::COMMUNICATION_NODE, false),
5✔
394
        )?)
5✔
395
    }
5✔
396

397
    fn get_peer_storage_sql_test_db() -> Result<PeerStorageSql, PeerManagerError> {
4✔
398
        PeerStorageSql::new_indexed(get_peer_db_sql_test_db()?)
4✔
399
    }
4✔
400

401
    #[test]
402
    fn test_restore() {
1✔
403
        // Create Peers
1✔
404
        let mut rng = rand::rngs::OsRng;
1✔
405
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
1✔
406
        let node_id = NodeId::from_key(&pk);
1✔
407
        let net_address1 = "/ip4/1.2.3.4/tcp/8000".parse::<Multiaddr>().unwrap();
1✔
408
        let net_address2 = "/ip4/5.6.7.8/tcp/8000".parse::<Multiaddr>().unwrap();
1✔
409
        let net_address3 = "/ip4/5.6.7.8/tcp/7000".parse::<Multiaddr>().unwrap();
1✔
410
        let mut net_addresses =
1✔
411
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address1], &PeerAddressSource::Config);
1✔
412
        net_addresses.add_address(&net_address2, &PeerAddressSource::Config);
1✔
413
        net_addresses.add_address(&net_address3, &PeerAddressSource::Config);
1✔
414
        let peer1 = Peer::new(
1✔
415
            pk,
1✔
416
            node_id,
1✔
417
            net_addresses,
1✔
418
            PeerFlags::default(),
1✔
419
            PeerFeatures::empty(),
1✔
420
            Default::default(),
1✔
421
            Default::default(),
1✔
422
        );
1✔
423

1✔
424
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
1✔
425
        let node_id = NodeId::from_key(&pk);
1✔
426
        let net_address4 = "/ip4/9.10.11.12/tcp/7000".parse::<Multiaddr>().unwrap();
1✔
427
        let net_addresses =
1✔
428
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address4], &PeerAddressSource::Config);
1✔
429
        let peer2: Peer = Peer::new(
1✔
430
            pk,
1✔
431
            node_id,
1✔
432
            net_addresses,
1✔
433
            PeerFlags::default(),
1✔
434
            PeerFeatures::empty(),
1✔
435
            Default::default(),
1✔
436
            Default::default(),
1✔
437
        );
1✔
438

1✔
439
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
1✔
440
        let node_id = NodeId::from_key(&pk);
1✔
441
        let net_address5 = "/ip4/13.14.15.16/tcp/6000".parse::<Multiaddr>().unwrap();
1✔
442
        let net_address6 = "/ip4/17.18.19.20/tcp/8000".parse::<Multiaddr>().unwrap();
1✔
443
        let mut net_addresses =
1✔
444
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address5], &PeerAddressSource::Config);
1✔
445
        net_addresses.add_address(&net_address6, &PeerAddressSource::Config);
1✔
446
        let peer3 = Peer::new(
1✔
447
            pk,
1✔
448
            node_id,
1✔
449
            net_addresses,
1✔
450
            PeerFlags::default(),
1✔
451
            PeerFeatures::empty(),
1✔
452
            Default::default(),
1✔
453
            Default::default(),
1✔
454
        );
1✔
455

1✔
456
        // Create new datastore with a peer database
1✔
457
        let mut db = Some(get_peer_db_sql_test_db().unwrap());
1✔
458
        {
1✔
459
            let peer_storage = db.take().unwrap();
1✔
460

1✔
461
            // Test adding and searching for peers
1✔
462
            assert!(peer_storage.add_or_update_peer(peer1.clone()).is_ok());
1✔
463
            assert!(peer_storage.add_or_update_peer(peer2.clone()).is_ok());
1✔
464
            assert!(peer_storage.add_or_update_peer(peer3.clone()).is_ok());
1✔
465

466
            assert_eq!(peer_storage.size(), 3);
1✔
467
            assert!(peer_storage.get_peer_by_public_key(&peer1.public_key).is_ok());
1✔
468
            assert!(peer_storage.get_peer_by_public_key(&peer2.public_key).is_ok());
1✔
469
            assert!(peer_storage.get_peer_by_public_key(&peer3.public_key).is_ok());
1✔
470
            db = Some(peer_storage);
1✔
471
        }
1✔
472
        // Restore from existing database
1✔
473
        let peer_storage = PeerStorageSql::new_indexed(db.take().unwrap()).unwrap();
1✔
474

1✔
475
        assert_eq!(peer_storage.peer_db.size(), 3);
1✔
476
        assert!(peer_storage.find_by_public_key(&peer1.public_key).is_ok());
1✔
477
        assert!(peer_storage.find_by_public_key(&peer2.public_key).is_ok());
1✔
478
        assert!(peer_storage.find_by_public_key(&peer3.public_key).is_ok());
1✔
479
    }
1✔
480

481
    #[allow(clippy::too_many_lines)]
482
    #[test]
483
    fn test_add_delete_find_peer() {
1✔
484
        let peer_storage = get_peer_storage_sql_test_db().unwrap();
1✔
485

1✔
486
        // Create Peers
1✔
487
        let mut rng = rand::rngs::OsRng;
1✔
488
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
1✔
489
        let node_id = NodeId::from_key(&pk);
1✔
490
        let net_address1 = "/ip4/1.2.3.4/tcp/8000".parse::<Multiaddr>().unwrap();
1✔
491
        let net_address2 = "/ip4/5.6.7.8/tcp/8000".parse::<Multiaddr>().unwrap();
1✔
492
        let net_address3 = "/ip4/5.6.7.8/tcp/7000".parse::<Multiaddr>().unwrap();
1✔
493
        let mut net_addresses =
1✔
494
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address1], &PeerAddressSource::Config);
1✔
495
        net_addresses.add_address(&net_address2, &PeerAddressSource::Config);
1✔
496
        net_addresses.add_address(&net_address3, &PeerAddressSource::Config);
1✔
497
        let peer1 = Peer::new(
1✔
498
            pk,
1✔
499
            node_id,
1✔
500
            net_addresses,
1✔
501
            PeerFlags::default(),
1✔
502
            PeerFeatures::empty(),
1✔
503
            Default::default(),
1✔
504
            Default::default(),
1✔
505
        );
1✔
506

1✔
507
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
1✔
508
        let node_id = NodeId::from_key(&pk);
1✔
509
        let net_address4 = "/ip4/9.10.11.12/tcp/7000".parse::<Multiaddr>().unwrap();
1✔
510
        let net_addresses =
1✔
511
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address4], &PeerAddressSource::Config);
1✔
512
        let peer2: Peer = Peer::new(
1✔
513
            pk,
1✔
514
            node_id,
1✔
515
            net_addresses,
1✔
516
            PeerFlags::default(),
1✔
517
            PeerFeatures::empty(),
1✔
518
            Default::default(),
1✔
519
            Default::default(),
1✔
520
        );
1✔
521

1✔
522
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
1✔
523
        let node_id = NodeId::from_key(&pk);
1✔
524
        let net_address5 = "/ip4/13.14.15.16/tcp/6000".parse::<Multiaddr>().unwrap();
1✔
525
        let net_address6 = "/ip4/17.18.19.20/tcp/8000".parse::<Multiaddr>().unwrap();
1✔
526
        let mut net_addresses =
1✔
527
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address5], &PeerAddressSource::Config);
1✔
528
        net_addresses.add_address(&net_address6, &PeerAddressSource::Config);
1✔
529
        let peer3 = Peer::new(
1✔
530
            pk,
1✔
531
            node_id,
1✔
532
            net_addresses,
1✔
533
            PeerFlags::default(),
1✔
534
            PeerFeatures::empty(),
1✔
535
            Default::default(),
1✔
536
            Default::default(),
1✔
537
        );
1✔
538
        // Test adding and searching for peers
1✔
539
        peer_storage.add_or_update_peer(peer1.clone()).unwrap(); // assert!(peer_storage.add_or_update_peer(peer1.clone()).is_ok());
1✔
540
        assert!(peer_storage.add_or_update_peer(peer2.clone()).is_ok());
1✔
541
        assert!(peer_storage.add_or_update_peer(peer3.clone()).is_ok());
1✔
542

543
        assert_eq!(peer_storage.peer_db.size(), 3);
1✔
544

545
        assert_eq!(
1✔
546
            peer_storage
1✔
547
                .find_by_public_key(&peer1.public_key)
1✔
548
                .unwrap()
1✔
549
                .unwrap()
1✔
550
                .public_key,
1✔
551
            peer1.public_key
1✔
552
        );
1✔
553
        assert_eq!(
1✔
554
            peer_storage
1✔
555
                .find_by_public_key(&peer2.public_key)
1✔
556
                .unwrap()
1✔
557
                .unwrap()
1✔
558
                .public_key,
1✔
559
            peer2.public_key
1✔
560
        );
1✔
561
        assert_eq!(
1✔
562
            peer_storage
1✔
563
                .find_by_public_key(&peer3.public_key)
1✔
564
                .unwrap()
1✔
565
                .unwrap()
1✔
566
                .public_key,
1✔
567
            peer3.public_key
1✔
568
        );
1✔
569

570
        assert_eq!(
1✔
571
            peer_storage
1✔
572
                .get_peer_by_node_id(&peer1.node_id)
1✔
573
                .unwrap()
1✔
574
                .unwrap()
1✔
575
                .node_id,
1✔
576
            peer1.node_id
1✔
577
        );
1✔
578
        assert_eq!(
1✔
579
            peer_storage
1✔
580
                .get_peer_by_node_id(&peer2.node_id)
1✔
581
                .unwrap()
1✔
582
                .unwrap()
1✔
583
                .node_id,
1✔
584
            peer2.node_id
1✔
585
        );
1✔
586
        assert_eq!(
1✔
587
            peer_storage
1✔
588
                .get_peer_by_node_id(&peer3.node_id)
1✔
589
                .unwrap()
1✔
590
                .unwrap()
1✔
591
                .node_id,
1✔
592
            peer3.node_id
1✔
593
        );
1✔
594

595
        peer_storage.find_by_public_key(&peer1.public_key).unwrap().unwrap();
1✔
596
        peer_storage.find_by_public_key(&peer2.public_key).unwrap().unwrap();
1✔
597
        peer_storage.find_by_public_key(&peer3.public_key).unwrap().unwrap();
1✔
598

1✔
599
        // Test delete of border case peer
1✔
600
        assert!(peer_storage.soft_delete_peer(&peer3.node_id).is_ok());
1✔
601

602
        // It is a logical delete, so there should still be 3 peers in the db
603
        assert_eq!(peer_storage.peer_db.size(), 3);
1✔
604

605
        assert_eq!(
1✔
606
            peer_storage
1✔
607
                .find_by_public_key(&peer1.public_key)
1✔
608
                .unwrap()
1✔
609
                .unwrap()
1✔
610
                .public_key,
1✔
611
            peer1.public_key
1✔
612
        );
1✔
613
        assert_eq!(
1✔
614
            peer_storage
1✔
615
                .find_by_public_key(&peer2.public_key)
1✔
616
                .unwrap()
1✔
617
                .unwrap()
1✔
618
                .public_key,
1✔
619
            peer2.public_key
1✔
620
        );
1✔
621
        assert!(peer_storage
1✔
622
            .find_by_public_key(&peer3.public_key)
1✔
623
            .unwrap()
1✔
624
            .unwrap()
1✔
625
            .deleted_at
1✔
626
            .is_some());
1✔
627

628
        assert_eq!(
1✔
629
            peer_storage
1✔
630
                .get_peer_by_node_id(&peer1.node_id)
1✔
631
                .unwrap()
1✔
632
                .unwrap()
1✔
633
                .node_id,
1✔
634
            peer1.node_id
1✔
635
        );
1✔
636
        assert_eq!(
1✔
637
            peer_storage
1✔
638
                .get_peer_by_node_id(&peer2.node_id)
1✔
639
                .unwrap()
1✔
640
                .unwrap()
1✔
641
                .node_id,
1✔
642
            peer2.node_id
1✔
643
        );
1✔
644
        assert!(peer_storage
1✔
645
            .get_peer_by_node_id(&peer3.node_id)
1✔
646
            .unwrap()
1✔
647
            .unwrap()
1✔
648
            .deleted_at
1✔
649
            .is_some());
1✔
650
    }
1✔
651

652
    fn create_test_peer(features: PeerFeatures, ban: bool) -> Peer {
28✔
653
        let mut rng = rand::rngs::OsRng;
28✔
654

28✔
655
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
28✔
656
        let node_id = NodeId::from_key(&pk);
28✔
657

28✔
658
        let mut net_addresses = MultiaddressesWithStats::from_addresses_with_source(vec![], &PeerAddressSource::Config);
28✔
659

660
        // Create 1 to 4 random addresses
661
        for _i in 1..=rand::thread_rng().gen_range(1..4) {
60✔
662
            let n = [
60✔
663
                rand::thread_rng().gen_range(1..255),
60✔
664
                rand::thread_rng().gen_range(1..255),
60✔
665
                rand::thread_rng().gen_range(1..255),
60✔
666
                rand::thread_rng().gen_range(1..255),
60✔
667
                rand::thread_rng().gen_range(5000..9000),
60✔
668
            ];
60✔
669
            let net_address = format!("/ip4/{}.{}.{}.{}/tcp/{}", n[0], n[1], n[2], n[3], n[4])
60✔
670
                .parse::<Multiaddr>()
60✔
671
                .unwrap();
60✔
672
            net_addresses.add_address(&net_address, &PeerAddressSource::Config);
60✔
673
        }
60✔
674

675
        let mut peer = Peer::new(
28✔
676
            pk,
28✔
677
            node_id,
28✔
678
            net_addresses,
28✔
679
            PeerFlags::default(),
28✔
680
            features,
28✔
681
            Default::default(),
28✔
682
            Default::default(),
28✔
683
        );
28✔
684
        if ban {
28✔
685
            peer.ban_for(Duration::from_secs(600), "".to_string());
1✔
686
        }
27✔
687
        peer
28✔
688
    }
28✔
689

690
    #[test]
691
    fn test_in_network_region() {
1✔
692
        let peer_storage = get_peer_storage_sql_test_db().unwrap();
1✔
693

1✔
694
        let mut nodes = repeat_with(|| create_test_peer(PeerFeatures::COMMUNICATION_NODE, false))
5✔
695
            .take(5)
1✔
696
            .chain(repeat_with(|| create_test_peer(PeerFeatures::COMMUNICATION_CLIENT, false)).take(4))
4✔
697
            .collect::<Vec<_>>();
1✔
698

699
        for p in &nodes {
10✔
700
            peer_storage.add_or_update_peer(p.clone()).unwrap();
9✔
701
        }
9✔
702

703
        let main_peer_node_id = peer_storage.this_peer_identity().node_id;
1✔
704

1✔
705
        nodes.sort_by(|a, b| {
21✔
706
            a.node_id
21✔
707
                .distance(&main_peer_node_id)
21✔
708
                .cmp(&b.node_id.distance(&main_peer_node_id))
21✔
709
        });
21✔
710

1✔
711
        let db_nodes = peer_storage.peer_db.get_all_peers(None).unwrap();
1✔
712
        assert_eq!(db_nodes.len(), 9);
1✔
713

714
        let close_node = &nodes.first().unwrap().node_id;
1✔
715
        let far_node = &nodes.last().unwrap().node_id;
1✔
716

1✔
717
        let is_in_region = peer_storage.in_network_region(&main_peer_node_id, 1).unwrap();
1✔
718
        assert!(is_in_region);
1✔
719

720
        let is_in_region = peer_storage.in_network_region(close_node, 1).unwrap();
1✔
721
        assert!(is_in_region);
1✔
722

723
        let is_in_region = peer_storage.in_network_region(far_node, 9).unwrap();
1✔
724
        assert!(is_in_region);
1✔
725

726
        let is_in_region = peer_storage.in_network_region(far_node, 3).unwrap();
1✔
727
        assert!(!is_in_region);
1✔
728
    }
1✔
729

730
    #[test]
731
    fn get_just_seeds() {
1✔
732
        let peer_storage = get_peer_storage_sql_test_db().unwrap();
1✔
733

1✔
734
        let seeds = repeat_with(|| {
5✔
735
            let mut peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
5✔
736
            peer.add_flags(PeerFlags::SEED);
5✔
737
            peer
5✔
738
        })
5✔
739
        .take(5)
1✔
740
        .collect::<Vec<_>>();
1✔
741

742
        for p in &seeds {
6✔
743
            peer_storage.add_or_update_peer(p.clone()).unwrap();
5✔
744
        }
5✔
745

746
        let nodes = repeat_with(|| create_test_peer(PeerFeatures::COMMUNICATION_NODE, false))
5✔
747
            .take(5)
1✔
748
            .collect::<Vec<_>>();
1✔
749

750
        for p in &nodes {
6✔
751
            peer_storage.add_or_update_peer(p.clone()).unwrap();
5✔
752
        }
5✔
753
        let retrieved_seeds = peer_storage.get_seed_peers().unwrap();
1✔
754
        assert_eq!(retrieved_seeds.len(), seeds.len());
1✔
755
        for seed in seeds {
6✔
756
            assert!(retrieved_seeds.iter().any(|p| p.node_id == seed.node_id));
15✔
757
        }
758
    }
1✔
759

760
    #[test]
761
    fn discovery_syncing_returns_correct_peers() {
1✔
762
        let peer_storage = get_peer_storage_sql_test_db().unwrap();
1✔
763

1✔
764
        // Threshold duration + a minute
1✔
765
        #[allow(clippy::cast_possible_wrap)] // Won't wrap around, numbers are static
1✔
766
        let above_the_threshold = Utc::now().timestamp() - (STALE_PEER_THRESHOLD_DURATION.as_secs() + 60) as i64;
1✔
767

1✔
768
        let never_seen_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
1✔
769
        let banned_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, true);
1✔
770

1✔
771
        let mut not_active_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
1✔
772
        let address = not_active_peer.addresses.best().unwrap();
1✔
773
        let mut address = MultiaddrWithStats::new(address.address().clone(), PeerAddressSource::Config);
1✔
774
        address.mark_last_attempted(DateTime::from_timestamp(above_the_threshold, 0).unwrap().naive_utc());
1✔
775
        not_active_peer
1✔
776
            .addresses
1✔
777
            .merge(&MultiaddressesWithStats::from(vec![address]));
1✔
778

1✔
779
        let mut good_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
1✔
780
        let good_addresses = good_peer.addresses.borrow_mut();
1✔
781
        let good_address = good_addresses.addresses()[0].address().clone();
1✔
782
        good_addresses.mark_last_seen_now(&good_address);
1✔
783

1✔
784
        assert!(peer_storage.add_or_update_peer(never_seen_peer).is_ok());
1✔
785
        assert!(peer_storage.add_or_update_peer(not_active_peer).is_ok());
1✔
786
        assert!(peer_storage.add_or_update_peer(banned_peer).is_ok());
1✔
787
        assert!(peer_storage.add_or_update_peer(good_peer).is_ok());
1✔
788

789
        assert_eq!(peer_storage.all(None).unwrap().len(), 4);
1✔
790
        assert_eq!(
1✔
791
            peer_storage
1✔
792
                .discovery_syncing(100, &[], Some(PeerFeatures::COMMUNICATION_NODE))
1✔
793
                .unwrap()
1✔
794
                .len(),
1✔
795
            1
1✔
796
        );
1✔
797
    }
1✔
798
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc