
tari-project / tari — build 15757511687

19 Jun 2025 12:10PM UTC · coverage: 72.219% (-0.02%) from 72.239% · push · github · web-flow

# fix: peer dialling (#7218)

## Description

Implements proactive peer dialing functionality to resolve connectivity
issues where nodes consistently dial fewer peers than needed to reach
target connection counts. The system now automatically dials 2-3x the
target number of peers to account for connection failures and ensures
robust network connectivity.
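
One way to think about the over-dial count is the connection shortfall divided by the observed dial success rate. The following is a minimal sketch of that idea only; the function name, clamping bounds, and the 3x cap are assumptions for illustration, not the code introduced by this PR.

```rust
/// Illustrative only: derive a dial count from the connection shortfall and a
/// historical dial success rate. Names and constants are assumptions for this
/// sketch, not the implementation introduced by the PR.
fn proactive_dial_count(target: usize, connected: usize, success_rate: f64) -> usize {
    let shortfall = target.saturating_sub(connected);
    // Avoid dividing by a near-zero success rate.
    let rate = success_rate.clamp(0.25, 1.0);
    // Dial enough candidates that, at the observed success rate, the shortfall
    // is expected to be covered, capped at 3x the shortfall.
    let wanted = (shortfall as f64 / rate).ceil() as usize;
    wanted.clamp(shortfall, shortfall.saturating_mul(3))
}
```

For example, with a target of 8 connections, 5 already established, and a 50% historical dial success rate, this sketch would dial 6 candidates rather than 3.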

**Key Components:**
- **Proactive Dialer**: Core service that calculates optimal dial counts
based on historical success rates and intelligently selects healthy peer
candidates
- **Enhanced Connection Stats**: Extended `PeerConnectionStats` with
circuit breaker pattern integration and health scoring capabilities
- **Peer Health Metrics**: Circuit breaker pattern with time-windowed
success rate tracking and health scoring for peer selection (see the sketch
after this list)
- **Health-Aware Peer Selection**: Enhanced `ConnectivitySelection` with
health-based prioritization for peer connection choices
- **Peer Discovery Bridge**: Triggers additional peer discovery when
insufficient candidates are available
- **Comprehensive Metrics**: Prometheus metrics for monitoring dial
attempts, success rates, circuit breaker states, and performance
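
The circuit breaker and time-windowed health scoring referenced above could look roughly like the following. The struct, the failure threshold, the cooldown, and the scoring rule are assumptions made for this sketch, not the types added by the PR.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Illustrative sketch of time-windowed success tracking with a simple
/// circuit breaker; thresholds and window handling are assumptions.
struct PeerHealth {
    window: Duration,
    attempts: VecDeque<(Instant, bool)>, // (when, succeeded)
    open_until: Option<Instant>,         // breaker open until this time
}

impl PeerHealth {
    fn new(window: Duration) -> Self {
        Self { window, attempts: VecDeque::new(), open_until: None }
    }

    fn record(&mut self, succeeded: bool) {
        let now = Instant::now();
        self.attempts.push_back((now, succeeded));
        // Drop results that have fallen out of the window.
        while let Some(&(t, _)) = self.attempts.front() {
            if now.duration_since(t) > self.window {
                self.attempts.pop_front();
            } else {
                break;
            }
        }
        // Trip the breaker after three consecutive recent failures.
        let recent_failures = self.attempts.iter().rev().take_while(|(_, ok)| !*ok).count();
        if recent_failures >= 3 {
            self.open_until = Some(now + Duration::from_secs(300));
        }
    }

    /// Success rate over the window; peers with no history score neutrally.
    fn health_score(&self) -> f64 {
        if self.attempts.is_empty() {
            return 0.5;
        }
        let ok = self.attempts.iter().filter(|(_, s)| *s).count();
        ok as f64 / self.attempts.len() as f64
    }

    /// A peer is only considered a dial candidate while its breaker is closed.
    fn is_dialable(&self) -> bool {
        self.open_until.map_or(true, |t| Instant::now() >= t)
    }
}
```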

**Architecture:**
- Actor-based service pattern with async message passing
- Integrates into existing ConnectivityManager's connection pool refresh
cycle (currently 180 seconds/3 minutes)
- Configurable multipliers and thresholds for different network
conditions (an illustrative config is sketched after this list)
- Backward compatible with existing connectivity management
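
A hypothetical shape for the configurable knobs mentioned in this list; every field name and default below is an assumption for illustration, apart from the 180-second refresh interval stated above.

```rust
use std::time::Duration;

/// Hypothetical configuration for the proactive dialer; field names and
/// defaults are illustrative, not the PR's actual settings.
#[derive(Debug, Clone)]
pub struct ProactiveDialerConfig {
    /// Multiplier applied to the connection shortfall when choosing how many peers to dial.
    pub dial_multiplier: f64,
    /// Minimum health score a peer needs to be considered a dial candidate.
    pub min_health_score: f64,
    /// How long a tripped circuit breaker keeps a peer out of the candidate set.
    pub circuit_breaker_cooldown: Duration,
    /// How often the connection pool is refreshed (the PR states 180 seconds).
    pub pool_refresh_interval: Duration,
}

impl Default for ProactiveDialerConfig {
    fn default() -> Self {
        Self {
            dial_multiplier: 2.5,
            min_health_score: 0.3,
            circuit_breaker_cooldown: Duration::from_secs(300),
            pool_refresh_interval: Duration::from_secs(180),
        }
    }
}
```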

**Production Validation:** This implementation has been tested and
verified working on a live mainnet node, with logs confirming proper
execution and logic flow.

---

## Motivation and Context

**Problem:** Nodes were consistently dialing only ~5 peers instead of
sufficient peers to reach the target of 8 connections, leading to poor
network connectivity and potential isolation issues.

**Root Cause:** The existing connectivity manager only dialed exactly
the number of missing connectio... (continued)

522 of 760 new or added lines in 10 files covered. (68.68%)

72 existing lines in 14 files now uncovered.

82790 of 114637 relevant lines covered (72.22%)

240690.26 hits per line

Source File: /comms/core/src/peer_manager/peer_storage_sql.rs (91.44% covered)
```rust
//  Copyright 2019 The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{cmp::min, time::Duration};

use log::*;
use multiaddr::Multiaddr;

use crate::{
    net_address::PeerAddressSource,
    peer_manager::{
        database::{PeerDatabaseSql, ThisPeerIdentity},
        peer::Peer,
        peer_id::PeerId,
        NodeDistance,
        NodeId,
        PeerFeatures,
        PeerManagerError,
    },
    types::{CommsDatabase, CommsPublicKey},
};

const LOG_TARGET: &str = "comms::peer_manager::peer_storage_sql";
// The maximum number of peers to return in peer manager
const PEER_MANAGER_SYNC_PEERS: usize = 100;
// The maximum amount of time a peer can be inactive before being considered stale:
// ((5 days, 24h, 60m, 60s)/2 = 2.5 days)
pub const STALE_PEER_THRESHOLD_DURATION: Duration = Duration::from_secs(5 * 24 * 60 * 60 / 2);

/// PeerStorageSql provides a mechanism to keep a datastore and a local copy of all peers in sync and allow fast
/// searches using the node_id, public key or net_address of a peer.
#[derive(Clone)]
pub struct PeerStorageSql {
    peer_db: PeerDatabaseSql,
}

impl PeerStorageSql {
    /// Constructs a new PeerStorageSql, with indexes populated from the given datastore
    pub fn new_indexed(database: PeerDatabaseSql) -> Result<PeerStorageSql, PeerManagerError> {
        trace!(
            target: LOG_TARGET,
            "Peer storage is initialized. {} total entries.",
            database.size(),
        );

        Ok(PeerStorageSql { peer_db: database })
    }

    /// Get this peer's identity
    pub fn this_peer_identity(&self) -> ThisPeerIdentity {
        self.peer_db.this_peer_identity()
    }

    /// Get the size of the database
    pub fn count(&self) -> usize {
        self.peer_db.size()
    }

    /// Adds or updates a peer and sets the last connection as successful.
    /// If the peer is marked as offline, it will be unmarked.
    pub fn add_or_update_peer(&self, peer: Peer) -> Result<PeerId, PeerManagerError> {
        Ok(self.peer_db.add_or_update_peer(peer)?)
    }

    /// Adds an online peer if the peer does not already exist. When a peer already
    /// exists, the stored version will be replaced with the newly provided peer.
    pub fn add_or_update_online_peer(
        &self,
        pubkey: &CommsPublicKey,
        node_id: &NodeId,
        addresses: &[Multiaddr],
        peer_features: &PeerFeatures,
        source: &PeerAddressSource,
    ) -> Result<Peer, PeerManagerError> {
        Ok(self
            .peer_db
            .add_or_update_online_peer(pubkey, node_id, addresses, peer_features, source)?)
    }

    /// The peer with the specified node id will be soft deleted (marked as deleted)
    pub fn soft_delete_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
        self.peer_db.soft_delete_peer(node_id)?;
        Ok(())
    }

    /// Find the peer with the provided NodeID
    pub fn get_peer_by_node_id(&self, node_id: &NodeId) -> Result<Option<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_peer_by_node_id(node_id)?)
    }

    /// Get all peers based on a list of their node_ids
    pub fn get_peers_by_node_ids(&self, node_ids: &[NodeId]) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_peers_by_node_ids(node_ids)?)
    }

    /// Get all peer public keys based on a list of their node_ids
    pub fn get_peer_public_keys_by_node_ids(
        &self,
        node_ids: &[NodeId],
    ) -> Result<Vec<CommsPublicKey>, PeerManagerError> {
        Ok(self.peer_db.get_peer_public_keys_by_node_ids(node_ids)?)
    }

    /// Get all banned peers
    pub fn get_banned_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_banned_peers()?)
    }

    pub fn find_all_starts_with(&self, partial: &[u8]) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.find_all_peers_match_partial_key(partial)?)
    }

    /// Find the peer with the provided PublicKey
    pub fn find_by_public_key(&self, public_key: &CommsPublicKey) -> Result<Option<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_peer_by_public_key(public_key)?)
    }

    /// Check if a peer exists using the specified public_key
    pub fn exists_public_key(&self, public_key: &CommsPublicKey) -> Result<bool, PeerManagerError> {
        if let Ok(val) = self.peer_db.peer_exists_by_public_key(public_key) {
            Ok(val.is_some())
        } else {
            Ok(false)
        }
    }

    /// Check if a peer exists using the specified node_id
    pub fn exists_node_id(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
        if let Ok(val) = self.peer_db.peer_exists_by_node_id(node_id) {
            Ok(val.is_some())
        } else {
            Ok(false)
        }
    }

    /// Return the peer corresponding to the provided NodeId if it is not banned
    pub fn direct_identity_node_id(&self, node_id: &NodeId) -> Result<Peer, PeerManagerError> {
        let peer = self
            .get_peer_by_node_id(node_id)?
            .ok_or(PeerManagerError::peer_not_found(node_id))?;

        if peer.is_banned() {
            Err(PeerManagerError::BannedPeer)
        } else {
            Ok(peer)
        }
    }

    /// Return the peer corresponding to the provided public key if it is not banned
    pub fn direct_identity_public_key(&self, public_key: &CommsPublicKey) -> Result<Peer, PeerManagerError> {
        let peer = self
            .find_by_public_key(public_key)?
            .ok_or(PeerManagerError::peer_not_found(&NodeId::from_public_key(public_key)))?;

        if peer.is_banned() {
            Err(PeerManagerError::BannedPeer)
        } else {
            Ok(peer)
        }
    }

    /// Return all peers, optionally filtering on supplied feature
    pub fn all(&self, features: Option<PeerFeatures>) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_all_peers(features)?)
    }

    /// Return "good" peers for syncing
    /// Criteria:
    ///  - Peer is not banned
    ///  - Peer has been seen within a defined time span (within the threshold)
    ///  - Only returns a maximum number of syncable peers (corresponds with the max possible number of requestable
    ///    peers to sync)
    ///  - A value of 0 for `n` means the maximum (PEER_MANAGER_SYNC_PEERS) is used
    pub fn discovery_syncing(
        &self,
        mut n: usize,
        excluded_peers: &[NodeId],
        features: Option<PeerFeatures>,
    ) -> Result<Vec<Peer>, PeerManagerError> {
        if n == 0 {
            n = PEER_MANAGER_SYNC_PEERS;
        } else {
            n = min(n, PEER_MANAGER_SYNC_PEERS);
        }

        Ok(self
            .peer_db
            .get_n_random_active_peers(n, excluded_peers, features, Some(STALE_PEER_THRESHOLD_DURATION))?)
    }

    /// Compile a list of all known peers
    pub fn get_not_banned_or_deleted_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self
            .peer_db
            .get_n_not_banned_or_deleted_peers(PEER_MANAGER_SYNC_PEERS)?)
    }

    /// Get available dial candidates that are communication nodes, not banned, not deleted,
    /// and not in the excluded node IDs list
    pub fn get_available_dial_candidates(
        &self,
        exclude_node_ids: &[NodeId],
        limit: Option<usize>,
    ) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_available_dial_candidates(exclude_node_ids, limit)?)
    }

    /// Compile a list of the closest `n` active peers
    pub fn closest_n_active_peers(
        &self,
        region_node_id: &NodeId,
        n: usize,
        excluded_peers: &[NodeId],
        features: Option<PeerFeatures>,
        stale_peer_threshold: Option<Duration>,
        exclude_if_all_address_failed: bool,
        exclusion_distance: Option<NodeDistance>,
    ) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_closest_n_active_peers(
            region_node_id,
            n,
            excluded_peers,
            features,
            stale_peer_threshold,
            exclude_if_all_address_failed,
            exclusion_distance,
        )?)
    }

    pub fn get_seed_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_seed_peers()?)
    }

    /// Compile a random list of communication node peers of size _n_ that are not banned or offline
    pub fn random_peers(&self, n: usize, exclude_peers: &[NodeId]) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_n_random_peers(n, exclude_peers)?)
    }

    /// Get the closest `n` not failed, banned or deleted peers, ordered by their distance to the given node ID.
    pub fn get_closest_n_good_standing_peers(
        &self,
        n: usize,
        features: PeerFeatures,
    ) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_closest_n_good_standing_peers(n, features)?)
    }

    /// Check if a specific node_id is in the network region of the N nearest neighbours of the region specified by
    /// region_node_id. If there are fewer than N known peers, this will _always_ return true
    pub fn in_network_region(&self, node_id: &NodeId, n: usize) -> Result<bool, PeerManagerError> {
        let region_node_id = self.this_peer_identity().node_id;
        let region_node_distance = region_node_id.distance(node_id);
        let node_threshold = self.calc_region_threshold(n, PeerFeatures::COMMUNICATION_NODE)?;
        // Is node ID in the base node threshold?
        if region_node_distance <= node_threshold {
            return Ok(true);
        }
        let client_threshold = self.calc_region_threshold(n, PeerFeatures::COMMUNICATION_CLIENT)?; // Is node ID in the base client threshold?
        Ok(region_node_distance <= client_threshold)
    }

    /// Calculate the threshold for the region specified by region_node_id.
    pub fn calc_region_threshold(&self, n: usize, features: PeerFeatures) -> Result<NodeDistance, PeerManagerError> {
        let region_node_id = self.this_peer_identity().node_id;
        if n == 0 {
            return Ok(NodeDistance::max_distance());
        }

        let closest_peers = self.peer_db.get_closest_n_good_standing_peer_node_ids(n, features)?;
        let mut dists = Vec::new();
        for node_id in closest_peers {
            dists.push(region_node_id.distance(&node_id));
        }

        if dists.is_empty() {
            return Ok(NodeDistance::max_distance());
        }

        // If we have fewer than `n` matching peers in our threshold group, the threshold should be max
        if dists.len() < n {
            return Ok(NodeDistance::max_distance());
        }

        Ok(dists.pop().expect("dists cannot be empty at this point"))
    }

    /// Unban the peer
    pub fn unban_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
        let _node_id = self.peer_db.reset_banned(node_id)?;
        Ok(())
    }

    /// Unban all peers
    pub fn unban_all_peers(&self) -> Result<usize, PeerManagerError> {
        let number_unbanned = self.peer_db.reset_all_banned()?;
        Ok(number_unbanned)
    }

    pub fn reset_offline_non_wallet_peers(&self) -> Result<usize, PeerManagerError> {
        let number_offline = self.peer_db.reset_offline_non_wallet_peers()?;
        Ok(number_offline)
    }

    /// Ban the peer for the given duration
    pub fn ban_peer(
        &self,
        public_key: &CommsPublicKey,
        duration: Duration,
        reason: String,
    ) -> Result<NodeId, PeerManagerError> {
        let node_id = NodeId::from_key(public_key);
        self.peer_db
            .set_banned(&node_id, duration, reason)?
            .ok_or(PeerManagerError::peer_not_found(&NodeId::from_public_key(public_key)))
    }

    /// Ban the peer for the given duration
    pub fn ban_peer_by_node_id(
        &self,
        node_id: &NodeId,
        duration: Duration,
        reason: String,
    ) -> Result<NodeId, PeerManagerError> {
        self.peer_db
            .set_banned(node_id, duration, reason)?
            .ok_or(PeerManagerError::peer_not_found(node_id))
    }

    pub fn is_peer_banned(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
        let peer = self
            .get_peer_by_node_id(node_id)?
            .ok_or(PeerManagerError::peer_not_found(node_id))?;
        Ok(peer.is_banned())
    }

    /// Stores metadata in the metadata field of the peer identified by the node ID.
    /// Returns None if there was no previous value, or the old value if it was updated.
    pub fn set_peer_metadata(
        &self,
        node_id: &NodeId,
        key: u8,
        data: Vec<u8>,
    ) -> Result<Option<Vec<u8>>, PeerManagerError> {
        Ok(self.peer_db.set_metadata(node_id, key, data)?)
    }
}

#[allow(clippy::from_over_into)]
impl Into<CommsDatabase> for PeerStorageSql {
    fn into(self) -> CommsDatabase {
        self.peer_db
    }
}

#[cfg(test)]
mod test {
    use std::{borrow::BorrowMut, iter::repeat_with};

    use chrono::{DateTime, Utc};
    use multiaddr::Multiaddr;
    use rand::Rng;
    use tari_common_sqlite::connection::DbConnection;

    use super::*;
    use crate::{
        net_address::{MultiaddrWithStats, MultiaddressesWithStats, PeerAddressSource},
        peer_manager::{database::MIGRATIONS, peer::PeerFlags},
    };

    fn get_peer_db_sql_test_db() -> Result<PeerDatabaseSql, PeerManagerError> {
        let db_connection = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
        Ok(PeerDatabaseSql::new(
            db_connection,
            &create_test_peer(PeerFeatures::COMMUNICATION_NODE, false),
        )?)
    }

    fn get_peer_storage_sql_test_db() -> Result<PeerStorageSql, PeerManagerError> {
        PeerStorageSql::new_indexed(get_peer_db_sql_test_db()?)
    }

    #[test]
    fn test_restore() {
        // Create Peers
        let mut rng = rand::rngs::OsRng;
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address1 = "/ip4/1.2.3.4/tcp/8000".parse::<Multiaddr>().unwrap();
        let net_address2 = "/ip4/5.6.7.8/tcp/8000".parse::<Multiaddr>().unwrap();
        let net_address3 = "/ip4/5.6.7.8/tcp/7000".parse::<Multiaddr>().unwrap();
        let mut net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address1], &PeerAddressSource::Config);
        net_addresses.add_address(&net_address2, &PeerAddressSource::Config);
        net_addresses.add_address(&net_address3, &PeerAddressSource::Config);
        let peer1 = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );

        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address4 = "/ip4/9.10.11.12/tcp/7000".parse::<Multiaddr>().unwrap();
        let net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address4], &PeerAddressSource::Config);
        let peer2: Peer = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );

        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address5 = "/ip4/13.14.15.16/tcp/6000".parse::<Multiaddr>().unwrap();
        let net_address6 = "/ip4/17.18.19.20/tcp/8000".parse::<Multiaddr>().unwrap();
        let mut net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address5], &PeerAddressSource::Config);
        net_addresses.add_address(&net_address6, &PeerAddressSource::Config);
        let peer3 = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );

        // Create new datastore with a peer database
        let mut db = Some(get_peer_db_sql_test_db().unwrap());
        {
            let peer_storage = db.take().unwrap();

            // Test adding and searching for peers
            assert!(peer_storage.add_or_update_peer(peer1.clone()).is_ok());
            assert!(peer_storage.add_or_update_peer(peer2.clone()).is_ok());
            assert!(peer_storage.add_or_update_peer(peer3.clone()).is_ok());

            assert_eq!(peer_storage.size(), 3);
            assert!(peer_storage.get_peer_by_public_key(&peer1.public_key).is_ok());
            assert!(peer_storage.get_peer_by_public_key(&peer2.public_key).is_ok());
            assert!(peer_storage.get_peer_by_public_key(&peer3.public_key).is_ok());
            db = Some(peer_storage);
        }
        // Restore from existing database
        let peer_storage = PeerStorageSql::new_indexed(db.take().unwrap()).unwrap();

        assert_eq!(peer_storage.peer_db.size(), 3);
        assert!(peer_storage.find_by_public_key(&peer1.public_key).is_ok());
        assert!(peer_storage.find_by_public_key(&peer2.public_key).is_ok());
        assert!(peer_storage.find_by_public_key(&peer3.public_key).is_ok());
    }

    #[allow(clippy::too_many_lines)]
    #[test]
    fn test_add_delete_find_peer() {
        let peer_storage = get_peer_storage_sql_test_db().unwrap();

        // Create Peers
        let mut rng = rand::rngs::OsRng;
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address1 = "/ip4/1.2.3.4/tcp/8000".parse::<Multiaddr>().unwrap();
        let net_address2 = "/ip4/5.6.7.8/tcp/8000".parse::<Multiaddr>().unwrap();
        let net_address3 = "/ip4/5.6.7.8/tcp/7000".parse::<Multiaddr>().unwrap();
        let mut net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address1], &PeerAddressSource::Config);
        net_addresses.add_address(&net_address2, &PeerAddressSource::Config);
        net_addresses.add_address(&net_address3, &PeerAddressSource::Config);
        let peer1 = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );

        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address4 = "/ip4/9.10.11.12/tcp/7000".parse::<Multiaddr>().unwrap();
        let net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address4], &PeerAddressSource::Config);
        let peer2: Peer = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );

        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address5 = "/ip4/13.14.15.16/tcp/6000".parse::<Multiaddr>().unwrap();
        let net_address6 = "/ip4/17.18.19.20/tcp/8000".parse::<Multiaddr>().unwrap();
        let mut net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address5], &PeerAddressSource::Config);
        net_addresses.add_address(&net_address6, &PeerAddressSource::Config);
        let peer3 = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );
        // Test adding and searching for peers
        peer_storage.add_or_update_peer(peer1.clone()).unwrap();
        assert!(peer_storage.add_or_update_peer(peer2.clone()).is_ok());
        assert!(peer_storage.add_or_update_peer(peer3.clone()).is_ok());

        assert_eq!(peer_storage.peer_db.size(), 3);

        assert_eq!(
            peer_storage
                .find_by_public_key(&peer1.public_key)
                .unwrap()
                .unwrap()
                .public_key,
            peer1.public_key
        );
        assert_eq!(
            peer_storage
                .find_by_public_key(&peer2.public_key)
                .unwrap()
                .unwrap()
                .public_key,
            peer2.public_key
        );
        assert_eq!(
            peer_storage
                .find_by_public_key(&peer3.public_key)
                .unwrap()
                .unwrap()
                .public_key,
            peer3.public_key
        );

        assert_eq!(
            peer_storage
                .get_peer_by_node_id(&peer1.node_id)
                .unwrap()
                .unwrap()
                .node_id,
            peer1.node_id
        );
        assert_eq!(
            peer_storage
                .get_peer_by_node_id(&peer2.node_id)
                .unwrap()
                .unwrap()
                .node_id,
            peer2.node_id
        );
        assert_eq!(
            peer_storage
                .get_peer_by_node_id(&peer3.node_id)
                .unwrap()
                .unwrap()
                .node_id,
            peer3.node_id
        );

        peer_storage.find_by_public_key(&peer1.public_key).unwrap().unwrap();
        peer_storage.find_by_public_key(&peer2.public_key).unwrap().unwrap();
        peer_storage.find_by_public_key(&peer3.public_key).unwrap().unwrap();

        // Test delete of border case peer
        assert!(peer_storage.soft_delete_peer(&peer3.node_id).is_ok());

        // It is a logical delete, so there should still be 3 peers in the db
        assert_eq!(peer_storage.peer_db.size(), 3);

        assert_eq!(
            peer_storage
                .find_by_public_key(&peer1.public_key)
                .unwrap()
                .unwrap()
                .public_key,
            peer1.public_key
        );
        assert_eq!(
            peer_storage
                .find_by_public_key(&peer2.public_key)
                .unwrap()
                .unwrap()
                .public_key,
            peer2.public_key
        );
        assert!(peer_storage
            .find_by_public_key(&peer3.public_key)
            .unwrap()
            .unwrap()
            .deleted_at
            .is_some());

        assert_eq!(
            peer_storage
                .get_peer_by_node_id(&peer1.node_id)
                .unwrap()
                .unwrap()
                .node_id,
            peer1.node_id
        );
        assert_eq!(
            peer_storage
                .get_peer_by_node_id(&peer2.node_id)
                .unwrap()
                .unwrap()
                .node_id,
            peer2.node_id
        );
        assert!(peer_storage
            .get_peer_by_node_id(&peer3.node_id)
            .unwrap()
            .unwrap()
            .deleted_at
            .is_some());
    }

    fn create_test_peer(features: PeerFeatures, ban: bool) -> Peer {
        let mut rng = rand::rngs::OsRng;

        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);

        let mut net_addresses = MultiaddressesWithStats::from_addresses_with_source(vec![], &PeerAddressSource::Config);

        // Create 1 to 4 random addresses
        for _i in 1..=rand::thread_rng().gen_range(1..4) {
            let n = [
                rand::thread_rng().gen_range(1..255),
                rand::thread_rng().gen_range(1..255),
                rand::thread_rng().gen_range(1..255),
                rand::thread_rng().gen_range(1..255),
                rand::thread_rng().gen_range(5000..9000),
            ];
            let net_address = format!("/ip4/{}.{}.{}.{}/tcp/{}", n[0], n[1], n[2], n[3], n[4])
                .parse::<Multiaddr>()
                .unwrap();
            net_addresses.add_address(&net_address, &PeerAddressSource::Config);
        }

        let mut peer = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            features,
            Default::default(),
            Default::default(),
        );
        if ban {
            peer.ban_for(Duration::from_secs(600), "".to_string());
        }
        peer
    }

    #[test]
    fn test_in_network_region() {
        let peer_storage = get_peer_storage_sql_test_db().unwrap();

        let mut nodes = repeat_with(|| create_test_peer(PeerFeatures::COMMUNICATION_NODE, false))
            .take(5)
            .chain(repeat_with(|| create_test_peer(PeerFeatures::COMMUNICATION_CLIENT, false)).take(4))
            .collect::<Vec<_>>();

        for p in &nodes {
            peer_storage.add_or_update_peer(p.clone()).unwrap();
        }

        let main_peer_node_id = peer_storage.this_peer_identity().node_id;

        nodes.sort_by(|a, b| {
            a.node_id
                .distance(&main_peer_node_id)
                .cmp(&b.node_id.distance(&main_peer_node_id))
        });

        let db_nodes = peer_storage.peer_db.get_all_peers(None).unwrap();
        assert_eq!(db_nodes.len(), 9);

        let close_node = &nodes.first().unwrap().node_id;
        let far_node = &nodes.last().unwrap().node_id;

        let is_in_region = peer_storage.in_network_region(&main_peer_node_id, 1).unwrap();
        assert!(is_in_region);

        let is_in_region = peer_storage.in_network_region(close_node, 1).unwrap();
        assert!(is_in_region);

        let is_in_region = peer_storage.in_network_region(far_node, 9).unwrap();
        assert!(is_in_region);

        let is_in_region = peer_storage.in_network_region(far_node, 3).unwrap();
        assert!(!is_in_region);
    }

    #[test]
    fn get_just_seeds() {
        let peer_storage = get_peer_storage_sql_test_db().unwrap();

        let seeds = repeat_with(|| {
            let mut peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
            peer.add_flags(PeerFlags::SEED);
            peer
        })
        .take(5)
        .collect::<Vec<_>>();

        for p in &seeds {
            peer_storage.add_or_update_peer(p.clone()).unwrap();
        }

        let nodes = repeat_with(|| create_test_peer(PeerFeatures::COMMUNICATION_NODE, false))
            .take(5)
            .collect::<Vec<_>>();

        for p in &nodes {
            peer_storage.add_or_update_peer(p.clone()).unwrap();
        }
        let retrieved_seeds = peer_storage.get_seed_peers().unwrap();
        assert_eq!(retrieved_seeds.len(), seeds.len());
        for seed in seeds {
            assert!(retrieved_seeds.iter().any(|p| p.node_id == seed.node_id));
        }
    }

    #[test]
    fn discovery_syncing_returns_correct_peers() {
        let peer_storage = get_peer_storage_sql_test_db().unwrap();

        // Threshold duration + a minute
        #[allow(clippy::cast_possible_wrap)] // Won't wrap around, numbers are static
        let above_the_threshold = Utc::now().timestamp() - (STALE_PEER_THRESHOLD_DURATION.as_secs() + 60) as i64;

        let never_seen_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
        let banned_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, true);

        let mut not_active_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
        let address = not_active_peer.addresses.best().unwrap();
        let mut address = MultiaddrWithStats::new(address.address().clone(), PeerAddressSource::Config);
        address.mark_last_attempted(DateTime::from_timestamp(above_the_threshold, 0).unwrap().naive_utc());
        not_active_peer
            .addresses
            .merge(&MultiaddressesWithStats::from(vec![address]));

        let mut good_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
        let good_addresses = good_peer.addresses.borrow_mut();
        let good_address = good_addresses.addresses()[0].address().clone();
        good_addresses.mark_last_seen_now(&good_address);

        assert!(peer_storage.add_or_update_peer(never_seen_peer).is_ok());
        assert!(peer_storage.add_or_update_peer(not_active_peer).is_ok());
        assert!(peer_storage.add_or_update_peer(banned_peer).is_ok());
        assert!(peer_storage.add_or_update_peer(good_peer).is_ok());

        assert_eq!(peer_storage.all(None).unwrap().len(), 4);
        assert_eq!(
            peer_storage
                .discovery_syncing(100, &[], Some(PeerFeatures::COMMUNICATION_NODE))
                .unwrap()
                .len(),
            1
        );
    }
}
```