• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 24185883744

09 Apr 2026 10:35AM UTC coverage: 61.162% (-0.02%) from 61.184%
24185883744

push

github

SWvheerden
chore: new release v5.3.0-pre.5

70629 of 115478 relevant lines covered (61.16%)

224996.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.85
/comms/core/src/peer_manager/peer_storage_sql.rs
1
//  Copyright 2019 The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{cmp::min, time::Duration};
24

25
use log::*;
26
use multiaddr::Multiaddr;
27

28
use crate::{
29
    net_address::PeerAddressSource,
30
    peer_manager::{
31
        NodeId,
32
        PeerFeatures,
33
        PeerFlags,
34
        PeerManagerError,
35
        database::{PeerDatabaseSql, ThisPeerIdentity},
36
        peer::Peer,
37
        peer_id::PeerId,
38
    },
39
    types::{CommsDatabase, CommsPublicKey, TransportProtocol},
40
};
41

42
/// Log target used by the log statements in this module.
const LOG_TARGET: &str = "comms::peer_manager::peer_storage_sql";
/// The maximum number of peers returned by the sync-related peer queries in this module.
const PEER_MANAGER_SYNC_PEERS: usize = 100;
/// The maximum amount of time a peer can be inactive before being considered stale: half of five days
/// (5 days * 24h * 60m * 60s / 2), i.e. 2.5 days.
pub const STALE_PEER_THRESHOLD_DURATION: Duration = Duration::from_secs(5 * 24 * 60 * 60 / 2);
48

49
/// PeerStorageSql provides a mechanism to keep a datastore and a local copy of all peers in sync and allow fast
50
/// searches using the node_id, public key or net_address of a peer.
51
#[derive(Clone)]
52
pub struct PeerStorageSql {
53
    peer_db: PeerDatabaseSql,
54
}
55

56
impl PeerStorageSql {
57
    /// Constructs a new PeerStorageSql, with indexes populated from the given datastore
58
    pub fn new_indexed(database: PeerDatabaseSql) -> Result<PeerStorageSql, PeerManagerError> {
212✔
59
        trace!(
212✔
60
            target: LOG_TARGET,
×
61
            "Peer storage is initialized. {} total entries.",
62
            database.size(),
×
63
        );
64

65
        Ok(PeerStorageSql { peer_db: database })
212✔
66
    }
212✔
67

68
    /// Get this peer's identity
69
    pub fn this_peer_identity(&self) -> ThisPeerIdentity {
×
70
        self.peer_db.this_peer_identity()
×
71
    }
×
72

73
    /// Get the size of the database
74
    pub fn count(&self) -> usize {
12,340✔
75
        self.peer_db.size()
12,340✔
76
    }
12,340✔
77

78
    /// Adds or updates a peer and sets the last connection as successful.
79
    /// If the peer is marked as offline, it will be unmarked.
80
    pub fn add_or_update_peer(&self, peer: Peer) -> Result<PeerId, PeerManagerError> {
10,859✔
81
        Ok(self.peer_db.add_or_update_peer(peer)?)
10,859✔
82
    }
10,859✔
83

84
    /// Adds a peer an online peer if the peer does not already exist. When a peer already
85
    /// exists, the stored version will be replaced with the newly provided peer.
86
    pub fn add_or_update_online_peer(
1✔
87
        &self,
1✔
88
        pubkey: &CommsPublicKey,
1✔
89
        node_id: &NodeId,
1✔
90
        addresses: &[Multiaddr],
1✔
91
        peer_features: &PeerFeatures,
1✔
92
        source: &PeerAddressSource,
1✔
93
    ) -> Result<Peer, PeerManagerError> {
1✔
94
        Ok(self
1✔
95
            .peer_db
1✔
96
            .add_or_update_online_peer(pubkey, node_id, addresses, peer_features, source)?)
1✔
97
    }
1✔
98

99
    /// The peer with the specified node id will be soft deleted (marked as deleted)
100
    pub fn soft_delete_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
1✔
101
        self.peer_db.soft_delete_peer(node_id)?;
1✔
102
        Ok(())
1✔
103
    }
1✔
104

105
    /// Find the peer with the provided NodeID
