• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 18097567115

29 Sep 2025 12:50PM UTC coverage: 58.554% (-2.3%) from 60.88%
18097567115

push

github

web-flow
chore(ci): switch rust toolchain to stable (#7524)

Description
switch rust toolchain to stable

Motivation and Context
use stable rust toolchain


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **Chores**
* Standardized Rust toolchain on stable across CI workflows for more
predictable builds.
* Streamlined setup by removing unnecessary components and aligning
toolchain configuration with environment variables.
  * Enabled an environment flag to improve rustup behavior during CI.
* Improved coverage workflow consistency with dynamic toolchain
selection.

* **Tests**
* Removed nightly-only requirements, simplifying test commands and
improving compatibility.
* Expanded CI triggers to include ci-* branches for better pre-merge
validation.
* Maintained existing job logic while improving reliability and
maintainability.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

66336 of 113291 relevant lines covered (58.55%)

551641.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.17
/comms/core/src/peer_manager/peer_storage_sql.rs
1
//  Copyright 2019 The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{cmp::min, time::Duration};
24

25
use log::*;
26
use multiaddr::Multiaddr;
27

28
use crate::{
29
    net_address::PeerAddressSource,
30
    peer_manager::{
31
        database::{PeerDatabaseSql, ThisPeerIdentity},
32
        peer::Peer,
33
        peer_id::PeerId,
34
        NodeDistance,
35
        NodeId,
36
        PeerFeatures,
37
        PeerFlags,
38
        PeerManagerError,
39
    },
40
    types::{CommsDatabase, CommsPublicKey, TransportProtocol},
41
};
42

43
const LOG_TARGET: &str = "comms::peer_manager::peer_storage_sql";
44
// The maximum number of peers to return in peer manager
45
const PEER_MANAGER_SYNC_PEERS: usize = 100;
46
// The maximum amount of time a peer can be inactive before being considered stale:
47
// ((5 days, 24h, 60m, 60s)/2 = 2.5 days)
48
pub const STALE_PEER_THRESHOLD_DURATION: Duration = Duration::from_secs(5 * 24 * 60 * 60 / 2);
49

50
/// PeerStorageSql provides a mechanism to keep a datastore and a local copy of all peers in sync and allow fast
51
/// searches using the node_id, public key or net_address of a peer.
52
#[derive(Clone)]
53
pub struct PeerStorageSql {
54
    peer_db: PeerDatabaseSql,
55
}
56

57
impl PeerStorageSql {
58
    /// Constructs a new PeerStorageSql, with indexes populated from the given datastore
59
    pub fn new_indexed(database: PeerDatabaseSql) -> Result<PeerStorageSql, PeerManagerError> {
172✔
60
        trace!(
172✔
61
            target: LOG_TARGET,
×
62
            "Peer storage is initialized. {} total entries.",
×
63
            database.size(),
×
64
        );
65

66
        Ok(PeerStorageSql { peer_db: database })
172✔
67
    }
172✔
68

69
    /// Get this peer's identity
70
    pub fn this_peer_identity(&self) -> ThisPeerIdentity {
1,517✔
71
        self.peer_db.this_peer_identity()
1,517✔
72
    }
1,517✔
73

74
    /// Get the size of the database
75
    pub fn count(&self) -> usize {
5,098✔
76
        self.peer_db.size()
5,098✔
77
    }
5,098✔
78

79
    /// Adds or updates a peer and sets the last connection as successful.
80
    /// If the peer is marked as offline, it will be unmarked.
81
    pub fn add_or_update_peer(&self, peer: Peer) -> Result<PeerId, PeerManagerError> {
3,630✔
82
        Ok(self.peer_db.add_or_update_peer(peer)?)
3,630✔
83
    }
3,630✔
84

85
    /// Adds a peer an online peer if the peer does not already exist. When a peer already
86
    /// exists, the stored version will be replaced with the newly provided peer.
87
    pub fn add_or_update_online_peer(
1✔
88
        &self,
1✔
89
        pubkey: &CommsPublicKey,
1✔
90
        node_id: &NodeId,
1✔
91
        addresses: &[Multiaddr],
1✔
92
        peer_features: &PeerFeatures,
1✔
93
        source: &PeerAddressSource,
1✔
94
    ) -> Result<Peer, PeerManagerError> {
1✔
95
        Ok(self
1✔
96
            .peer_db
1✔
97
            .add_or_update_online_peer(pubkey, node_id, addresses, peer_features, source)?)
1✔
98
    }
1✔
99

100
    /// The peer with the specified node id will be soft deleted (marked as deleted)
101
    pub fn soft_delete_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
1✔
102
        self.peer_db.soft_delete_peer(node_id)?;
1✔
103
        Ok(())
1✔
104
    }
1✔
105

106
    /// Find the peer with the provided NodeID
107
    pub fn get_peer_by_node_id(&self, node_id: &NodeId) -> Result<Option<Peer>, PeerManagerError> {
368✔
108
        Ok(self.peer_db.get_peer_by_node_id(node_id)?)
368✔
109
    }
368✔
110

111
    /// Get all peers based on a list of their node_ids
112
    pub fn get_peers_by_node_ids(&self, node_ids: &[NodeId]) -> Result<Vec<Peer>, PeerManagerError> {
1✔
113
        Ok(self.peer_db.get_peers_by_node_ids(node_ids)?)
1✔
114
    }
1✔
115

116
    /// Get all peers based on a list of their node_ids
117
    pub fn get_peer_public_keys_by_node_ids(
×
118
        &self,
×
119
        node_ids: &[NodeId],
×
120
    ) -> Result<Vec<CommsPublicKey>, PeerManagerError> {
×
121
        Ok(self.peer_db.get_peer_public_keys_by_node_ids(node_ids)?)
×
122
    }
×
123

124
    /// Get all banned peers
125
    pub fn get_banned_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
×
126
        Ok(self.peer_db.get_banned_peers()?)
×
127
    }
×
128

129
    pub fn find_all_starts_with(&self, partial: &[u8]) -> Result<Vec<Peer>, PeerManagerError> {
×
130
        Ok(self.peer_db.find_all_peers_match_partial_key(partial)?)
×
131
    }
×
132

133
    /// Find the peer with the provided PublicKey
134
    pub fn find_by_public_key(&self, public_key: &CommsPublicKey) -> Result<Option<Peer>, PeerManagerError> {
96✔
135
        Ok(self.peer_db.get_peer_by_public_key(public_key)?)
96✔
136
    }
96✔
137

138
    /// Check if a peer exist using the specified public_key
139
    pub fn exists_public_key(&self, public_key: &CommsPublicKey) -> Result<bool, PeerManagerError> {
17✔
140
        if let Ok(val) = self.peer_db.peer_exists_by_public_key(public_key) {
17✔
141
            Ok(val.is_some())
17✔
142
        } else {
143
            Ok(false)
×
144
        }
145
    }
17✔
146

147
    /// Check if a peer exist using the specified node_id
148
    pub fn exists_node_id(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
×
149
        if let Ok(val) = self.peer_db.peer_exists_by_node_id(node_id) {
×
150
            Ok(val.is_some())
×
151
        } else {
152
            Ok(false)
×
153
        }
154
    }
×
155

156
    /// Return the peer by corresponding to the provided NodeId if it is not banned
157
    pub fn direct_identity_node_id(&self, node_id: &NodeId) -> Result<Peer, PeerManagerError> {
27✔
158
        let peer = self
27✔
159
            .get_peer_by_node_id(node_id)?
27✔
160
            .ok_or(PeerManagerError::peer_not_found(node_id))?;
27✔
161

162
        if peer.is_banned() {
26✔
163
            Err(PeerManagerError::BannedPeer)
×
164
        } else {
165
            Ok(peer)
26✔
166
        }
167
    }
27✔
168

169
    /// Return the peer by corresponding to the provided public key if it is not banned
170
    pub fn direct_identity_public_key(&self, public_key: &CommsPublicKey) -> Result<Peer, PeerManagerError> {
20✔
171
        let peer = self
20✔
172
            .find_by_public_key(public_key)?
20✔
173
            .ok_or(PeerManagerError::peer_not_found(&NodeId::from_public_key(public_key)))?;
20✔
174

175
        if peer.is_banned() {
20✔
176
            Err(PeerManagerError::BannedPeer)
×
177
        } else {
178
            Ok(peer)
20✔
179
        }
180
    }
20✔
181

182
    /// Return all peers, optionally filtering on supplied feature
183
    pub fn all(&self, features: Option<PeerFeatures>) -> Result<Vec<Peer>, PeerManagerError> {
1✔
184
        Ok(self.peer_db.get_all_peers(features)?)
1✔
185
    }
1✔
186

187
    /// Return "good" peers for syncing
188
    /// Criteria:
189
    ///  - Peer is not banned
190
    ///  - Peer has been seen within a defined time span (within the threshold)
191
    ///  - Only returns a maximum number of syncable peers (corresponds with the max possible number of requestable
192
    ///    peers to sync)
193
    ///  - Uses 0 as max PEER_MANAGER_SYNC_PEERS
194
    ///  - Peers has an address that is reachable - with supported transport protocols
195
    pub fn discovery_syncing(
8✔
196
        &self,
8✔
197
        mut n: usize,
8✔
198
        excluded_peers: &[NodeId],
8✔
199
        features: Option<PeerFeatures>,
8✔
200
        external_addresses_only: bool,
8✔
201
    ) -> Result<Vec<Peer>, PeerManagerError> {
8✔
202
        if n == 0 {
8✔
203
            n = PEER_MANAGER_SYNC_PEERS;
×
204
        } else {
8✔
205
            n = min(n, PEER_MANAGER_SYNC_PEERS);
8✔
206
        }
8✔
207

208
        Ok(self.peer_db.get_n_random_active_peers(
8✔
209
            n,
8✔
210
            excluded_peers,
8✔
211
            features,
8✔
212
            None,
8✔
213
            Some(STALE_PEER_THRESHOLD_DURATION),
8✔
214
            external_addresses_only,
8✔
215
            &[],
8✔
216
        )?)
×
217
    }
8✔
218

219
    /// Compile a list of all known peers
220
    pub fn get_not_banned_or_deleted_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
1✔
221
        Ok(self
1✔
222
            .peer_db
1✔
223
            .get_n_not_banned_or_deleted_peers(PEER_MANAGER_SYNC_PEERS)?)
1✔
224
    }
1✔
225

226
    /// Get available dial candidates that are communication nodes, not banned, not deleted, reachable,
227
    /// and not in the excluded node IDs list
228
    pub fn get_available_dial_candidates(
24✔
229
        &self,
24✔
230
        exclude_node_ids: &[NodeId],
24✔
231
        limit: Option<usize>,
24✔
232
        transport_protocols: &[TransportProtocol],
24✔
233
    ) -> Result<Vec<Peer>, PeerManagerError> {
24✔
234
        Ok(self
24✔
235
            .peer_db
24✔
236
            .get_available_dial_candidates(exclude_node_ids, limit, transport_protocols)?)
24✔
237
    }
24✔
238

239
    /// Compile a list of closest `n` active peers
240
    pub fn closest_n_active_peers(
1,617✔
241
        &self,
1,617✔
242
        region_node_id: &NodeId,
1,617✔
243
        n: usize,
1,617✔
244
        excluded_peers: &[NodeId],
1,617✔
245
        features: Option<PeerFeatures>,
1,617✔
246
        peer_flags: Option<PeerFlags>,
1,617✔
247
        stale_peer_threshold: Option<Duration>,
1,617✔
248
        exclude_if_all_address_failed: bool,
1,617✔
249
        exclusion_distance: Option<NodeDistance>,
1,617✔
250
        external_addresses_only: bool,
1,617✔
251
        transport_protocols: &[TransportProtocol],
1,617✔
252
    ) -> Result<Vec<Peer>, PeerManagerError> {
1,617✔
253
        Ok(self.peer_db.get_closest_n_active_peers(
1,617✔
254
            region_node_id,
1,617✔
255
            n,
1,617✔
256
            excluded_peers,
1,617✔
257
            features,
1,617✔
258
            peer_flags,
1,617✔
259
            stale_peer_threshold,
1,617✔
260
            exclude_if_all_address_failed,
1,617✔
261
            exclusion_distance,
1,617✔
262
            external_addresses_only,
1,617✔
263
            transport_protocols,
1,617✔
264
        )?)
×
265
    }
1,617✔
266

267
    /// Get all seed peers
268
    pub fn get_seed_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
19✔
269
        let seed_peers = self.peer_db.get_seed_peers()?;
19✔
270
        trace!(
19✔
271
            target: LOG_TARGET,
×
272
            "Get seed peers: {:?}",
×
273
            seed_peers.iter().map(|p| p.node_id.short_str()).collect::<Vec<_>>(),
×
274
        );
275
        Ok(seed_peers)
19✔
276
    }
19✔
277

278
    /// Compile a random list of communication node peers of size _n_ that are not banned or offline and
279
    /// external addresses support protocols defined in the `transport_protocols` vector.
280
    pub fn random_peers(
74✔
281
        &self,
74✔
282
        n: usize,
74✔
283
        exclude_peers: &[NodeId],
74✔
284
        flags: Option<PeerFlags>,
74✔
285
        transport_protocols: &[TransportProtocol],
74✔
286
    ) -> Result<Vec<Peer>, PeerManagerError> {
74✔
287
        Ok(self
74✔
288
            .peer_db
74✔
289
            .get_n_random_peers(n, exclude_peers, flags, transport_protocols)?)
74✔
290
    }
74✔
291

292
    /// Get the closest `n` not failed, banned or deleted peers, ordered by their distance to the given node ID.
293
    pub fn get_closest_n_good_standing_peers(
2✔
294
        &self,
2✔
295
        n: usize,
2✔
296
        features: PeerFeatures,
2✔
297
    ) -> Result<Vec<Peer>, PeerManagerError> {
2✔
298
        Ok(self.peer_db.get_closest_n_good_standing_peers(n, features)?)
2✔
299
    }
2✔
300

301
    /// Check if a specific node_id is in the network region of the N nearest neighbours of the region specified by
302
    /// region_node_id. If there are less than N known peers, this will _always_ return true
303
    pub fn in_network_region(&self, node_id: &NodeId, n: usize) -> Result<bool, PeerManagerError> {
4✔
304
        let region_node_id = self.this_peer_identity().node_id;
4✔
305
        let region_node_distance = region_node_id.distance(node_id);
4✔
306
        let node_threshold = self.calc_region_threshold(n, PeerFeatures::COMMUNICATION_NODE)?;
4✔
307
        // Is node ID in the base node threshold?
308
        if region_node_distance <= node_threshold {
4✔
309
            return Ok(true);
3✔
310
        }
1✔
311
        let client_threshold = self.calc_region_threshold(n, PeerFeatures::COMMUNICATION_CLIENT)?; // Is node ID in the base client threshold?
1✔
312
        Ok(region_node_distance <= client_threshold)
1✔
313
    }
4✔
314

315
    /// Calculate the threshold for the region specified by region_node_id.
316
    pub fn calc_region_threshold(&self, n: usize, features: PeerFeatures) -> Result<NodeDistance, PeerManagerError> {
9✔
317
        let region_node_id = self.this_peer_identity().node_id;
9✔
318
        if n == 0 {
9✔
319
            return Ok(NodeDistance::max_distance());
×
320
        }
9✔
321

322
        let closest_peers = self.peer_db.get_closest_n_good_standing_peer_node_ids(n, features)?;
9✔
323
        let mut dists = Vec::new();
9✔
324
        for node_id in closest_peers {
42✔
325
            dists.push(region_node_id.distance(&node_id));
33✔
326
        }
33✔
327

328
        if dists.is_empty() {
9✔
329
            return Ok(NodeDistance::max_distance());
×
330
        }
9✔
331

332
        // If we have less than `n` matching peers in our threshold group, the threshold should be max
333
        if dists.len() < n {
9✔
334
            return Ok(NodeDistance::max_distance());
1✔
335
        }
8✔
336

337
        Ok(dists.pop().expect("dists cannot be empty at this point"))
8✔
338
    }
9✔
339

340
    /// Unban the peer
341
    pub fn unban_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
×
342
        let _node_id = self.peer_db.reset_banned(node_id)?;
×
343
        Ok(())
×
344
    }
×
345

346
    /// Unban the peer
347
    pub fn unban_all_peers(&self) -> Result<usize, PeerManagerError> {
×
348
        let number_unbanned = self.peer_db.reset_all_banned()?;
×
349
        Ok(number_unbanned)
×
350
    }
×
351

352
    pub fn reset_offline_non_wallet_peers(&self) -> Result<usize, PeerManagerError> {
×
353
        let number_offline = self.peer_db.reset_offline_non_wallet_peers()?;
×
354
        Ok(number_offline)
×
355
    }
×
356

357
    /// Ban the peer for the given duration
358
    pub fn ban_peer(
×
359
        &self,
×
360
        public_key: &CommsPublicKey,
×
361
        duration: Duration,
×
362
        reason: String,
×
363
    ) -> Result<NodeId, PeerManagerError> {
×
364
        let node_id = NodeId::from_key(public_key);
×
365
        self.peer_db
×
366
            .set_banned(&node_id, duration, reason)?
×
367
            .ok_or(PeerManagerError::peer_not_found(&NodeId::from_public_key(public_key)))
×
368
    }
×
369

370
    /// Ban the peer for the given duration
371
    pub fn ban_peer_by_node_id(
5✔
372
        &self,
5✔
373
        node_id: &NodeId,
5✔
374
        duration: Duration,
5✔
375
        reason: String,
5✔
376
    ) -> Result<NodeId, PeerManagerError> {
5✔
377
        self.peer_db
5✔
378
            .set_banned(node_id, duration, reason)?
5✔
379
            .ok_or(PeerManagerError::peer_not_found(node_id))
5✔
380
    }
5✔
381

382
    pub fn is_peer_banned(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
146✔
383
        let peer = self
146✔
384
            .get_peer_by_node_id(node_id)?
146✔
385
            .ok_or(PeerManagerError::peer_not_found(node_id))?;
146✔
386
        Ok(peer.is_banned())
146✔
387
    }
146✔
388

389
    /// This will store metadata inside of the metadata field in the peer provided by the nodeID.
390
    /// It will return None if the value was empty and the old value if the value was updated
391
    pub fn set_peer_metadata(
140✔
392
        &self,
140✔
393
        node_id: &NodeId,
140✔
394
        key: u8,
140✔
395
        data: Vec<u8>,
140✔
396
    ) -> Result<Option<Vec<u8>>, PeerManagerError> {
140✔
397
        Ok(self.peer_db.set_metadata(node_id, key, data)?)
140✔
398
    }
140✔
399
}
400

401
#[allow(clippy::from_over_into)]
402
impl Into<CommsDatabase> for PeerStorageSql {
403
    fn into(self) -> CommsDatabase {
×
404
        self.peer_db
×
405
    }
×
406
}
407

408
#[cfg(test)]
409
mod test {
410
    #![allow(clippy::indexing_slicing)]
411
    use std::{borrow::BorrowMut, iter::repeat_with};
412

413
    use chrono::{DateTime, Utc};
414
    use multiaddr::Multiaddr;
415
    use rand::Rng;
416
    use tari_common_sqlite::connection::DbConnection;
417

418
    use super::*;
419
    use crate::{
420
        net_address::{MultiaddrWithStats, MultiaddressesWithStats, PeerAddressSource},
421
        peer_manager::{create_test_peer_add_internal_addresses, database::MIGRATIONS, peer::PeerFlags},
422
    };
423

424
    fn get_peer_db_sql_test_db() -> Result<PeerDatabaseSql, PeerManagerError> {
6✔
425
        let db_connection = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
6✔
426
        Ok(PeerDatabaseSql::new(
6✔
427
            db_connection,
6✔
428
            &create_test_peer(PeerFeatures::COMMUNICATION_NODE, false),
6✔
429
        )?)
×
430
    }
6✔
431

432
    fn get_peer_storage_sql_test_db() -> Result<PeerStorageSql, PeerManagerError> {
5✔
433
        PeerStorageSql::new_indexed(get_peer_db_sql_test_db()?)
5✔
434
    }
5✔
435

436
    #[test]
437
    fn test_restore() {
1✔
438
        // Create Peers
439
        let mut rng = rand::rngs::OsRng;
1✔
440
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
1✔
441
        let node_id = NodeId::from_key(&pk);
1✔
442
        let net_address1 = "/ip4/1.2.3.4/tcp/8000".parse::<Multiaddr>().unwrap();
1✔
443
        let net_address2 = "/ip4/5.6.7.8/tcp/8000".parse::<Multiaddr>().unwrap();
1✔
444
        let net_address3 = "/ip4/5.6.7.8/tcp/7000".parse::<Multiaddr>().unwrap();
1✔
445
        let mut net_addresses =
1✔
446
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address1], &PeerAddressSource::Config);
1✔
447
        net_addresses.add_address(&net_address2, &PeerAddressSource::Config);
1✔
448
        net_addresses.add_address(&net_address3, &PeerAddressSource::Config);
1✔
449
        let peer1 = Peer::new(
1✔
450
            pk,
1✔
451
            node_id,
1✔
452
            net_addresses,
1✔
453
            PeerFlags::default(),
1✔
454
            PeerFeatures::empty(),
1✔
455
            Default::default(),
1✔
456
            Default::default(),
1✔
457
        );
458

459
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
1✔
460
        let node_id = NodeId::from_key(&pk);
1✔
461
        let net_address4 = "/ip4/9.10.11.12/tcp/7000".parse::<Multiaddr>().unwrap();
1✔
462
        let net_addresses =
1✔
463
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address4], &PeerAddressSource::Config);
1✔
464
        let peer2: Peer = Peer::new(
1✔
465
            pk,
1✔
466
            node_id,
1✔
467
            net_addresses,
1✔
468
            PeerFlags::default(),
1✔
469
            PeerFeatures::empty(),
1✔
470
            Default::default(),
1✔
471
            Default::default(),
1✔
472
        );
473

474
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
1✔
475
        let node_id = NodeId::from_key(&pk);
1✔
476
        let net_address5 = "/ip4/13.14.15.16/tcp/6000".parse::<Multiaddr>().unwrap();
1✔
477
        let net_address6 = "/ip4/17.18.19.20/tcp/8000".parse::<Multiaddr>().unwrap();
1✔
478
        let mut net_addresses =
1✔
479
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address5], &PeerAddressSource::Config);
1✔
480
        net_addresses.add_address(&net_address6, &PeerAddressSource::Config);
1✔
481
        let peer3 = Peer::new(
1✔
482
            pk,
1✔
483
            node_id,
1✔
484
            net_addresses,
1✔
485
            PeerFlags::default(),
1✔
486
            PeerFeatures::empty(),
1✔
487
            Default::default(),
1✔
488
            Default::default(),
1✔
489
        );
490

491
        // Create new datastore with a peer database
492
        let mut db = Some(get_peer_db_sql_test_db().unwrap());
1✔
493
        {
494
            let peer_storage = db.take().unwrap();
1✔
495

496
            // Test adding and searching for peers
497
            assert!(peer_storage.add_or_update_peer(peer1.clone()).is_ok());
1✔
498
            assert!(peer_storage.add_or_update_peer(peer2.clone()).is_ok());
1✔
499
            assert!(peer_storage.add_or_update_peer(peer3.clone()).is_ok());
1✔
500

501
            assert_eq!(peer_storage.size(), 3);
1✔
502
            assert!(peer_storage.get_peer_by_public_key(&peer1.public_key).is_ok());
1✔
503
            assert!(peer_storage.get_peer_by_public_key(&peer2.public_key).is_ok());
1✔
504
            assert!(peer_storage.get_peer_by_public_key(&peer3.public_key).is_ok());
1✔
505
            db = Some(peer_storage);
1✔
506
        }
507
        // Restore from existing database
508
        let peer_storage = PeerStorageSql::new_indexed(db.take().unwrap()).unwrap();
1✔
509

510
        assert_eq!(peer_storage.peer_db.size(), 3);
1✔
511
        assert!(peer_storage.find_by_public_key(&peer1.public_key).is_ok());
1✔
512
        assert!(peer_storage.find_by_public_key(&peer2.public_key).is_ok());
1✔
513
        assert!(peer_storage.find_by_public_key(&peer3.public_key).is_ok());
1✔
514
    }
1✔
515

516
    #[allow(clippy::too_many_lines)]
517
    #[test]
518
    fn test_add_delete_find_peer() {
1✔
519
        let peer_storage = get_peer_storage_sql_test_db().unwrap();
1✔
520

521
        // Create Peers
522
        let mut rng = rand::rngs::OsRng;
1✔
523
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
1✔
524
        let node_id = NodeId::from_key(&pk);
1✔
525
        let net_address1 = "/ip4/1.2.3.4/tcp/8000".parse::<Multiaddr>().unwrap();
1✔
526
        let net_address2 = "/ip4/5.6.7.8/tcp/8000".parse::<Multiaddr>().unwrap();
1✔
527
        let net_address3 = "/ip4/5.6.7.8/tcp/7000".parse::<Multiaddr>().unwrap();
1✔
528
        let mut net_addresses =
1✔
529
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address1], &PeerAddressSource::Config);
1✔
530
        net_addresses.add_address(&net_address2, &PeerAddressSource::Config);
1✔
531
        net_addresses.add_address(&net_address3, &PeerAddressSource::Config);
1✔
532
        let peer1 = Peer::new(
1✔
533
            pk,
1✔
534
            node_id,
1✔
535
            net_addresses,
1✔
536
            PeerFlags::default(),
1✔
537
            PeerFeatures::empty(),
1✔
538
            Default::default(),
1✔
539
            Default::default(),
1✔
540
        );
541

542
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
1✔
543
        let node_id = NodeId::from_key(&pk);
1✔
544
        let net_address4 = "/ip4/9.10.11.12/tcp/7000".parse::<Multiaddr>().unwrap();
1✔
545
        let net_addresses =
1✔
546
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address4], &PeerAddressSource::Config);
1✔
547
        let peer2: Peer = Peer::new(
1✔
548
            pk,
1✔
549
            node_id,
1✔
550
            net_addresses,
1✔
551
            PeerFlags::default(),
1✔
552
            PeerFeatures::empty(),
1✔
553
            Default::default(),
1✔
554
            Default::default(),
1✔
555
        );
556

557
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
1✔
558
        let node_id = NodeId::from_key(&pk);
1✔
559
        let net_address5 = "/ip4/13.14.15.16/tcp/6000".parse::<Multiaddr>().unwrap();
1✔
560
        let net_address6 = "/ip4/17.18.19.20/tcp/8000".parse::<Multiaddr>().unwrap();
1✔
561
        let mut net_addresses =
1✔
562
            MultiaddressesWithStats::from_addresses_with_source(vec![net_address5], &PeerAddressSource::Config);
1✔
563
        net_addresses.add_address(&net_address6, &PeerAddressSource::Config);
1✔
564
        let peer3 = Peer::new(
1✔
565
            pk,
1✔
566
            node_id,
1✔
567
            net_addresses,
1✔
568
            PeerFlags::default(),
1✔
569
            PeerFeatures::empty(),
1✔
570
            Default::default(),
1✔
571
            Default::default(),
1✔
572
        );
573
        // Test adding and searching for peers
574
        peer_storage.add_or_update_peer(peer1.clone()).unwrap(); // assert!(peer_storage.add_or_update_peer(peer1.clone()).is_ok());
1✔
575
        assert!(peer_storage.add_or_update_peer(peer2.clone()).is_ok());
1✔
576
        assert!(peer_storage.add_or_update_peer(peer3.clone()).is_ok());
1✔
577

578
        assert_eq!(peer_storage.peer_db.size(), 3);
1✔
579

580
        assert_eq!(
1✔
581
            peer_storage
1✔
582
                .find_by_public_key(&peer1.public_key)
1✔
583
                .unwrap()
1✔
584
                .unwrap()
1✔
585
                .public_key,
586
            peer1.public_key
587
        );
588
        assert_eq!(
1✔
589
            peer_storage
1✔
590
                .find_by_public_key(&peer2.public_key)
1✔
591
                .unwrap()
1✔
592
                .unwrap()
1✔
593
                .public_key,
594
            peer2.public_key
595
        );
596
        assert_eq!(
1✔
597
            peer_storage
1✔
598
                .find_by_public_key(&peer3.public_key)
1✔
599
                .unwrap()
1✔
600
                .unwrap()
1✔
601
                .public_key,
602
            peer3.public_key
603
        );
604

605
        assert_eq!(
1✔
606
            peer_storage
1✔
607
                .get_peer_by_node_id(&peer1.node_id)
1✔
608
                .unwrap()
1✔
609
                .unwrap()
1✔
610
                .node_id,
611
            peer1.node_id
612
        );
613
        assert_eq!(
1✔
614
            peer_storage
1✔
615
                .get_peer_by_node_id(&peer2.node_id)
1✔
616
                .unwrap()
1✔
617
                .unwrap()
1✔
618
                .node_id,
619
            peer2.node_id
620
        );
621
        assert_eq!(
1✔
622
            peer_storage
1✔
623
                .get_peer_by_node_id(&peer3.node_id)
1✔
624
                .unwrap()
1✔
625
                .unwrap()
1✔
626
                .node_id,
627
            peer3.node_id
628
        );
629

630
        peer_storage.find_by_public_key(&peer1.public_key).unwrap().unwrap();
1✔
631
        peer_storage.find_by_public_key(&peer2.public_key).unwrap().unwrap();
1✔
632
        peer_storage.find_by_public_key(&peer3.public_key).unwrap().unwrap();
1✔
633

634
        // Test delete of border case peer
635
        assert!(peer_storage.soft_delete_peer(&peer3.node_id).is_ok());
1✔
636

637
        // It is a logical delete, so there should still be 3 peers in the db
638
        assert_eq!(peer_storage.peer_db.size(), 3);
1✔
639

640
        assert_eq!(
1✔
641
            peer_storage
1✔
642
                .find_by_public_key(&peer1.public_key)
1✔
643
                .unwrap()
1✔
644
                .unwrap()
1✔
645
                .public_key,
646
            peer1.public_key
647
        );
648
        assert_eq!(
1✔
649
            peer_storage
1✔
650
                .find_by_public_key(&peer2.public_key)
1✔
651
                .unwrap()
1✔
652
                .unwrap()
1✔
653
                .public_key,
654
            peer2.public_key
655
        );
656
        assert!(peer_storage
1✔
657
            .find_by_public_key(&peer3.public_key)
1✔
658
            .unwrap()
1✔
659
            .unwrap()
1✔
660
            .deleted_at
1✔
661
            .is_some());
1✔
662

663
        assert_eq!(
1✔
664
            peer_storage
1✔
665
                .get_peer_by_node_id(&peer1.node_id)
1✔
666
                .unwrap()
1✔
667
                .unwrap()
1✔
668
                .node_id,
669
            peer1.node_id
670
        );
671
        assert_eq!(
1✔
672
            peer_storage
1✔
673
                .get_peer_by_node_id(&peer2.node_id)
1✔
674
                .unwrap()
1✔
675
                .unwrap()
1✔
676
                .node_id,
677
            peer2.node_id
678
        );
679
        assert!(peer_storage
1✔
680
            .get_peer_by_node_id(&peer3.node_id)
1✔
681
            .unwrap()
1✔
682
            .unwrap()
1✔
683
            .deleted_at
1✔
684
            .is_some());
1✔
685
    }
1✔
686

687
    fn create_test_peer(features: PeerFeatures, ban: bool) -> Peer {
30✔
688
        let mut rng = rand::rngs::OsRng;
30✔
689

690
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
30✔
691
        let node_id = NodeId::from_key(&pk);
30✔
692

693
        let mut net_addresses = MultiaddressesWithStats::from_addresses_with_source(vec![], &PeerAddressSource::Config);
30✔
694

695
        // Create 1 to 4 random addresses
696
        for _i in 1..=rand::thread_rng().gen_range(1..4) {
69✔
697
            let n = [
69✔
698
                rand::thread_rng().gen_range(1..255),
69✔
699
                rand::thread_rng().gen_range(1..255),
69✔
700
                rand::thread_rng().gen_range(1..255),
69✔
701
                rand::thread_rng().gen_range(1..255),
69✔
702
                rand::thread_rng().gen_range(5000..9000),
69✔
703
            ];
69✔
704
            let net_address = format!("/ip4/{}.{}.{}.{}/tcp/{}", n[0], n[1], n[2], n[3], n[4])
69✔
705
                .parse::<Multiaddr>()
69✔
706
                .unwrap();
69✔
707
            net_addresses.add_address(&net_address, &PeerAddressSource::Config);
69✔
708
        }
69✔
709

710
        let mut peer = Peer::new(
30✔
711
            pk,
30✔
712
            node_id,
30✔
713
            net_addresses,
30✔
714
            PeerFlags::default(),
30✔
715
            features,
30✔
716
            Default::default(),
30✔
717
            Default::default(),
30✔
718
        );
719
        if ban {
30✔
720
            peer.ban_for(Duration::from_secs(600), "".to_string());
1✔
721
        }
29✔
722
        peer
30✔
723
    }
30✔
724

725
    #[test]
726
    fn test_in_network_region() {
1✔
727
        let peer_storage = get_peer_storage_sql_test_db().unwrap();
1✔
728

729
        let mut nodes = repeat_with(|| create_test_peer(PeerFeatures::COMMUNICATION_NODE, false))
5✔
730
            .take(5)
1✔
731
            .chain(repeat_with(|| create_test_peer(PeerFeatures::COMMUNICATION_CLIENT, false)).take(4))
4✔
732
            .collect::<Vec<_>>();
1✔
733

734
        for p in &nodes {
10✔
735
            peer_storage.add_or_update_peer(p.clone()).unwrap();
9✔
736
        }
9✔
737

738
        let main_peer_node_id = peer_storage.this_peer_identity().node_id;
1✔
739

740
        nodes.sort_by(|a, b| {
26✔
741
            a.node_id
26✔
742
                .distance(&main_peer_node_id)
26✔
743
                .cmp(&b.node_id.distance(&main_peer_node_id))
26✔
744
        });
26✔
745

746
        let db_nodes = peer_storage.peer_db.get_all_peers(None).unwrap();
1✔
747
        assert_eq!(db_nodes.len(), 9);
1✔
748

749
        let close_node = &nodes.first().unwrap().node_id;
1✔
750
        let far_node = &nodes.last().unwrap().node_id;
1✔
751

752
        let is_in_region = peer_storage.in_network_region(&main_peer_node_id, 1).unwrap();
1✔
753
        assert!(is_in_region);
1✔
754

755
        let is_in_region = peer_storage.in_network_region(close_node, 1).unwrap();
1✔
756
        assert!(is_in_region);
1✔
757

758
        let is_in_region = peer_storage.in_network_region(far_node, 9).unwrap();
1✔
759
        assert!(is_in_region);
1✔
760

761
        let is_in_region = peer_storage.in_network_region(far_node, 3).unwrap();
1✔
762
        assert!(!is_in_region);
1✔
763
    }
1✔
764

765
    #[test]
766
    fn get_just_seeds() {
1✔
767
        let peer_storage = get_peer_storage_sql_test_db().unwrap();
1✔
768

769
        let seeds = repeat_with(|| {
5✔
770
            let mut peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
5✔
771
            peer.add_flags(PeerFlags::SEED);
5✔
772
            peer
5✔
773
        })
5✔
774
        .take(5)
1✔
775
        .collect::<Vec<_>>();
1✔
776

777
        for p in &seeds {
6✔
778
            peer_storage.add_or_update_peer(p.clone()).unwrap();
5✔
779
        }
5✔
780

781
        let nodes = repeat_with(|| create_test_peer(PeerFeatures::COMMUNICATION_NODE, false))
5✔
782
            .take(5)
1✔
783
            .collect::<Vec<_>>();
1✔
784

785
        for p in &nodes {
6✔
786
            peer_storage.add_or_update_peer(p.clone()).unwrap();
5✔
787
        }
5✔
788
        let retrieved_seeds = peer_storage.get_seed_peers().unwrap();
1✔
789
        assert_eq!(retrieved_seeds.len(), seeds.len());
1✔
790
        for seed in seeds {
6✔
791
            assert!(retrieved_seeds.iter().any(|p| p.node_id == seed.node_id));
15✔
792
        }
793
    }
1✔
794

795
    #[test]
796
    fn discovery_syncing_returns_correct_peers() {
1✔
797
        let peer_storage = get_peer_storage_sql_test_db().unwrap();
1✔
798

799
        // Threshold duration + a minute
800
        #[allow(clippy::cast_possible_wrap)] // Won't wrap around, numbers are static
801
        let above_the_threshold = Utc::now().timestamp() - (STALE_PEER_THRESHOLD_DURATION.as_secs() + 60) as i64;
1✔
802

803
        let never_seen_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
1✔
804
        let banned_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, true);
1✔
805

806
        let mut not_active_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
1✔
807
        let address = not_active_peer.addresses.best().unwrap();
1✔
808
        let mut address = MultiaddrWithStats::new(address.address().clone(), PeerAddressSource::Config);
1✔
809
        address.mark_last_attempted(DateTime::from_timestamp(above_the_threshold, 0).unwrap().naive_utc());
1✔
810
        not_active_peer
1✔
811
            .addresses
1✔
812
            .merge(&MultiaddressesWithStats::from(vec![address]));
1✔
813

814
        let mut good_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
1✔
815
        let good_addresses = good_peer.addresses.borrow_mut();
1✔
816
        let good_address = good_addresses.addresses()[0].address().clone();
1✔
817
        good_addresses.mark_last_seen_now(&good_address);
1✔
818

819
        let mut good_seed = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
1✔
820
        good_seed.flags = PeerFlags::SEED;
1✔
821
        let good_addresses = good_seed.addresses.borrow_mut();
1✔
822
        let good_address = good_addresses.addresses()[0].address().clone();
1✔
823
        good_addresses.mark_last_seen_now(&good_address);
1✔
824

825
        assert!(peer_storage.add_or_update_peer(never_seen_peer).is_ok());
1✔
826
        assert!(peer_storage.add_or_update_peer(not_active_peer).is_ok());
1✔
827
        assert!(peer_storage.add_or_update_peer(banned_peer).is_ok());
1✔
828
        assert!(peer_storage.add_or_update_peer(good_peer).is_ok());
1✔
829
        assert!(peer_storage.add_or_update_peer(good_seed.clone()).is_ok());
1✔
830

831
        assert_eq!(peer_storage.all(None).unwrap().len(), 5);
1✔
832
        assert_eq!(
1✔
833
            peer_storage
1✔
834
                .discovery_syncing(100, &[good_seed.node_id], Some(PeerFeatures::COMMUNICATION_NODE), false,)
1✔
835
                .unwrap()
1✔
836
                .len(),
1✔
837
            1
838
        );
839
        assert_eq!(
1✔
840
            peer_storage
1✔
841
                .discovery_syncing(100, &[], Some(PeerFeatures::COMMUNICATION_NODE), false)
1✔
842
                .unwrap()
1✔
843
                .len(),
1✔
844
            2
845
        );
846
    }
1✔
847

848
    #[test]
849
    fn discovery_syncing_peers_with_external_addresses_only() {
1✔
850
        let peer_storage = get_peer_storage_sql_test_db().unwrap();
1✔
851
        let nodes = repeat_with(|| create_test_peer_add_internal_addresses(false, PeerFeatures::COMMUNICATION_NODE))
5✔
852
            .take(5)
1✔
853
            .collect::<Vec<_>>();
1✔
854
        let wallets =
1✔
855
            repeat_with(|| create_test_peer_add_internal_addresses(false, PeerFeatures::COMMUNICATION_CLIENT))
5✔
856
                .take(5)
1✔
857
                .collect::<Vec<_>>();
1✔
858
        for peer in nodes.iter().chain(wallets.iter()) {
10✔
859
            peer_storage.add_or_update_peer(peer.clone()).unwrap();
10✔
860
        }
10✔
861

862
        // Assert that peers have internal and external addresses
863
        let nodes_all_addresses = peer_storage
1✔
864
            .discovery_syncing(100, &[], Some(PeerFeatures::COMMUNICATION_NODE), false)
1✔
865
            .unwrap();
1✔
866
        assert!(nodes_all_addresses
1✔
867
            .iter()
1✔
868
            .all(|p| { p.addresses.addresses().iter().any(|addr| addr.is_external()) }));
5✔
869
        assert!(nodes_all_addresses
1✔
870
            .iter()
1✔
871
            .all(|p| { p.addresses.addresses().iter().any(|addr| !addr.is_external()) }));
19✔
872

873
        // Assert that peers have external addresses only
874
        let nodes_external_addresses_only = peer_storage
1✔
875
            .discovery_syncing(100, &[], Some(PeerFeatures::COMMUNICATION_NODE), true)
1✔
876
            .unwrap();
1✔
877
        assert!(nodes_external_addresses_only
1✔
878
            .iter()
1✔
879
            .all(|p| { p.addresses.addresses().iter().all(|addr| addr.is_external()) }));
14✔
880
    }
1✔
881
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc