
tari-project / tari / 15326284040

29 May 2025 02:33PM UTC coverage: 72.791% (-0.8%) from 73.545%
fix: peer retention and connections (#7123)

Description
---
- Fixed peer retention in the database by improving the delete-all-stale-peers logic.
- Improved error responses when peers cannot be found in the database.
- Improved synchronization of the DHT connection pools with the peer database.
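The delete-all-stale-peers behaviour can be illustrated with a minimal sketch. This is a hypothetical model, not the Tari implementation: `PeerRecord`, `prune_stale_peers`, and the plain-integer timestamps and node ids are all stand-ins for illustration.

```rust
// Hypothetical record with only the fields the sketch needs.
struct PeerRecord {
    node_id: u64,        // stand-in for a real NodeId
    last_seen_secs: u64, // timestamp of the last successful contact
    is_neighbour: bool,  // neighbours are retained even when stale
}

/// Remove peers not seen within `threshold_secs`, except neighbours,
/// and return the node ids of the deleted peers.
fn prune_stale_peers(peers: &mut Vec<PeerRecord>, now_secs: u64, threshold_secs: u64) -> Vec<u64> {
    let mut deleted = Vec::new();
    peers.retain(|p| {
        let stale = now_secs.saturating_sub(p.last_seen_secs) > threshold_secs;
        if stale && !p.is_neighbour {
            deleted.push(p.node_id);
            false // drop the stale non-neighbour from the database
        } else {
            true // keep fresh peers and stale neighbours
        }
    });
    deleted
}

fn main() {
    let threshold = 5 * 24 * 60 * 60 / 2; // 2.5 days, mirroring STALE_PEER_THRESHOLD_DURATION
    let now = 1_000_000_000u64;
    let mut peers = vec![
        PeerRecord { node_id: 1, last_seen_secs: now, is_neighbour: false },
        PeerRecord { node_id: 2, last_seen_secs: now - 3 * 24 * 60 * 60, is_neighbour: false },
        PeerRecord { node_id: 3, last_seen_secs: now - 3 * 24 * 60 * 60, is_neighbour: true },
    ];
    let deleted = prune_stale_peers(&mut peers, now, threshold);
    assert_eq!(deleted, vec![2]); // only the stale non-neighbour is removed
    assert_eq!(peers.len(), 2);
    println!("deleted node ids: {:?}", deleted);
}
```

Returning the deleted node ids is what lets callers (such as the DHT connectivity layer) evict matching entries from their pools rather than discovering the deletion later as an error.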

Motivation and Context
---
The DHT neighbour and random pools were not in sync with the actual connections and peers in the peer database.
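The failure mode described here, pools holding node ids that no longer exist in the peer database, can be modelled with a hypothetical reconciliation pass (`reconcile_pool` and the integer node ids below are illustrative stand-ins, not the actual Tari API):

```rust
use std::collections::HashSet;

/// Drop pool entries whose node id is no longer present in the peer database,
/// so that a subsequent pool refresh cannot fail with PeerNotFoundError.
fn reconcile_pool(pool: &mut Vec<u64>, db_node_ids: &HashSet<u64>) -> usize {
    let before = pool.len();
    pool.retain(|id| db_node_ids.contains(id));
    before - pool.len() // number of dangling entries evicted
}

fn main() {
    let db: HashSet<u64> = [1, 2, 4].into_iter().collect();
    // Node ids 3 and 5 were hard-deleted as stale but the pool still holds them.
    let mut neighbour_pool = vec![1, 2, 3, 4, 5];
    let removed = reconcile_pool(&mut neighbour_pool, &db);
    assert_eq!(removed, 2);
    assert_eq!(neighbour_pool, vec![1, 2, 4]);
    println!("evicted {} dangling pool entries", removed);
}
```

Without such a pass, the pool size and the connected count drift apart, which matches the `8/8 (0 connected)` symptom in the logs below.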

Issue - Why `PeerManagerError(PeerNotFoundError)`
```
2025-05-28 04:49:55.101009900 [comms::dht::connectivity] ERROR Error refreshing neighbour peer pool: PeerManagerError(PeerNotFoundError)
2025-05-28 04:49:55.101120100 [comms::connectivity::manager] TRACE Request (14743808475136314793): GetAllowList(Sender { inner: Some(Inner { state: State { is_complete: false, is_closed: false, is_rx_task_set: true, is_tx_task_set: false } }) })
2025-05-28 04:49:55.101160300 [comms::connectivity::manager] TRACE Request (14743808475136314793) done
2025-05-28 04:49:55.104823200 [comms::dht::connectivity] ERROR Error refreshing random peer pool: PeerManagerError(PeerNotFoundError)
```
Issue - Why `0 connected` but `active DHT connections: 10/12`
```
2025-05-28 04:49:55.104929100 [comms::dht::connectivity] DEBUG DHT connectivity status: neighbour pool: 8/8 (0 connected), random pool: 4/4 (0 connected, last refreshed 12777s ago), active DHT connections: 10/12
```
Issue - Why `Inbound pipeline returned an error: 'The requested peer does not exist'`
```
2025-05-28 10:42:21.447513700 [comms::pipeline::inbound] WARN  Inbound pipeline returned an error: 'The requested peer does not exist'
```
How Has This Been Tested?
---
- Adapted the unit test for the improved delete-all-stale-peers logic.
- System-level testing [**TBD**]

What process can a PR reviewer use to test or verify this change?
---
- Code review.
- System-level testing


163 of 194 new or added lines in 11 files covered. (84.02%)

1028 existing lines in 27 files now uncovered.

82035 of 112699 relevant lines covered (72.79%)

252963.2 hits per line


`/comms/core/src/peer_manager/peer_storage_sql.rs` (91.25% covered)

```rust
//  Copyright 2019 The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{cmp::min, time::Duration};

use log::*;
use multiaddr::Multiaddr;

use crate::{
    net_address::PeerAddressSource,
    peer_manager::{
        database::{PeerDatabaseSql, ThisPeerIdentity},
        peer::Peer,
        peer_id::PeerId,
        NodeDistance,
        NodeId,
        PeerFeatures,
        PeerManagerError,
    },
    types::{CommsDatabase, CommsPublicKey},
};

const LOG_TARGET: &str = "comms::peer_manager::peer_storage_sql";
// The maximum number of peers to return in peer manager
const PEER_MANAGER_SYNC_PEERS: usize = 100;
// The maximum amount of time a peer can be inactive before being considered stale:
// ((5 days, 24h, 60m, 60s)/2 = 2.5 days)
pub const STALE_PEER_THRESHOLD_DURATION: Duration = Duration::from_secs(5 * 24 * 60 * 60 / 2);
// Wallet peer connections are not verified in the way node peer connections are, thus a stale wallet connection may be
// totally valid, just not verified. Any stale wallet peers that are not neighbours will be deleted.
const MAX_NEIGHBOUR_WALLET_PEER_COUNT: usize = 25;

/// PeerStorageSql provides a mechanism to keep a datastore and a local copy of all peers in sync and allow fast
/// searches using the node_id, public key or net_address of a peer.
#[derive(Clone)]
pub struct PeerStorageSql {
    peer_db: PeerDatabaseSql,
}

impl PeerStorageSql {
    /// Constructs a new PeerStorageSql, with indexes populated from the given datastore
    pub fn new_indexed(database: PeerDatabaseSql) -> Result<PeerStorageSql, PeerManagerError> {
        trace!(
            target: LOG_TARGET,
            "Peer storage is initialized. {} total entries.",
            database.size(),
        );

        Ok(PeerStorageSql { peer_db: database })
    }

    /// Get this peer's identity
    pub fn this_peer_identity(&self) -> ThisPeerIdentity {
        self.peer_db.this_peer_identity()
    }

    /// Get the size of the database
    pub fn count(&self) -> usize {
        self.peer_db.size()
    }

    /// Adds or updates a peer and sets the last connection as successful.
    /// If the peer is marked as offline, it will be unmarked.
    pub fn add_or_update_peer(&self, peer: Peer) -> Result<PeerId, PeerManagerError> {
        Ok(self.peer_db.add_or_update_peer(peer)?)
    }

    /// Adds an online peer if the peer does not already exist. When a peer already
    /// exists, the stored version will be replaced with the newly provided peer.
    pub fn add_or_update_online_peer(
        &self,
        pubkey: &CommsPublicKey,
        node_id: &NodeId,
        addresses: &[Multiaddr],
        peer_features: &PeerFeatures,
        source: &PeerAddressSource,
    ) -> Result<Peer, PeerManagerError> {
        Ok(self
            .peer_db
            .add_or_update_online_peer(pubkey, node_id, addresses, peer_features, source)?)
    }

    /// The peer with the specified node id will be soft deleted (marked as deleted)
    pub fn soft_delete_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
        self.peer_db.soft_delete_peer(node_id)?;
        Ok(())
    }

    /// Find the peer with the provided NodeId
    pub fn get_peer_by_node_id(&self, node_id: &NodeId) -> Result<Option<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_peer_by_node_id(node_id)?)
    }

    /// Get all peers based on a list of their node_ids
    pub fn get_peers_by_node_ids(&self, node_ids: &[NodeId]) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_peers_by_node_ids(node_ids)?)
    }

    /// Get the public keys of all peers based on a list of their node_ids
    pub fn get_peer_public_keys_by_node_ids(
        &self,
        node_ids: &[NodeId],
    ) -> Result<Vec<CommsPublicKey>, PeerManagerError> {
        Ok(self.peer_db.get_peer_public_keys_by_node_ids(node_ids)?)
    }

    /// Get all banned peers
    pub fn get_banned_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_banned_peers()?)
    }

    pub fn find_all_starts_with(&self, partial: &[u8]) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.find_all_peers_match_partial_key(partial)?)
    }

    /// Find the peer with the provided PublicKey
    pub fn find_by_public_key(&self, public_key: &CommsPublicKey) -> Result<Option<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_peer_by_public_key(public_key)?)
    }

    /// Check if a peer exists with the specified public_key
    pub fn exists_public_key(&self, public_key: &CommsPublicKey) -> Result<bool, PeerManagerError> {
        if let Ok(val) = self.peer_db.peer_exists_by_public_key(public_key) {
            Ok(val.is_some())
        } else {
            Ok(false)
        }
    }

    /// Check if a peer exists with the specified node_id
    pub fn exists_node_id(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
        if let Ok(val) = self.peer_db.peer_exists_by_node_id(node_id) {
            Ok(val.is_some())
        } else {
            Ok(false)
        }
    }

    /// Return the peer corresponding to the provided NodeId if it is not banned
    pub fn direct_identity_node_id(&self, node_id: &NodeId) -> Result<Peer, PeerManagerError> {
        let peer = self
            .get_peer_by_node_id(node_id)?
            .ok_or(PeerManagerError::peer_not_found(node_id))?;

        if peer.is_banned() {
            Err(PeerManagerError::BannedPeer)
        } else {
            Ok(peer)
        }
    }

    /// Return the peer corresponding to the provided public key if it is not banned
    pub fn direct_identity_public_key(&self, public_key: &CommsPublicKey) -> Result<Peer, PeerManagerError> {
        let peer = self
            .find_by_public_key(public_key)?
            .ok_or(PeerManagerError::peer_not_found(&NodeId::from_public_key(public_key)))?;

        if peer.is_banned() {
            Err(PeerManagerError::BannedPeer)
        } else {
            Ok(peer)
        }
    }

    /// Return all peers, optionally filtering on the supplied feature
    pub fn all(&self, features: Option<PeerFeatures>) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_all_peers(features)?)
    }

    /// Return "good" peers for syncing
    /// Criteria:
    ///  - Peer is not banned
    ///  - Peer has been seen within a defined time span (within the threshold)
    ///  - Only returns a maximum number of syncable peers (corresponds with the max possible number of requestable
    ///    peers to sync)
    ///  - A value of 0 for `n` means use the maximum (PEER_MANAGER_SYNC_PEERS)
    pub fn discovery_syncing(
        &self,
        mut n: usize,
        excluded_peers: &[NodeId],
        features: Option<PeerFeatures>,
    ) -> Result<Vec<Peer>, PeerManagerError> {
        if n == 0 {
            n = PEER_MANAGER_SYNC_PEERS;
        } else {
            n = min(n, PEER_MANAGER_SYNC_PEERS);
        }

        Ok(self
            .peer_db
            .get_n_random_active_peers(n, excluded_peers, features, Some(STALE_PEER_THRESHOLD_DURATION))?)
    }

    /// Compile a list of all known peers
    pub fn get_not_banned_or_deleted_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self
            .peer_db
            .get_n_not_banned_or_deleted_peers(PEER_MANAGER_SYNC_PEERS)?)
    }

    /// Compile a list of the closest `n` active peers
    pub fn closest_n_active_peers(
        &self,
        region_node_id: &NodeId,
        n: usize,
        excluded_peers: &[NodeId],
        features: Option<PeerFeatures>,
        stale_peer_threshold: Option<Duration>,
        exclude_if_all_address_failed: bool,
        exclusion_distance: Option<NodeDistance>,
    ) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_closest_n_active_peers(
            region_node_id,
            n,
            excluded_peers,
            features,
            stale_peer_threshold,
            exclude_if_all_address_failed,
            exclusion_distance,
        )?)
    }

    pub fn get_seed_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_seed_peers()?)
    }

    /// Delete all stale peers, removing them from the database and returning their node_ids
    pub fn hard_delete_all_stale_peers(&self) -> Result<Vec<NodeId>, PeerManagerError> {
        Ok(self
            .peer_db
            .hard_delete_all_stale_peers(STALE_PEER_THRESHOLD_DURATION, MAX_NEIGHBOUR_WALLET_PEER_COUNT)?)
    }

    /// Compile a random list of communication node peers of size _n_ that are not banned or offline
    pub fn random_peers(&self, n: usize, exclude_peers: &[NodeId]) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_n_random_peers(n, exclude_peers)?)
    }

    /// Get the closest `n` not failed, banned or deleted peers, ordered by their distance to the given node ID.
    pub fn get_closest_n_good_standing_peers(
        &self,
        n: usize,
        features: PeerFeatures,
    ) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_closest_n_good_standing_peers(n, features)?)
    }

    /// Check if a specific node_id is in the network region of the N nearest neighbours of the region specified by
    /// region_node_id. If there are fewer than N known peers, this will _always_ return true
    pub fn in_network_region(&self, node_id: &NodeId, n: usize) -> Result<bool, PeerManagerError> {
        let region_node_id = self.this_peer_identity().node_id;
        let region_node_distance = region_node_id.distance(node_id);
        let node_threshold = self.calc_region_threshold(n, PeerFeatures::COMMUNICATION_NODE)?;
        // Is the node ID within the base node threshold?
        if region_node_distance <= node_threshold {
            return Ok(true);
        }
        // Is the node ID within the base client threshold?
        let client_threshold = self.calc_region_threshold(n, PeerFeatures::COMMUNICATION_CLIENT)?;
        Ok(region_node_distance <= client_threshold)
    }

    /// Calculate the threshold for the region specified by region_node_id.
    pub fn calc_region_threshold(&self, n: usize, features: PeerFeatures) -> Result<NodeDistance, PeerManagerError> {
        let region_node_id = self.this_peer_identity().node_id;
        if n == 0 {
            return Ok(NodeDistance::max_distance());
        }

        let closest_peers = self.peer_db.get_closest_n_good_standing_peer_node_ids(n, features)?;
        let mut dists = Vec::new();
        for node_id in closest_peers {
            dists.push(region_node_id.distance(&node_id));
        }

        if dists.is_empty() {
            return Ok(NodeDistance::max_distance());
        }

        // If we have fewer than `n` matching peers in our threshold group, the threshold should be max
        if dists.len() < n {
            return Ok(NodeDistance::max_distance());
        }

        Ok(dists.pop().expect("dists cannot be empty at this point"))
    }

    /// Unban the peer
    pub fn unban_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
        let _node_id = self.peer_db.reset_banned(node_id)?;
        Ok(())
    }

    /// Unban all peers
    pub fn unban_all_peers(&self) -> Result<usize, PeerManagerError> {
        let number_unbanned = self.peer_db.reset_all_banned()?;
        Ok(number_unbanned)
    }

    pub fn reset_offline_non_wallet_peers(&self) -> Result<usize, PeerManagerError> {
        let number_offline = self.peer_db.reset_offline_non_wallet_peers()?;
        Ok(number_offline)
    }

    /// Ban the peer for the given duration
    pub fn ban_peer(
        &self,
        public_key: &CommsPublicKey,
        duration: Duration,
        reason: String,
    ) -> Result<NodeId, PeerManagerError> {
        let node_id = NodeId::from_key(public_key);
        self.peer_db
            .set_banned(&node_id, duration, reason)?
            .ok_or(PeerManagerError::peer_not_found(&NodeId::from_public_key(public_key)))
    }

    /// Ban the peer for the given duration
    pub fn ban_peer_by_node_id(
        &self,
        node_id: &NodeId,
        duration: Duration,
        reason: String,
    ) -> Result<NodeId, PeerManagerError> {
        self.peer_db
            .set_banned(node_id, duration, reason)?
            .ok_or(PeerManagerError::peer_not_found(node_id))
    }

    pub fn is_peer_banned(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
        let peer = self
            .get_peer_by_node_id(node_id)?
            .ok_or(PeerManagerError::peer_not_found(node_id))?;
        Ok(peer.is_banned())
    }

    /// Store metadata in the metadata field of the peer identified by the node ID.
    /// Returns None if there was no previous value, and the old value if the value was updated
    pub fn set_peer_metadata(
        &self,
        node_id: &NodeId,
        key: u8,
        data: Vec<u8>,
    ) -> Result<Option<Vec<u8>>, PeerManagerError> {
        Ok(self.peer_db.set_metadata(node_id, key, data)?)
    }
}

#[allow(clippy::from_over_into)]
impl Into<CommsDatabase> for PeerStorageSql {
    fn into(self) -> CommsDatabase {
        self.peer_db
    }
}

#[cfg(test)]
mod test {
    use std::{borrow::BorrowMut, iter::repeat_with};

    use chrono::{DateTime, Utc};
    use multiaddr::Multiaddr;
    use rand::Rng;
    use tari_common_sqlite::connection::DbConnection;

    use super::*;
    use crate::{
        net_address::{MultiaddrWithStats, MultiaddressesWithStats, PeerAddressSource},
        peer_manager::{database::MIGRATIONS, peer::PeerFlags},
    };

    fn get_peer_db_sql_test_db() -> Result<PeerDatabaseSql, PeerManagerError> {
        let db_connection = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
        Ok(PeerDatabaseSql::new(
            db_connection,
            &create_test_peer(PeerFeatures::COMMUNICATION_NODE, false),
        )?)
    }

    fn get_peer_storage_sql_test_db() -> Result<PeerStorageSql, PeerManagerError> {
        PeerStorageSql::new_indexed(get_peer_db_sql_test_db()?)
    }

    #[test]
    fn test_restore() {
        // Create Peers
        let mut rng = rand::rngs::OsRng;
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address1 = "/ip4/1.2.3.4/tcp/8000".parse::<Multiaddr>().unwrap();
        let net_address2 = "/ip4/5.6.7.8/tcp/8000".parse::<Multiaddr>().unwrap();
        let net_address3 = "/ip4/5.6.7.8/tcp/7000".parse::<Multiaddr>().unwrap();
        let mut net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address1], &PeerAddressSource::Config);
        net_addresses.add_address(&net_address2, &PeerAddressSource::Config);
        net_addresses.add_address(&net_address3, &PeerAddressSource::Config);
        let peer1 = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );

        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address4 = "/ip4/9.10.11.12/tcp/7000".parse::<Multiaddr>().unwrap();
        let net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address4], &PeerAddressSource::Config);
        let peer2: Peer = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );

        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address5 = "/ip4/13.14.15.16/tcp/6000".parse::<Multiaddr>().unwrap();
        let net_address6 = "/ip4/17.18.19.20/tcp/8000".parse::<Multiaddr>().unwrap();
        let mut net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address5], &PeerAddressSource::Config);
        net_addresses.add_address(&net_address6, &PeerAddressSource::Config);
        let peer3 = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );

        // Create new datastore with a peer database
        let mut db = Some(get_peer_db_sql_test_db().unwrap());
        {
            let peer_storage = db.take().unwrap();

            // Test adding and searching for peers
            assert!(peer_storage.add_or_update_peer(peer1.clone()).is_ok());
            assert!(peer_storage.add_or_update_peer(peer2.clone()).is_ok());
            assert!(peer_storage.add_or_update_peer(peer3.clone()).is_ok());

            assert_eq!(peer_storage.size(), 3);
            assert!(peer_storage.get_peer_by_public_key(&peer1.public_key).is_ok());
            assert!(peer_storage.get_peer_by_public_key(&peer2.public_key).is_ok());
            assert!(peer_storage.get_peer_by_public_key(&peer3.public_key).is_ok());
            db = Some(peer_storage);
        }
        // Restore from existing database
        let peer_storage = PeerStorageSql::new_indexed(db.take().unwrap()).unwrap();

        assert_eq!(peer_storage.peer_db.size(), 3);
        assert!(peer_storage.find_by_public_key(&peer1.public_key).is_ok());
        assert!(peer_storage.find_by_public_key(&peer2.public_key).is_ok());
        assert!(peer_storage.find_by_public_key(&peer3.public_key).is_ok());
    }

    #[allow(clippy::too_many_lines)]
    #[test]
    fn test_add_delete_find_peer() {
        let peer_storage = get_peer_storage_sql_test_db().unwrap();

        // Create Peers
        let mut rng = rand::rngs::OsRng;
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address1 = "/ip4/1.2.3.4/tcp/8000".parse::<Multiaddr>().unwrap();
        let net_address2 = "/ip4/5.6.7.8/tcp/8000".parse::<Multiaddr>().unwrap();
        let net_address3 = "/ip4/5.6.7.8/tcp/7000".parse::<Multiaddr>().unwrap();
        let mut net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address1], &PeerAddressSource::Config);
        net_addresses.add_address(&net_address2, &PeerAddressSource::Config);
        net_addresses.add_address(&net_address3, &PeerAddressSource::Config);
        let peer1 = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );

        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address4 = "/ip4/9.10.11.12/tcp/7000".parse::<Multiaddr>().unwrap();
        let net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address4], &PeerAddressSource::Config);
        let peer2: Peer = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );

        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address5 = "/ip4/13.14.15.16/tcp/6000".parse::<Multiaddr>().unwrap();
        let net_address6 = "/ip4/17.18.19.20/tcp/8000".parse::<Multiaddr>().unwrap();
        let mut net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address5], &PeerAddressSource::Config);
        net_addresses.add_address(&net_address6, &PeerAddressSource::Config);
        let peer3 = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );
        // Test adding and searching for peers
        peer_storage.add_or_update_peer(peer1.clone()).unwrap();
        assert!(peer_storage.add_or_update_peer(peer2.clone()).is_ok());
        assert!(peer_storage.add_or_update_peer(peer3.clone()).is_ok());

        assert_eq!(peer_storage.peer_db.size(), 3);

        assert_eq!(
            peer_storage
                .find_by_public_key(&peer1.public_key)
                .unwrap()
                .unwrap()
                .public_key,
            peer1.public_key
        );
        assert_eq!(
            peer_storage
                .find_by_public_key(&peer2.public_key)
                .unwrap()
                .unwrap()
                .public_key,
            peer2.public_key
        );
        assert_eq!(
            peer_storage
                .find_by_public_key(&peer3.public_key)
                .unwrap()
                .unwrap()
                .public_key,
            peer3.public_key
        );

        assert_eq!(
            peer_storage
                .get_peer_by_node_id(&peer1.node_id)
                .unwrap()
                .unwrap()
                .node_id,
            peer1.node_id
        );
        assert_eq!(
            peer_storage
                .get_peer_by_node_id(&peer2.node_id)
                .unwrap()
                .unwrap()
                .node_id,
            peer2.node_id
        );
        assert_eq!(
            peer_storage
                .get_peer_by_node_id(&peer3.node_id)
                .unwrap()
                .unwrap()
                .node_id,
            peer3.node_id
        );

        peer_storage.find_by_public_key(&peer1.public_key).unwrap().unwrap();
        peer_storage.find_by_public_key(&peer2.public_key).unwrap().unwrap();
        peer_storage.find_by_public_key(&peer3.public_key).unwrap().unwrap();

        // Test delete of border case peer
        assert!(peer_storage.soft_delete_peer(&peer3.node_id).is_ok());

        // It is a logical delete, so there should still be 3 peers in the db
        assert_eq!(peer_storage.peer_db.size(), 3);

        assert_eq!(
            peer_storage
                .find_by_public_key(&peer1.public_key)
                .unwrap()
                .unwrap()
                .public_key,
            peer1.public_key
        );
        assert_eq!(
            peer_storage
                .find_by_public_key(&peer2.public_key)
                .unwrap()
                .unwrap()
                .public_key,
            peer2.public_key
        );
        assert!(peer_storage
            .find_by_public_key(&peer3.public_key)
            .unwrap()
            .unwrap()
            .deleted_at
            .is_some());

        assert_eq!(
            peer_storage
                .get_peer_by_node_id(&peer1.node_id)
                .unwrap()
                .unwrap()
                .node_id,
            peer1.node_id
        );
        assert_eq!(
            peer_storage
                .get_peer_by_node_id(&peer2.node_id)
                .unwrap()
                .unwrap()
                .node_id,
            peer2.node_id
        );
        assert!(peer_storage
            .get_peer_by_node_id(&peer3.node_id)
            .unwrap()
            .unwrap()
            .deleted_at
            .is_some());
    }

    fn create_test_peer(features: PeerFeatures, ban: bool) -> Peer {
        let mut rng = rand::rngs::OsRng;

        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);

        let mut net_addresses = MultiaddressesWithStats::from_addresses_with_source(vec![], &PeerAddressSource::Config);

        // Create 1 to 4 random addresses
        for _i in 1..=rand::thread_rng().gen_range(1..4) {
            let n = [
                rand::thread_rng().gen_range(1..255),
                rand::thread_rng().gen_range(1..255),
                rand::thread_rng().gen_range(1..255),
                rand::thread_rng().gen_range(1..255),
                rand::thread_rng().gen_range(5000..9000),
            ];
            let net_address = format!("/ip4/{}.{}.{}.{}/tcp/{}", n[0], n[1], n[2], n[3], n[4])
                .parse::<Multiaddr>()
                .unwrap();
            net_addresses.add_address(&net_address, &PeerAddressSource::Config);
        }

        let mut peer = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            features,
            Default::default(),
            Default::default(),
        );
        if ban {
            peer.ban_for(Duration::from_secs(600), "".to_string());
        }
        peer
    }

    #[test]
    fn test_in_network_region() {
        let peer_storage = get_peer_storage_sql_test_db().unwrap();

        let mut nodes = repeat_with(|| create_test_peer(PeerFeatures::COMMUNICATION_NODE, false))
            .take(5)
            .chain(repeat_with(|| create_test_peer(PeerFeatures::COMMUNICATION_CLIENT, false)).take(4))
            .collect::<Vec<_>>();

        for p in &nodes {
            peer_storage.add_or_update_peer(p.clone()).unwrap();
        }

        let main_peer_node_id = peer_storage.this_peer_identity().node_id;

        nodes.sort_by(|a, b| {
            a.node_id
                .distance(&main_peer_node_id)
                .cmp(&b.node_id.distance(&main_peer_node_id))
        });

        let db_nodes = peer_storage.peer_db.get_all_peers(None).unwrap();
```
1✔
712
        assert_eq!(db_nodes.len(), 9);
1✔
713

714
        let close_node = &nodes.first().unwrap().node_id;
1✔
715
        let far_node = &nodes.last().unwrap().node_id;
1✔
716

1✔
717
        let is_in_region = peer_storage.in_network_region(&main_peer_node_id, 1).unwrap();
1✔
718
        assert!(is_in_region);
1✔
719

720
        let is_in_region = peer_storage.in_network_region(close_node, 1).unwrap();
1✔
721
        assert!(is_in_region);
1✔
722

723
        let is_in_region = peer_storage.in_network_region(far_node, 9).unwrap();
1✔
724
        assert!(is_in_region);
1✔
725

726
        let is_in_region = peer_storage.in_network_region(far_node, 3).unwrap();
1✔
727
        assert!(!is_in_region);
1✔
728
    }
1✔
729

730
    #[test]
731
    fn get_just_seeds() {
1✔
732
        let peer_storage = get_peer_storage_sql_test_db().unwrap();
1✔
733

1✔
734
        let seeds = repeat_with(|| {
5✔
735
            let mut peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
5✔
736
            peer.add_flags(PeerFlags::SEED);
5✔
737
            peer
5✔
738
        })
5✔
739
        .take(5)
1✔
740
        .collect::<Vec<_>>();
1✔
741

742
        for p in &seeds {
6✔
743
            peer_storage.add_or_update_peer(p.clone()).unwrap();
5✔
744
        }
5✔
745

746
        let nodes = repeat_with(|| create_test_peer(PeerFeatures::COMMUNICATION_NODE, false))
5✔
747
            .take(5)
1✔
748
            .collect::<Vec<_>>();
1✔
749

750
        for p in &nodes {
6✔
751
            peer_storage.add_or_update_peer(p.clone()).unwrap();
5✔
752
        }
5✔
753
        let retrieved_seeds = peer_storage.get_seed_peers().unwrap();
1✔
754
        assert_eq!(retrieved_seeds.len(), seeds.len());
1✔
755
        for seed in seeds {
6✔
756
            assert!(retrieved_seeds.iter().any(|p| p.node_id == seed.node_id));
15✔
757
        }
758
    }
1✔
759

760
    #[test]
761
    fn discovery_syncing_returns_correct_peers() {
1✔
762
        let peer_storage = get_peer_storage_sql_test_db().unwrap();
1✔
763

1✔
764
        // Threshold duration + a minute
1✔
765
        #[allow(clippy::cast_possible_wrap)] // Won't wrap around, numbers are static
1✔
766
        let above_the_threshold = Utc::now().timestamp() - (STALE_PEER_THRESHOLD_DURATION.as_secs() + 60) as i64;
1✔
767

1✔
768
        let never_seen_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
1✔
769
        let banned_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, true);
1✔
770

1✔
771
        let mut not_active_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
1✔
772
        let address = not_active_peer.addresses.best().unwrap();
1✔
773
        let mut address = MultiaddrWithStats::new(address.address().clone(), PeerAddressSource::Config);
1✔
774
        address.mark_last_attempted(DateTime::from_timestamp(above_the_threshold, 0).unwrap().naive_utc());
1✔
775
        not_active_peer
1✔
776
            .addresses
1✔
777
            .merge(&MultiaddressesWithStats::from(vec![address]));
1✔
778

1✔
779
        let mut good_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
1✔
780
        let good_addresses = good_peer.addresses.borrow_mut();
1✔
781
        let good_address = good_addresses.addresses()[0].address().clone();
1✔
782
        good_addresses.mark_last_seen_now(&good_address);
1✔
783

1✔
784
        assert!(peer_storage.add_or_update_peer(never_seen_peer).is_ok());
1✔
785
        assert!(peer_storage.add_or_update_peer(not_active_peer).is_ok());
1✔
786
        assert!(peer_storage.add_or_update_peer(banned_peer).is_ok());
1✔
787
        assert!(peer_storage.add_or_update_peer(good_peer).is_ok());
1✔
788

789
        assert_eq!(peer_storage.all(None).unwrap().len(), 4);
1✔
790
        assert_eq!(
1✔
791
            peer_storage
1✔
792
                .discovery_syncing(100, &[], Some(PeerFeatures::COMMUNICATION_NODE))
1✔
793
                .unwrap()
1✔
794
                .len(),
1✔
795
            1
1✔
796
        );
1✔
797
    }
1✔
798
}