106
    pub fn get_peer_by_node_id(&self, node_id: &NodeId) -> Result<Option<Peer>, PeerManagerError> {
22,412✔
107
        Ok(self.peer_db.get_peer_by_node_id(node_id)?)
22,412✔
108
    }
22,412✔
109

110
    /// Get all peers based on a list of their node_ids
111
    pub fn get_peers_by_node_ids(&self, node_ids: &[NodeId]) -> Result<Vec<Peer>, PeerManagerError> {
1✔
112
        Ok(self.peer_db.get_peers_by_node_ids(node_ids)?)
1✔
113
    }
1✔
114

115
    /// Get all peers based on a list of their node_ids
116
    pub fn get_peer_public_keys_by_node_ids(
×
117
        &self,
×
118
        node_ids: &[NodeId],
×
119
    ) -> Result<Vec<CommsPublicKey>, PeerManagerError> {
×
120
        Ok(self.peer_db.get_peer_public_keys_by_node_ids(node_ids)?)
×
121
    }
×
122

123
    /// Get all banned peers
124
    pub fn get_banned_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
×
125
        Ok(self.peer_db.get_banned_peers()?)
×
126
    }
×
127

128
    pub fn find_all_starts_with(&self, partial: &[u8]) -> Result<Vec<Peer>, PeerManagerError> {
×
129
        Ok(self.peer_db.find_all_peers_match_partial_key(partial)?)
×
130
    }
×
131

132
    /// Find the peer with the provided PublicKey
133
    pub fn find_by_public_key(&self, public_key: &CommsPublicKey) -> Result<Option<Peer>, PeerManagerError> {
7,349✔
134
        Ok(self.peer_db.get_peer_by_public_key(public_key)?)
7,349✔
135
    }
7,349✔
136

137
    /// Check if a peer exist using the specified public_key
138
    pub fn exists_public_key(&self, public_key: &CommsPublicKey) -> Result<bool, PeerManagerError> {
17✔
139
        if let Ok(val) = self.peer_db.peer_exists_by_public_key(public_key) {
17✔
140
            Ok(val.is_some())
17✔
141
        } else {
142
            Ok(false)
×
143
        }
144
    }
17✔
145

146
    /// Check if a peer exist using the specified node_id
147
    pub fn exists_node_id(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
×
148
        if let Ok(val) = self.peer_db.peer_exists_by_node_id(node_id) {
×
149
            Ok(val.is_some())
×
150
        } else {
151
            Ok(false)
×
152
        }
153
    }
×
154

155
    /// Return the peer by corresponding to the provided NodeId if it is not banned
156
    pub fn direct_identity_node_id(&self, node_id: &NodeId) -> Result<Peer, PeerManagerError> {
7,249✔
157
        let peer = self
7,249✔
158
            .get_peer_by_node_id(node_id)?
7,249✔
159
            .ok_or(PeerManagerError::peer_not_found(node_id))?;
7,249✔
160

161
        if peer.is_banned() {
7,248✔
162
            Err(PeerManagerError::BannedPeer)
×
163
        } else {
164
            Ok(peer)
7,248✔
165
        }
166
    }
7,249✔
167

168
    /// Return the peer by corresponding to the provided public key if it is not banned
169
    pub fn direct_identity_public_key(&self, public_key: &CommsPublicKey) -> Result<Peer, PeerManagerError> {
7,244✔
170
        let peer = self
7,244✔
171
            .find_by_public_key(public_key)?
7,244✔
172
            .ok_or(PeerManagerError::peer_not_found(&NodeId::from_public_key(public_key)))?;
7,244✔
173

174
        if peer.is_banned() {
7,244✔
175
            Err(PeerManagerError::BannedPeer)
×
176
        } else {
177
            Ok(peer)
7,244✔
178
        }
179
    }
7,244✔
180

181
    /// Return all peers, optionally filtering on supplied feature
182
    pub fn all(&self, features: Option<PeerFeatures>) -> Result<Vec<Peer>, PeerManagerError> {
1✔
183
        Ok(self.peer_db.get_all_peers(features)?)
1✔
184
    }
1✔
185

186
    /// Return "good" peers for syncing
187
    /// Criteria:
188
    ///  - Peer is not banned
189
    ///  - Peer has been seen within a defined time span (within the threshold)
190
    ///  - Only returns a maximum number of syncable peers (corresponds with the max possible number of requestable
191
    ///    peers to sync)
192
    ///  - Uses 0 as max PEER_MANAGER_SYNC_PEERS
193
    ///  - Peers has an address that is reachable - with supported transport protocols
194
    pub fn discovery_syncing(
8✔
195
        &self,
8✔
196
        mut n: usize,
8✔
197
        excluded_peers: &[NodeId],
8✔
198
        features: Option<PeerFeatures>,
8✔
199
        external_addresses_only: bool,
8✔
200
    ) -> Result<Vec<Peer>, PeerManagerError> {
8✔
201
        if n == 0 {
8✔
202
            n = PEER_MANAGER_SYNC_PEERS;
×
203
        } else {
8✔
204
            n = min(n, PEER_MANAGER_SYNC_PEERS);
8✔
205
        }
8✔
206

207
        Ok(self.peer_db.get_n_random_active_peers(
8✔
208
            n,
8✔
209
            excluded_peers,
8✔
210
            features,
8✔
211
            None,
8✔
212
            Some(STALE_PEER_THRESHOLD_DURATION),
8✔
213
            external_addresses_only,
8✔
214
            &[],
8✔
215
        )?)
×
216
    }
8✔
217

218
    /// Compile a list of all known peers
219
    pub fn get_not_banned_or_deleted_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
1✔
220
        Ok(self
1✔
221
            .peer_db
1✔
222
            .get_n_not_banned_or_deleted_peers(PEER_MANAGER_SYNC_PEERS)?)
1✔
223
    }
1✔
224

225
    /// Get available dial candidates that are communication nodes, not banned, not deleted, reachable,
226
    /// optionally not failed, optionally at random, and not in the excluded node IDs list
227
    pub fn get_available_dial_candidates(
94✔
228
        &self,
94✔
229
        exclude_node_ids: &[NodeId],
94✔
230
        limit: Option<usize>,
94✔
231
        transport_protocols: &[TransportProtocol],
94✔
232
        exclude_failed: bool,
94✔
233
        randomize: bool,
94✔
234
    ) -> Result<Vec<Peer>, PeerManagerError> {
94✔
235
        Ok(self.peer_db.get_available_dial_candidates(
94✔
236
            exclude_node_ids,
94✔
237
            limit,
94✔
238
            transport_protocols,
94✔
239
            exclude_failed,
94✔
240
            randomize,
94✔
241
        )?)
×
242
    }
94✔
243

244
    /// Get all seed peers
245
    pub fn get_seed_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
148✔
246
        let seed_peers = self.peer_db.get_seed_peers()?;
148✔
247
        trace!(
148✔
248
            target: LOG_TARGET,
×
249
            "Get seed peers: {:?}",
250
            seed_peers.iter().map(|p| p.node_id.short_str()).collect::<Vec<_>>(),
×
251
        );
252
        Ok(seed_peers)
148✔
253
    }
148✔
254

255
    /// Compile a random list of communication node peers of size _n_ that are not banned or offline and
256
    /// external addresses support protocols defined in the `transport_protocols` vector.
257
    pub fn random_peers(
1,693✔
258
        &self,
1,693✔
259
        n: usize,
1,693✔
260
        exclude_peers: &[NodeId],
1,693✔
261
        flags: Option<PeerFlags>,
1,693✔
262
        transport_protocols: &[TransportProtocol],
1,693✔
263
        known_good: bool,
1,693✔
264
    ) -> Result<Vec<Peer>, PeerManagerError> {
1,693✔
265
        Ok(self
1,693✔
266
            .peer_db
1,693✔
267
            .get_n_random_peers(n, exclude_peers, flags, transport_protocols, known_good)?)
1,693✔
268
    }
1,693✔
269

270
    /// Unban the peer
271
    pub fn unban_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
×
272
        let _node_id = self.peer_db.reset_banned(node_id)?;
×
273
        Ok(())
×
274
    }
×
275

276
    /// Unban the peer
277
    pub fn unban_all_peers(&self) -> Result<usize, PeerManagerError> {
×
278
        let number_unbanned = self.peer_db.reset_all_banned()?;
×
279
        Ok(number_unbanned)
×
280
    }
×
281

282
    pub fn reset_offline_non_wallet_peers(&self) -> Result<usize, PeerManagerError> {
×
283
        let number_offline = self.peer_db.reset_offline_non_wallet_peers()?;
×
284
        Ok(number_offline)
×
285
    }
×
286

287
    /// Ban the peer for the given duration
288
    pub fn ban_peer(
×
289
        &self,
×
290
        public_key: &CommsPublicKey,
×
291
        duration: Duration,
×
292
        reason: String,
×
293
    ) -> Result<NodeId, PeerManagerError> {
×
294
        let node_id = NodeId::from_key(public_key);
×
295
        self.peer_db
×
296
            .set_banned(&node_id, duration, reason)?
×
297
            .ok_or(PeerManagerError::peer_not_found(&NodeId::from_public_key(public_key)))
×
298
    }
×
299

300
    /// Ban the peer for the given duration
301
    pub fn ban_peer_by_node_id(
11✔
302
        &self,
11✔
303
        node_id: &NodeId,
11✔
304
        duration: Duration,
11✔
305
        reason: String,
11✔
306
    ) -> Result<NodeId, PeerManagerError> {
11✔
307
        self.peer_db
11✔
308
            .set_banned(node_id, duration, reason)?
11✔
309
            .ok_or(PeerManagerError::peer_not_found(node_id))
11✔
310
    }
11✔
311

312
    pub fn is_peer_banned(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
458✔
313
        let peer = self
458✔
314
            .get_peer_by_node_id(node_id)?
458✔
315
            .ok_or(PeerManagerError::peer_not_found(node_id))?;
458✔
316
        Ok(peer.is_banned())
458✔
317
    }
458✔
318

319
    /// This will store metadata inside of the metadata field in the peer provided by the nodeID.
320
    /// It will return None if the value was empty and the old value if the value was updated
321
    pub fn set_peer_metadata(
237✔
322
        &self,
237✔
323
        node_id: &NodeId,
237✔
324
        key: u8,
237✔
325
        data: Vec<u8>,
237✔
326
    ) -> Result<Option<Vec<u8>>, PeerManagerError> {
237✔
327
        Ok(self.peer_db.set_metadata(node_id, key, data)?)
237✔
328
    }
237✔
329
}
330

331
#[allow(clippy::from_over_into)]
332
impl Into<CommsDatabase> for PeerStorageSql {
333
    fn into(self) -> CommsDatabase {
×
334
        self.peer_db
×
335
    }
×
336
}
337

338
#[cfg(test)]
339
mod test {
340
    #![allow(clippy::indexing_slicing)]
341
    use std::{borrow::BorrowMut, iter::repeat_with};
342

343
    use chrono::{DateTime, Utc};
344
    use multiaddr::Multiaddr;
345
    use rand::Rng;
346
    use tari_common_sqlite::connection::DbConnection;
347

348
    use super::*;
349
    use crate::{
350
        net_address::{MultiaddrWithStats, MultiaddressesWithStats, PeerAddressSource},
351
        peer_manager::{create_test_peer_add_internal_addresses, database::MIGRATIONS, peer::PeerFlags},
352
    };
353

354
    fn get_peer_db_sql_test_db() -> Result<PeerDatabaseSql, PeerManagerError> {
5✔
355
        let db_connection = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
5✔
356
        Ok(PeerDatabaseSql::new(
5✔
357
            db_connection,
5✔
358
            &create_test_peer(PeerFeatures::COMMUNICATION_NODE, false),
5✔
359
        )?)
×
360
    }
5✔
361

362
    fn get_peer_storage_sql_test_db() -> Result<PeerStorageSql, PeerManagerError> {
4✔
363
        PeerStorageSql::new_indexed(get_peer_db_sql_test_db()?)
4✔
364
    }
4✔
365

366
    #[test]
    fn test_restore() {
        // Creates a peer with a random identity, default flags, no features and the given config-sourced
        // addresses. The first address seeds the address list, the remaining ones are added one by one.
        fn new_peer(addresses: &[&str]) -> Peer {
            let mut rng = rand::rngs::OsRng;
            let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
            let node_id = NodeId::from_key(&pk);
            let (first, rest) = addresses.split_first().expect("at least one address is required");
            let mut net_addresses = MultiaddressesWithStats::from_addresses_with_source(
                vec![first.parse::<Multiaddr>().unwrap()],
                &PeerAddressSource::Config,
            );
            for address in rest {
                net_addresses
                    .add_or_update_addresses(&[address.parse::<Multiaddr>().unwrap()], &PeerAddressSource::Config);
            }
            Peer::new(
                pk,
                node_id,
                net_addresses,
                PeerFlags::default(),
                PeerFeatures::empty(),
                Default::default(),
                Default::default(),
            )
        }

        // Create Peers
        let peers = [
            new_peer(&["/ip4/1.2.3.4/tcp/8000", "/ip4/5.6.7.8/tcp/8000", "/ip4/5.6.7.8/tcp/7000"]),
            new_peer(&["/ip4/9.10.11.12/tcp/7000"]),
            new_peer(&["/ip4/13.14.15.16/tcp/6000", "/ip4/17.18.19.20/tcp/8000"]),
        ];

        // Create new datastore with a peer database and populate it directly
        let peer_db = get_peer_db_sql_test_db().unwrap();
        for peer in &peers {
            assert!(peer_db.add_or_update_peer(peer.clone()).is_ok());
        }

        assert_eq!(peer_db.size(), 3);
        for peer in &peers {
            // `is_ok()` alone would also accept `Ok(None)`, so require that the peer is actually returned
            assert!(matches!(
                peer_db.get_peer_by_public_key(&peer.public_key),
                Ok(Some(stored)) if stored.public_key == peer.public_key
            ));
        }

        // Restore from existing database
        let peer_storage = PeerStorageSql::new_indexed(peer_db).unwrap();

        assert_eq!(peer_storage.peer_db.size(), 3);
        for peer in &peers {
            assert!(matches!(
                peer_storage.find_by_public_key(&peer.public_key),
                Ok(Some(stored)) if stored.public_key == peer.public_key
            ));
        }
    }
1✔
445

446
    #[allow(clippy::too_many_lines)]
    #[test]
    fn test_add_delete_find_peer() {
        // Creates a peer with a random identity, default flags, no features and the given config-sourced
        // addresses. The first address seeds the address list, the remaining ones are added one by one.
        fn new_peer(addresses: &[&str]) -> Peer {
            let mut rng = rand::rngs::OsRng;
            let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
            let node_id = NodeId::from_key(&pk);
            let (first, rest) = addresses.split_first().expect("at least one address is required");
            let mut net_addresses = MultiaddressesWithStats::from_addresses_with_source(
                vec![first.parse::<Multiaddr>().unwrap()],
                &PeerAddressSource::Config,
            );
            for address in rest {
                net_addresses
                    .add_or_update_addresses(&[address.parse::<Multiaddr>().unwrap()], &PeerAddressSource::Config);
            }
            Peer::new(
                pk,
                node_id,
                net_addresses,
                PeerFlags::default(),
                PeerFeatures::empty(),
                Default::default(),
                Default::default(),
            )
        }

        let peer_storage = get_peer_storage_sql_test_db().unwrap();

        // Create Peers
        let peer1 = new_peer(&["/ip4/1.2.3.4/tcp/8000", "/ip4/5.6.7.8/tcp/8000", "/ip4/5.6.7.8/tcp/7000"]);
        let peer2 = new_peer(&["/ip4/9.10.11.12/tcp/7000"]);
        let peer3 = new_peer(&["/ip4/13.14.15.16/tcp/6000", "/ip4/17.18.19.20/tcp/8000"]);

        // Test adding and searching for peers
        for peer in [&peer1, &peer2, &peer3] {
            peer_storage.add_or_update_peer(peer.clone()).unwrap();
        }

        assert_eq!(peer_storage.peer_db.size(), 3);

        for peer in [&peer1, &peer2, &peer3] {
            let found_by_key = peer_storage.find_by_public_key(&peer.public_key).unwrap().unwrap();
            assert_eq!(found_by_key.public_key, peer.public_key);

            let found_by_id = peer_storage.get_peer_by_node_id(&peer.node_id).unwrap().unwrap();
            assert_eq!(found_by_id.node_id, peer.node_id);
        }

        // Test delete of border case peer
        assert!(peer_storage.soft_delete_peer(&peer3.node_id).is_ok());

        // It is a logical delete, so there should still be 3 peers in the db
        assert_eq!(peer_storage.peer_db.size(), 3);

        // The untouched peers are still returned as before
        for peer in [&peer1, &peer2] {
            let found_by_key = peer_storage.find_by_public_key(&peer.public_key).unwrap().unwrap();
            assert_eq!(found_by_key.public_key, peer.public_key);

            let found_by_id = peer_storage.get_peer_by_node_id(&peer.node_id).unwrap().unwrap();
            assert_eq!(found_by_id.node_id, peer.node_id);
        }

        // The deleted peer is still returned, but carries a deletion timestamp
        assert!(
            peer_storage
                .find_by_public_key(&peer3.public_key)
                .unwrap()
                .unwrap()
                .deleted_at
                .is_some()
        );
        assert!(
            peer_storage
                .get_peer_by_node_id(&peer3.node_id)
                .unwrap()
                .unwrap()
                .deleted_at
                .is_some()
        );
    }
1✔
620

621
    fn create_test_peer(features: PeerFeatures, ban: bool) -> Peer {
20✔
622
        let mut rng = rand::rngs::OsRng;
20✔
623

624
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
20✔
625
        let node_id = NodeId::from_key(&pk);
20✔
626

627
        let mut net_addresses = MultiaddressesWithStats::from_addresses_with_source(vec![], &PeerAddressSource::Config);
20✔
628

629
        // Create 1 to 4 random addresses
630
        for _i in 1..=rand::thread_rng().gen_range(1..4) {
36✔
631
            let n = [
36✔
632
                rand::thread_rng().gen_range(1..255),
36✔
633
                rand::thread_rng().gen_range(1..255),
36✔
634
                rand::thread_rng().gen_range(1..255),
36✔
635
                rand::thread_rng().gen_range(1..255),
36✔
636
                rand::thread_rng().gen_range(5000..9000),
36✔
637
            ];
36✔
638
            let net_address = format!("/ip4/{}.{}.{}.{}/tcp/{}", n[0], n[1], n[2], n[3], n[4])
36✔
639
                .parse::<Multiaddr>()
36✔
640
                .unwrap();
36✔
641
            net_addresses.add_or_update_addresses(&[net_address], &PeerAddressSource::Config);
36✔
642
        }
36✔
643

644
        let mut peer = Peer::new(
20✔
645
            pk,
20✔
646
            node_id,
20✔
647
            net_addresses,
20✔
648
            PeerFlags::default(),
20✔
649
            features,
20✔
650
            Default::default(),
20✔
651
            Default::default(),
20✔
652
        );
653
        if ban {
20✔
654
            peer.ban_for(Duration::from_secs(600), "".to_string());
1✔
655
        }
19✔
656
        peer
20✔
657
    }
20✔
658

659
    #[test]
    fn get_just_seeds() {
        let peer_storage = get_peer_storage_sql_test_db().unwrap();

        // Five seed peers and five ordinary nodes go into the same storage
        let make_seed = || {
            let mut seed = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
            seed.add_flags(PeerFlags::SEED);
            seed
        };
        let seeds: Vec<Peer> = repeat_with(make_seed).take(5).collect();
        let nodes: Vec<Peer> = repeat_with(|| create_test_peer(PeerFeatures::COMMUNICATION_NODE, false))
            .take(5)
            .collect();

        for peer in seeds.iter().chain(nodes.iter()) {
            peer_storage.add_or_update_peer(peer.clone()).unwrap();
        }

        // Only the seeds may come back, and every one of them must be present
        let retrieved_seeds = peer_storage.get_seed_peers().unwrap();
        assert_eq!(retrieved_seeds.len(), seeds.len());
        for seed in &seeds {
            assert!(retrieved_seeds.iter().any(|p| p.node_id == seed.node_id));
        }
    }
1✔
688

689
    #[test]
    fn discovery_syncing_returns_correct_peers() {
        // Marks the first address of the given peer as seen just now
        fn mark_first_address_seen(peer: &mut Peer) {
            let addresses = peer.addresses.borrow_mut();
            let first_address = addresses.addresses()[0].address().clone();
            addresses.mark_last_seen_now(&first_address);
        }

        let peer_storage = get_peer_storage_sql_test_db().unwrap();

        // Threshold duration + a minute
        #[allow(clippy::cast_possible_wrap)] // Won't wrap around, numbers are static
        let stale_timestamp = Utc::now().timestamp() - (STALE_PEER_THRESHOLD_DURATION.as_secs() + 60) as i64;

        let never_seen_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
        let banned_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, true);

        // A peer whose best address was last attempted longer ago than the stale threshold
        let mut not_active_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
        let best_address = not_active_peer.addresses.best().unwrap().address().clone();
        let mut stale_address = MultiaddrWithStats::new(best_address, PeerAddressSource::Config);
        stale_address.mark_last_attempted(DateTime::from_timestamp(stale_timestamp, 0).unwrap().naive_utc());
        not_active_peer
            .addresses
            .merge(&MultiaddressesWithStats::from(vec![stale_address]));

        let mut good_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
        mark_first_address_seen(&mut good_peer);

        let mut good_seed = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
        good_seed.flags = PeerFlags::SEED;
        mark_first_address_seen(&mut good_seed);

        assert!(peer_storage.add_or_update_peer(never_seen_peer).is_ok());
        assert!(peer_storage.add_or_update_peer(not_active_peer).is_ok());
        assert!(peer_storage.add_or_update_peer(banned_peer).is_ok());
        assert!(peer_storage.add_or_update_peer(good_peer).is_ok());
        assert!(peer_storage.add_or_update_peer(good_seed.clone()).is_ok());

        assert_eq!(peer_storage.all(None).unwrap().len(), 5);

        // With the good seed excluded only the good peer remains
        let without_seed = peer_storage
            .discovery_syncing(100, &[good_seed.node_id], Some(PeerFeatures::COMMUNICATION_NODE), false)
            .unwrap();
        assert_eq!(without_seed.len(), 1);

        // Without exclusions both recently seen peers are returned
        let with_seed = peer_storage
            .discovery_syncing(100, &[], Some(PeerFeatures::COMMUNICATION_NODE), false)
            .unwrap();
        assert_eq!(with_seed.len(), 2);
    }
1✔
741

742
    #[test]
    fn discovery_syncing_peers_with_external_addresses_only() {
        let peer_storage = get_peer_storage_sql_test_db().unwrap();

        // Five nodes and five wallets, created with the internal-address test helper
        let nodes: Vec<Peer> =
            repeat_with(|| create_test_peer_add_internal_addresses(false, PeerFeatures::COMMUNICATION_NODE))
                .take(5)
                .collect();
        let wallets: Vec<Peer> =
            repeat_with(|| create_test_peer_add_internal_addresses(false, PeerFeatures::COMMUNICATION_CLIENT))
                .take(5)
                .collect();
        for peer in nodes.iter().chain(wallets.iter()) {
            peer_storage.add_or_update_peer(peer.clone()).unwrap();
        }

        // Assert that peers have internal and external addresses
        let nodes_all_addresses = peer_storage
            .discovery_syncing(100, &[], Some(PeerFeatures::COMMUNICATION_NODE), false)
            .unwrap();
        let every_peer_has_external = nodes_all_addresses
            .iter()
            .all(|p| p.addresses.addresses().iter().any(|addr| addr.is_external()));
        assert!(every_peer_has_external);
        let every_peer_has_internal = nodes_all_addresses
            .iter()
            .all(|p| p.addresses.addresses().iter().any(|addr| !addr.is_external()));
        assert!(every_peer_has_internal);

        // Assert that peers have external addresses only
        let nodes_external_addresses_only = peer_storage
            .discovery_syncing(100, &[], Some(PeerFeatures::COMMUNICATION_NODE), true)
            .unwrap();
        let only_external_addresses = nodes_external_addresses_only
            .iter()
            .all(|p| p.addresses.addresses().iter().all(|addr| addr.is_external()));
        assert!(only_external_addresses);
    }
1✔
781
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc