
tari-project / tari / 17488107584

05 Sep 2025 08:33AM UTC coverage: 61.19% (-0.03%) from 61.216%
push · github · web-flow
fix: false negative retrieving seed peers in connectivity manager at startup (#7474)

Description
---
This PR fixes a race condition between adding seed peers at startup and
retrieving them from the peer manager in the connectivity manager.

Motivation and Context
---
The warning message `WARN Failed to get seed peers from PeerManager,
using empty list // comms/core/src/connectivity/manager.rs:207` is caused by a
timing issue: at startup, the connectivity manager can query the peer manager
before initialization has finished adding the seed peers. The seed peer list is
passed to the proactive dialer so that seed peers are excluded from proactive
dialing when other connections exist. On a node with proper connectivity, seed
peer dial requests come through straight afterwards:

- `2025-09-04 16:28:05.638210600 [p2p::initialization] DEBUG Adding seed
peer [PeerFlags(SEED)[0984896e74022c44] `
- `2025-09-04 16:28:05.648016500 [comms::connectivity::manager] DEBUG
ConnectivityManager started`
- `2025-09-04 16:28:05.648239900 [comms::connectivity::manager] WARN
Failed to get seed peers from PeerManager, using empty list`
- `2025-09-04 16:28:05.648850400 [comms::dht::connectivity] DEBUG Adding
5 neighbouring peer(s), removing 0 peers: 0984896e74022c442c1034852c, `
- `2025-09-04 16:28:05.648912900 [comms::connectivity::manager] TRACE
Request (11267568784269984977): DialPeer { node_id:
NodeId(0984896e74022c442c1034852c), reply_tx: None }` ...
- `2025-09-04 16:31:05.655859300 [comms::connectivity::manager] DEBUG
(5619097807157888458) Starting proactive dialing execution - current
connections: 10, target: 8`
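The timeline above shows the seed-peer read racing the seed-peer add. The diff itself is not included on this page; one common way to close this kind of startup race is to retry the read briefly instead of immediately falling back to an empty list. A minimal sketch with invented names (`fetch_seed_peers_with_retry` is illustrative, not the PR's actual fix):

```rust
use std::time::Duration;

/// Retry `fetch` a few times before giving up, instead of silently
/// falling back to an empty list (hypothetical sketch, not the real fix).
fn fetch_seed_peers_with_retry<F>(mut fetch: F, attempts: u32, delay: Duration) -> Vec<String>
where
    F: FnMut() -> Option<Vec<String>>,
{
    for _ in 0..attempts {
        if let Some(peers) = fetch() {
            if !peers.is_empty() {
                return peers;
            }
        }
        // Give the initialization task a moment to finish adding seeds.
        std::thread::sleep(delay);
    }
    Vec::new() // last resort: the empty list the old code always used
}
```

The key property is that a transiently empty store no longer poisons the exclusion list handed to the proactive dialer.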

How Has This Been Tested?
---
System-level testing

What process can a PR reviewer use to test or verify this change?
---
Code review

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard for... (continued)

15 of 25 new or added lines in 3 files covered. (60.0%)

37 existing lines in 9 files now uncovered.

72924 of 119176 relevant lines covered (61.19%)

300608.35 hits per line
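The headline figure is just the ratio of covered to relevant lines: 72924 / 119176 ≈ 61.19%. A quick check of that arithmetic (plain Rust, nothing from the repository):

```rust
// Coverage percentage as reported: covered lines / relevant lines * 100.
fn coverage_pct(covered: u64, relevant: u64) -> f64 {
    covered as f64 / relevant as f64 * 100.0
}
```

The same formula gives the 60.0% quoted for the 15 of 25 new lines.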

Source File

/comms/core/src/peer_manager/peer_storage_sql.rs (91.84% file coverage)
//  Copyright 2019 The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{cmp::min, time::Duration};

use log::*;
use multiaddr::Multiaddr;

use crate::{
    net_address::PeerAddressSource,
    peer_manager::{
        database::{PeerDatabaseSql, ThisPeerIdentity},
        peer::Peer,
        peer_id::PeerId,
        NodeDistance,
        NodeId,
        PeerFeatures,
        PeerFlags,
        PeerManagerError,
    },
    types::{CommsDatabase, CommsPublicKey, TransportProtocol},
};

const LOG_TARGET: &str = "comms::peer_manager::peer_storage_sql";
// The maximum number of peers to return in a peer manager sync
const PEER_MANAGER_SYNC_PEERS: usize = 100;
// The maximum amount of time a peer can be inactive before being considered stale:
// (5 days * 24h * 60m * 60s) / 2 = 2.5 days
pub const STALE_PEER_THRESHOLD_DURATION: Duration = Duration::from_secs(5 * 24 * 60 * 60 / 2);
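A quick sanity check on the constant above: 5 days is 5 * 24 * 60 * 60 = 432000 seconds, so half of that is 216000 seconds, i.e. 2.5 days or 60 hours. Mirrored as a standalone snippet:

```rust
use std::time::Duration;

// Mirrors STALE_PEER_THRESHOLD_DURATION above: half of 5 days.
const STALE: Duration = Duration::from_secs(5 * 24 * 60 * 60 / 2);
```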

/// PeerStorageSql provides a mechanism to keep a datastore and a local copy of all peers in sync and allow fast
/// searches using the node_id, public key or net_address of a peer.
#[derive(Clone)]
pub struct PeerStorageSql {
    peer_db: PeerDatabaseSql,
}

impl PeerStorageSql {
    /// Constructs a new PeerStorageSql, with indexes populated from the given datastore
    pub fn new_indexed(database: PeerDatabaseSql) -> Result<PeerStorageSql, PeerManagerError> {
        trace!(
            target: LOG_TARGET,
            "Peer storage is initialized. {} total entries.",
            database.size(),
        );

        Ok(PeerStorageSql { peer_db: database })
    }

    /// Get this peer's identity
    pub fn this_peer_identity(&self) -> ThisPeerIdentity {
        self.peer_db.this_peer_identity()
    }

    /// Get the size of the database
    pub fn count(&self) -> usize {
        self.peer_db.size()
    }

    /// Adds or updates a peer and sets the last connection as successful.
    /// If the peer is marked as offline, it will be unmarked.
    pub fn add_or_update_peer(&self, peer: Peer) -> Result<PeerId, PeerManagerError> {
        Ok(self.peer_db.add_or_update_peer(peer)?)
    }

    /// Adds an online peer if the peer does not already exist. When a peer already
    /// exists, the stored version will be replaced with the newly provided peer.
    pub fn add_or_update_online_peer(
        &self,
        pubkey: &CommsPublicKey,
        node_id: &NodeId,
        addresses: &[Multiaddr],
        peer_features: &PeerFeatures,
        source: &PeerAddressSource,
    ) -> Result<Peer, PeerManagerError> {
        Ok(self
            .peer_db
            .add_or_update_online_peer(pubkey, node_id, addresses, peer_features, source)?)
    }

    /// The peer with the specified node id will be soft deleted (marked as deleted)
    pub fn soft_delete_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
        self.peer_db.soft_delete_peer(node_id)?;
        Ok(())
    }

    /// Find the peer with the provided NodeID
    pub fn get_peer_by_node_id(&self, node_id: &NodeId) -> Result<Option<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_peer_by_node_id(node_id)?)
    }

    /// Get all peers based on a list of their node_ids
    pub fn get_peers_by_node_ids(&self, node_ids: &[NodeId]) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_peers_by_node_ids(node_ids)?)
    }

    /// Get the public keys of the peers matching a list of node_ids
    pub fn get_peer_public_keys_by_node_ids(
        &self,
        node_ids: &[NodeId],
    ) -> Result<Vec<CommsPublicKey>, PeerManagerError> {
        Ok(self.peer_db.get_peer_public_keys_by_node_ids(node_ids)?)
    }

    /// Get all banned peers
    pub fn get_banned_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_banned_peers()?)
    }

    pub fn find_all_starts_with(&self, partial: &[u8]) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.find_all_peers_match_partial_key(partial)?)
    }

    /// Find the peer with the provided PublicKey
    pub fn find_by_public_key(&self, public_key: &CommsPublicKey) -> Result<Option<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_peer_by_public_key(public_key)?)
    }

    /// Check if a peer exists using the specified public_key
    pub fn exists_public_key(&self, public_key: &CommsPublicKey) -> Result<bool, PeerManagerError> {
        if let Ok(val) = self.peer_db.peer_exists_by_public_key(public_key) {
            Ok(val.is_some())
        } else {
            Ok(false)
        }
    }

    /// Check if a peer exists using the specified node_id
    pub fn exists_node_id(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
        if let Ok(val) = self.peer_db.peer_exists_by_node_id(node_id) {
            Ok(val.is_some())
        } else {
            Ok(false)
        }
    }

    /// Return the peer corresponding to the provided NodeId if it is not banned
    pub fn direct_identity_node_id(&self, node_id: &NodeId) -> Result<Peer, PeerManagerError> {
        let peer = self
            .get_peer_by_node_id(node_id)?
            .ok_or(PeerManagerError::peer_not_found(node_id))?;

        if peer.is_banned() {
            Err(PeerManagerError::BannedPeer)
        } else {
            Ok(peer)
        }
    }

    /// Return the peer corresponding to the provided public key if it is not banned
    pub fn direct_identity_public_key(&self, public_key: &CommsPublicKey) -> Result<Peer, PeerManagerError> {
        let peer = self
            .find_by_public_key(public_key)?
            .ok_or(PeerManagerError::peer_not_found(&NodeId::from_public_key(public_key)))?;

        if peer.is_banned() {
            Err(PeerManagerError::BannedPeer)
        } else {
            Ok(peer)
        }
    }

    /// Return all peers, optionally filtering on supplied feature
    pub fn all(&self, features: Option<PeerFeatures>) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_all_peers(features)?)
    }

    /// Return "good" peers for syncing
    /// Criteria:
    ///  - Peer is not banned
    ///  - Peer has been seen within a defined time span (within the threshold)
    ///  - Only returns a maximum number of syncable peers (corresponds with the max possible number of requestable
    ///    peers to sync)
    ///  - If `n` is 0, PEER_MANAGER_SYNC_PEERS is used as the maximum
    ///  - Peer has an address that is reachable with supported transport protocols
    pub fn discovery_syncing(
        &self,
        mut n: usize,
        excluded_peers: &[NodeId],
        features: Option<PeerFeatures>,
        external_addresses_only: bool,
        transport_protocols: &[TransportProtocol],
    ) -> Result<Vec<Peer>, PeerManagerError> {
        if n == 0 {
            n = PEER_MANAGER_SYNC_PEERS;
        } else {
            n = min(n, PEER_MANAGER_SYNC_PEERS);
        }

        Ok(self.peer_db.get_n_random_active_peers(
            n,
            excluded_peers,
            features,
            None,
            Some(STALE_PEER_THRESHOLD_DURATION),
            external_addresses_only,
            transport_protocols,
        )?)
    }
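The `n` clamping at the top of `discovery_syncing` treats 0 as "use the default maximum" and caps any other request at that maximum. Isolated as a standalone sketch (constant inlined):

```rust
const MAX_SYNC_PEERS: usize = 100; // PEER_MANAGER_SYNC_PEERS above

// n == 0 selects the default maximum; any other value is capped at it.
fn clamp_sync_peers(n: usize) -> usize {
    if n == 0 {
        MAX_SYNC_PEERS
    } else {
        n.min(MAX_SYNC_PEERS)
    }
}
```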

    /// Compile a list of known peers that are not banned or deleted
    pub fn get_not_banned_or_deleted_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self
            .peer_db
            .get_n_not_banned_or_deleted_peers(PEER_MANAGER_SYNC_PEERS)?)
    }

    /// Get available dial candidates that are communication nodes, not banned, not deleted, reachable,
    /// and not in the excluded node IDs list
    pub fn get_available_dial_candidates(
        &self,
        exclude_node_ids: &[NodeId],
        limit: Option<usize>,
        transport_protocols: &[TransportProtocol],
    ) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self
            .peer_db
            .get_available_dial_candidates(exclude_node_ids, limit, transport_protocols)?)
    }

    /// Compile a list of the closest `n` active peers
    pub fn closest_n_active_peers(
        &self,
        region_node_id: &NodeId,
        n: usize,
        excluded_peers: &[NodeId],
        features: Option<PeerFeatures>,
        peer_flags: Option<PeerFlags>,
        stale_peer_threshold: Option<Duration>,
        exclude_if_all_address_failed: bool,
        exclusion_distance: Option<NodeDistance>,
        external_addresses_only: bool,
        transport_protocols: &[TransportProtocol],
    ) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_closest_n_active_peers(
            region_node_id,
            n,
            excluded_peers,
            features,
            peer_flags,
            stale_peer_threshold,
            exclude_if_all_address_failed,
            exclusion_distance,
            external_addresses_only,
            transport_protocols,
        )?)
    }

    /// Get all seed peers
    pub fn get_seed_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
        let seed_peers = self.peer_db.get_seed_peers()?;
        trace!(
            target: LOG_TARGET,
            "Get seed peers: {:?}",
            seed_peers.iter().map(|p| p.node_id.short_str()).collect::<Vec<_>>(),
        );
        Ok(seed_peers)
    }
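As the PR description explains, the list returned by `get_seed_peers` is handed to the connectivity manager's proactive dialer as an exclusion set; when the startup race made the list come back empty, nothing was excluded. A toy model of that exclusion using string IDs (hypothetical helper, not the project's API):

```rust
// Seed peers are excluded from proactive dialing when other connections exist.
fn filter_dial_candidates(candidates: &[&str], seeds: &[&str]) -> Vec<String> {
    candidates
        .iter()
        .filter(|c| !seeds.contains(*c)) // an empty seed list excludes nothing
        .map(|c| (*c).to_string())
        .collect()
}
```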

    /// Compile a random list of communication node peers of size _n_ that are not banned or offline and
    /// whose external addresses support the protocols defined in the `transport_protocols` vector.
    pub fn random_peers(
        &self,
        n: usize,
        exclude_peers: &[NodeId],
        flags: Option<PeerFlags>,
        transport_protocols: &[TransportProtocol],
    ) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self
            .peer_db
            .get_n_random_peers(n, exclude_peers, flags, transport_protocols)?)
    }

    /// Get the closest `n` not failed, banned or deleted peers, ordered by their distance to the given node ID.
    pub fn get_closest_n_good_standing_peers(
        &self,
        n: usize,
        features: PeerFeatures,
    ) -> Result<Vec<Peer>, PeerManagerError> {
        Ok(self.peer_db.get_closest_n_good_standing_peers(n, features)?)
    }

    /// Check if a specific node_id is in the network region of the N nearest neighbours of the region specified by
    /// region_node_id. If there are fewer than N known peers, this will _always_ return true
    pub fn in_network_region(&self, node_id: &NodeId, n: usize) -> Result<bool, PeerManagerError> {
        let region_node_id = self.this_peer_identity().node_id;
        let region_node_distance = region_node_id.distance(node_id);
        let node_threshold = self.calc_region_threshold(n, PeerFeatures::COMMUNICATION_NODE)?;
        // Is node ID in the base node threshold?
        if region_node_distance <= node_threshold {
            return Ok(true);
        }
        // Is node ID in the base client threshold?
        let client_threshold = self.calc_region_threshold(n, PeerFeatures::COMMUNICATION_CLIENT)?;
        Ok(region_node_distance <= client_threshold)
    }

    /// Calculate the threshold for the region specified by region_node_id.
    pub fn calc_region_threshold(&self, n: usize, features: PeerFeatures) -> Result<NodeDistance, PeerManagerError> {
        let region_node_id = self.this_peer_identity().node_id;
        if n == 0 {
            return Ok(NodeDistance::max_distance());
        }

        let closest_peers = self.peer_db.get_closest_n_good_standing_peer_node_ids(n, features)?;
        let mut dists = Vec::new();
        for node_id in closest_peers {
            dists.push(region_node_id.distance(&node_id));
        }

        if dists.is_empty() {
            return Ok(NodeDistance::max_distance());
        }

        // If we have fewer than `n` matching peers in our threshold group, the threshold should be max
        if dists.len() < n {
            return Ok(NodeDistance::max_distance());
        }

        Ok(dists.pop().expect("dists cannot be empty at this point"))
    }
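`calc_region_threshold` boils down to: with `n == 0`, or with fewer good-standing peers than `n`, the threshold is the maximum distance; otherwise it is the distance to the furthest of the `n` closest peers. Modelled with plain integers standing in for `NodeDistance`:

```rust
// dists: ascending distances of the up-to-n closest good-standing peers.
fn region_threshold(dists: &[u64], n: usize) -> u64 {
    if n == 0 || dists.len() < n {
        return u64::MAX; // stands in for NodeDistance::max_distance()
    }
    dists[n - 1] // distance to the n-th closest peer
}
```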

    /// Unban the peer
    pub fn unban_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
        let _node_id = self.peer_db.reset_banned(node_id)?;
        Ok(())
    }

    /// Unban all peers
    pub fn unban_all_peers(&self) -> Result<usize, PeerManagerError> {
        let number_unbanned = self.peer_db.reset_all_banned()?;
        Ok(number_unbanned)
    }

    pub fn reset_offline_non_wallet_peers(&self) -> Result<usize, PeerManagerError> {
        let number_offline = self.peer_db.reset_offline_non_wallet_peers()?;
        Ok(number_offline)
    }

    /// Ban the peer for the given duration
    pub fn ban_peer(
        &self,
        public_key: &CommsPublicKey,
        duration: Duration,
        reason: String,
    ) -> Result<NodeId, PeerManagerError> {
        let node_id = NodeId::from_key(public_key);
        self.peer_db
            .set_banned(&node_id, duration, reason)?
            .ok_or(PeerManagerError::peer_not_found(&NodeId::from_public_key(public_key)))
    }

    /// Ban the peer for the given duration
    pub fn ban_peer_by_node_id(
        &self,
        node_id: &NodeId,
        duration: Duration,
        reason: String,
    ) -> Result<NodeId, PeerManagerError> {
        self.peer_db
            .set_banned(node_id, duration, reason)?
            .ok_or(PeerManagerError::peer_not_found(node_id))
    }

    pub fn is_peer_banned(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
        let peer = self
            .get_peer_by_node_id(node_id)?
            .ok_or(PeerManagerError::peer_not_found(node_id))?;
        Ok(peer.is_banned())
    }

    /// Store metadata in the metadata field of the peer identified by the node ID.
    /// Returns the previously stored value if one existed, otherwise None.
    pub fn set_peer_metadata(
        &self,
        node_id: &NodeId,
        key: u8,
        data: Vec<u8>,
    ) -> Result<Option<Vec<u8>>, PeerManagerError> {
        Ok(self.peer_db.set_metadata(node_id, key, data)?)
    }
}

#[allow(clippy::from_over_into)]
impl Into<CommsDatabase> for PeerStorageSql {
    fn into(self) -> CommsDatabase {
        self.peer_db
    }
}

#[cfg(test)]
mod test {
    #![allow(clippy::indexing_slicing)]
    use std::{borrow::BorrowMut, iter::repeat_with};

    use chrono::{DateTime, Utc};
    use multiaddr::Multiaddr;
    use rand::Rng;
    use tari_common_sqlite::connection::DbConnection;

    use super::*;
    use crate::{
        net_address::{MultiaddrWithStats, MultiaddressesWithStats, PeerAddressSource},
        peer_manager::{create_test_peer_add_internal_addresses, database::MIGRATIONS, peer::PeerFlags},
    };

    fn get_peer_db_sql_test_db() -> Result<PeerDatabaseSql, PeerManagerError> {
        let db_connection = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
        Ok(PeerDatabaseSql::new(
            db_connection,
            &create_test_peer(PeerFeatures::COMMUNICATION_NODE, false),
        )?)
    }

    fn get_peer_storage_sql_test_db() -> Result<PeerStorageSql, PeerManagerError> {
        PeerStorageSql::new_indexed(get_peer_db_sql_test_db()?)
    }

    #[test]
    fn test_restore() {
        // Create Peers
        let mut rng = rand::rngs::OsRng;
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address1 = "/ip4/1.2.3.4/tcp/8000".parse::<Multiaddr>().unwrap();
        let net_address2 = "/ip4/5.6.7.8/tcp/8000".parse::<Multiaddr>().unwrap();
        let net_address3 = "/ip4/5.6.7.8/tcp/7000".parse::<Multiaddr>().unwrap();
        let mut net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address1], &PeerAddressSource::Config);
        net_addresses.add_address(&net_address2, &PeerAddressSource::Config);
        net_addresses.add_address(&net_address3, &PeerAddressSource::Config);
        let peer1 = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );

        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address4 = "/ip4/9.10.11.12/tcp/7000".parse::<Multiaddr>().unwrap();
        let net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address4], &PeerAddressSource::Config);
        let peer2: Peer = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );

        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address5 = "/ip4/13.14.15.16/tcp/6000".parse::<Multiaddr>().unwrap();
        let net_address6 = "/ip4/17.18.19.20/tcp/8000".parse::<Multiaddr>().unwrap();
        let mut net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address5], &PeerAddressSource::Config);
        net_addresses.add_address(&net_address6, &PeerAddressSource::Config);
        let peer3 = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );

        // Create new datastore with a peer database
        let mut db = Some(get_peer_db_sql_test_db().unwrap());
        {
            let peer_storage = db.take().unwrap();

            // Test adding and searching for peers
            assert!(peer_storage.add_or_update_peer(peer1.clone()).is_ok());
            assert!(peer_storage.add_or_update_peer(peer2.clone()).is_ok());
            assert!(peer_storage.add_or_update_peer(peer3.clone()).is_ok());

            assert_eq!(peer_storage.size(), 3);
            assert!(peer_storage.get_peer_by_public_key(&peer1.public_key).is_ok());
            assert!(peer_storage.get_peer_by_public_key(&peer2.public_key).is_ok());
            assert!(peer_storage.get_peer_by_public_key(&peer3.public_key).is_ok());
            db = Some(peer_storage);
        }
        // Restore from existing database
        let peer_storage = PeerStorageSql::new_indexed(db.take().unwrap()).unwrap();

        assert_eq!(peer_storage.peer_db.size(), 3);
        assert!(peer_storage.find_by_public_key(&peer1.public_key).is_ok());
        assert!(peer_storage.find_by_public_key(&peer2.public_key).is_ok());
        assert!(peer_storage.find_by_public_key(&peer3.public_key).is_ok());
    }

    #[allow(clippy::too_many_lines)]
    #[test]
    fn test_add_delete_find_peer() {
        let peer_storage = get_peer_storage_sql_test_db().unwrap();

        // Create Peers
        let mut rng = rand::rngs::OsRng;
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address1 = "/ip4/1.2.3.4/tcp/8000".parse::<Multiaddr>().unwrap();
        let net_address2 = "/ip4/5.6.7.8/tcp/8000".parse::<Multiaddr>().unwrap();
        let net_address3 = "/ip4/5.6.7.8/tcp/7000".parse::<Multiaddr>().unwrap();
        let mut net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address1], &PeerAddressSource::Config);
        net_addresses.add_address(&net_address2, &PeerAddressSource::Config);
        net_addresses.add_address(&net_address3, &PeerAddressSource::Config);
        let peer1 = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );

        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address4 = "/ip4/9.10.11.12/tcp/7000".parse::<Multiaddr>().unwrap();
        let net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address4], &PeerAddressSource::Config);
        let peer2: Peer = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );

        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let net_address5 = "/ip4/13.14.15.16/tcp/6000".parse::<Multiaddr>().unwrap();
        let net_address6 = "/ip4/17.18.19.20/tcp/8000".parse::<Multiaddr>().unwrap();
        let mut net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address5], &PeerAddressSource::Config);
        net_addresses.add_address(&net_address6, &PeerAddressSource::Config);
        let peer3 = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );
        // Test adding and searching for peers
        peer_storage.add_or_update_peer(peer1.clone()).unwrap();
        assert!(peer_storage.add_or_update_peer(peer2.clone()).is_ok());
        assert!(peer_storage.add_or_update_peer(peer3.clone()).is_ok());

        assert_eq!(peer_storage.peer_db.size(), 3);

        assert_eq!(
            peer_storage
                .find_by_public_key(&peer1.public_key)
                .unwrap()
                .unwrap()
                .public_key,
            peer1.public_key
        );
        assert_eq!(
            peer_storage
                .find_by_public_key(&peer2.public_key)
                .unwrap()
                .unwrap()
                .public_key,
            peer2.public_key
        );
        assert_eq!(
            peer_storage
                .find_by_public_key(&peer3.public_key)
                .unwrap()
                .unwrap()
                .public_key,
            peer3.public_key
        );

        assert_eq!(
            peer_storage
                .get_peer_by_node_id(&peer1.node_id)
                .unwrap()
                .unwrap()
                .node_id,
            peer1.node_id
        );
        assert_eq!(
            peer_storage
                .get_peer_by_node_id(&peer2.node_id)
                .unwrap()
                .unwrap()
                .node_id,
            peer2.node_id
        );
        assert_eq!(
            peer_storage
                .get_peer_by_node_id(&peer3.node_id)
                .unwrap()
                .unwrap()
                .node_id,
            peer3.node_id
        );

        peer_storage.find_by_public_key(&peer1.public_key).unwrap().unwrap();
        peer_storage.find_by_public_key(&peer2.public_key).unwrap().unwrap();
        peer_storage.find_by_public_key(&peer3.public_key).unwrap().unwrap();

        // Test delete of border case peer
        assert!(peer_storage.soft_delete_peer(&peer3.node_id).is_ok());

        // It is a logical delete, so there should still be 3 peers in the db
        assert_eq!(peer_storage.peer_db.size(), 3);

        assert_eq!(
            peer_storage
                .find_by_public_key(&peer1.public_key)
                .unwrap()
                .unwrap()
                .public_key,
            peer1.public_key
        );
        assert_eq!(
            peer_storage
                .find_by_public_key(&peer2.public_key)
                .unwrap()
                .unwrap()
                .public_key,
            peer2.public_key
        );
        assert!(peer_storage
            .find_by_public_key(&peer3.public_key)
            .unwrap()
            .unwrap()
            .deleted_at
            .is_some());

        assert_eq!(
            peer_storage
                .get_peer_by_node_id(&peer1.node_id)
                .unwrap()
                .unwrap()
                .node_id,
            peer1.node_id
        );
        assert_eq!(
            peer_storage
                .get_peer_by_node_id(&peer2.node_id)
                .unwrap()
                .unwrap()
                .node_id,
            peer2.node_id
        );
        assert!(peer_storage
            .get_peer_by_node_id(&peer3.node_id)
            .unwrap()
            .unwrap()
            .deleted_at
            .is_some());
    }

    fn create_test_peer(features: PeerFeatures, ban: bool) -> Peer {
        let mut rng = rand::rngs::OsRng;

        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);

        let mut net_addresses = MultiaddressesWithStats::from_addresses_with_source(vec![], &PeerAddressSource::Config);

        // Create 1 to 4 random addresses
        for _i in 1..=rand::thread_rng().gen_range(1..4) {
            let n = [
                rand::thread_rng().gen_range(1..255),
                rand::thread_rng().gen_range(1..255),
                rand::thread_rng().gen_range(1..255),
                rand::thread_rng().gen_range(1..255),
                rand::thread_rng().gen_range(5000..9000),
            ];
            let net_address = format!("/ip4/{}.{}.{}.{}/tcp/{}", n[0], n[1], n[2], n[3], n[4])
                .parse::<Multiaddr>()
                .unwrap();
            net_addresses.add_address(&net_address, &PeerAddressSource::Config);
        }

        let mut peer = Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
30✔
716
            features,
30✔
717
            Default::default(),
30✔
718
            Default::default(),
30✔
719
        );
30✔
720
        if ban {
30✔
721
            peer.ban_for(Duration::from_secs(600), "".to_string());
1✔
722
        }
29✔
723
        peer
30✔
724
    }
30✔
725

726
    #[test]
727
    fn test_in_network_region() {
1✔
728
        let peer_storage = get_peer_storage_sql_test_db().unwrap();
1✔
729

1✔
730
        let mut nodes = repeat_with(|| create_test_peer(PeerFeatures::COMMUNICATION_NODE, false))
5✔
731
            .take(5)
1✔
732
            .chain(repeat_with(|| create_test_peer(PeerFeatures::COMMUNICATION_CLIENT, false)).take(4))
4✔
733
            .collect::<Vec<_>>();
1✔
734

735
        for p in &nodes {
10✔
736
            peer_storage.add_or_update_peer(p.clone()).unwrap();
9✔
737
        }
9✔
738

739
        let main_peer_node_id = peer_storage.this_peer_identity().node_id;
1✔
740

1✔
741
        nodes.sort_by(|a, b| {
31✔
742
            a.node_id
31✔
743
                .distance(&main_peer_node_id)
31✔
744
                .cmp(&b.node_id.distance(&main_peer_node_id))
31✔
745
        });
31✔
746

1✔
747
        let db_nodes = peer_storage.peer_db.get_all_peers(None).unwrap();
1✔
748
        assert_eq!(db_nodes.len(), 9);
1✔
749

750
        let close_node = &nodes.first().unwrap().node_id;
1✔
751
        let far_node = &nodes.last().unwrap().node_id;
1✔
752

1✔
753
        let is_in_region = peer_storage.in_network_region(&main_peer_node_id, 1).unwrap();
1✔
754
        assert!(is_in_region);
1✔
755

756
        let is_in_region = peer_storage.in_network_region(close_node, 1).unwrap();
1✔
757
        assert!(is_in_region);
1✔
758

759
        let is_in_region = peer_storage.in_network_region(far_node, 9).unwrap();
1✔
760
        assert!(is_in_region);
1✔
761

762
        let is_in_region = peer_storage.in_network_region(far_node, 3).unwrap();
1✔
763
        assert!(!is_in_region);
1✔
764
    }
1✔
765

766
    #[test]
767
    fn get_just_seeds() {
1✔
768
        let peer_storage = get_peer_storage_sql_test_db().unwrap();
1✔
769

1✔
770
        let seeds = repeat_with(|| {
5✔
771
            let mut peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
5✔
772
            peer.add_flags(PeerFlags::SEED);
5✔
773
            peer
5✔
774
        })
5✔
775
        .take(5)
1✔
776
        .collect::<Vec<_>>();
1✔
777

778
        for p in &seeds {
6✔
779
            peer_storage.add_or_update_peer(p.clone()).unwrap();
5✔
780
        }
5✔
781

782
        let nodes = repeat_with(|| create_test_peer(PeerFeatures::COMMUNICATION_NODE, false))
5✔
783
            .take(5)
1✔
784
            .collect::<Vec<_>>();
1✔
785

786
        for p in &nodes {
6✔
787
            peer_storage.add_or_update_peer(p.clone()).unwrap();
5✔
788
        }
5✔
789
        let retrieved_seeds = peer_storage.get_seed_peers().unwrap();
1✔
790
        assert_eq!(retrieved_seeds.len(), seeds.len());
1✔
791
        for seed in seeds {
6✔
792
            assert!(retrieved_seeds.iter().any(|p| p.node_id == seed.node_id));
15✔
793
        }
794
    }
1✔
795

796
    #[test]
797
    fn discovery_syncing_returns_correct_peers() {
1✔
798
        let peer_storage = get_peer_storage_sql_test_db().unwrap();
1✔
799

1✔
800
        // Threshold duration + a minute
1✔
801
        #[allow(clippy::cast_possible_wrap)] // Won't wrap around, numbers are static
1✔
802
        let above_the_threshold = Utc::now().timestamp() - (STALE_PEER_THRESHOLD_DURATION.as_secs() + 60) as i64;
1✔
803

1✔
804
        let never_seen_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
1✔
805
        let banned_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, true);
1✔
806

1✔
807
        let mut not_active_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
1✔
808
        let address = not_active_peer.addresses.best().unwrap();
1✔
809
        let mut address = MultiaddrWithStats::new(address.address().clone(), PeerAddressSource::Config);
1✔
810
        address.mark_last_attempted(DateTime::from_timestamp(above_the_threshold, 0).unwrap().naive_utc());
1✔
811
        not_active_peer
1✔
812
            .addresses
1✔
813
            .merge(&MultiaddressesWithStats::from(vec![address]));
1✔
814

1✔
815
        let mut good_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
1✔
816
        let good_addresses = good_peer.addresses.borrow_mut();
1✔
817
        let good_address = good_addresses.addresses()[0].address().clone();
1✔
818
        good_addresses.mark_last_seen_now(&good_address);
1✔
819

1✔
820
        let mut good_seed = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
1✔
821
        good_seed.flags = PeerFlags::SEED;
1✔
822
        let good_addresses = good_seed.addresses.borrow_mut();
1✔
823
        let good_address = good_addresses.addresses()[0].address().clone();
1✔
824
        good_addresses.mark_last_seen_now(&good_address);
1✔
825

1✔
826
        assert!(peer_storage.add_or_update_peer(never_seen_peer).is_ok());
1✔
827
        assert!(peer_storage.add_or_update_peer(not_active_peer).is_ok());
1✔
828
        assert!(peer_storage.add_or_update_peer(banned_peer).is_ok());
1✔
829
        assert!(peer_storage.add_or_update_peer(good_peer).is_ok());
1✔
830
        assert!(peer_storage.add_or_update_peer(good_seed.clone()).is_ok());
1✔
831

832
        assert_eq!(peer_storage.all(None).unwrap().len(), 5);
1✔
833
        assert_eq!(
1✔
834
            peer_storage
1✔
835
                .discovery_syncing(
1✔
836
                    100,
1✔
837
                    &[good_seed.node_id],
1✔
838
                    Some(PeerFeatures::COMMUNICATION_NODE),
1✔
839
                    false,
1✔
840
                    &[]
1✔
841
                )
1✔
842
                .unwrap()
1✔
843
                .len(),
1✔
844
            1
1✔
845
        );
1✔
846
        assert_eq!(
1✔
847
            peer_storage
1✔
848
                .discovery_syncing(100, &[], Some(PeerFeatures::COMMUNICATION_NODE), false, &[])
1✔
849
                .unwrap()
1✔
850
                .len(),
1✔
851
            2
1✔
852
        );
1✔
853
    }
1✔
854

855
    #[test]
856
    fn discovery_syncing_peers_with_external_addresses_only() {
1✔
857
        let peer_storage = get_peer_storage_sql_test_db().unwrap();
1✔
858
        let nodes = repeat_with(|| create_test_peer_add_internal_addresses(false, PeerFeatures::COMMUNICATION_NODE))
5✔
859
            .take(5)
1✔
860
            .collect::<Vec<_>>();
1✔
861
        let wallets =
1✔
862
            repeat_with(|| create_test_peer_add_internal_addresses(false, PeerFeatures::COMMUNICATION_CLIENT))
5✔
863
                .take(5)
1✔
864
                .collect::<Vec<_>>();
1✔
865
        for peer in nodes.iter().chain(wallets.iter()) {
10✔
866
            peer_storage.add_or_update_peer(peer.clone()).unwrap();
10✔
867
        }
10✔
868

869
        // Assert that peers have internal and external addresses
870
        let nodes_all_addresses = peer_storage
1✔
871
            .discovery_syncing(100, &[], Some(PeerFeatures::COMMUNICATION_NODE), false, &[])
1✔
872
            .unwrap();
1✔
873
        assert!(nodes_all_addresses
1✔
874
            .iter()
1✔
875
            .all(|p| { p.addresses.addresses().iter().any(|addr| addr.is_external()) }));
5✔
876
        assert!(nodes_all_addresses
1✔
877
            .iter()
1✔
878
            .all(|p| { p.addresses.addresses().iter().any(|addr| !addr.is_external()) }));
15✔
879

880
        // Assert that peers have external addresses only
881
        let nodes_external_addresses_only = peer_storage
1✔
882
            .discovery_syncing(100, &[], Some(PeerFeatures::COMMUNICATION_NODE), true, &[])
1✔
883
            .unwrap();
1✔
884
        assert!(nodes_external_addresses_only
1✔
885
            .iter()
1✔
886
            .all(|p| { p.addresses.addresses().iter().all(|addr| addr.is_external()) }));
10✔
887
    }
1✔
888
}