• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 27141517531

08 Jun 2026 01:37PM UTC coverage: 61.298% (-0.01%) from 61.308%
27141517531

push

github

SWvheerden
chore: new release v5.4.0-pre.3

72248 of 117863 relevant lines covered (61.3%)

221811.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.45
/base_layer/core/src/base_node/sync/sync_peer.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{
24
    cmp::Ordering,
25
    fmt::{Display, Formatter},
26
    time::Duration,
27
};
28

29
use primitive_types::U512;
30
use tari_common_types::chain_metadata::ChainMetadata;
31
use tari_comms::{PeerConnection, peer_manager::NodeId};
32

33
use crate::{base_node::chain_metadata_service::PeerChainMetadata, common::rolling_avg::RollingAverageTime};
34

35
/// A peer considered for base node sync: the chain metadata it claims, locally recorded timing
/// samples, and an optional attached [`PeerConnection`].
#[derive(Debug, Clone)]
36
pub struct SyncPeer {
37
    /// Source of the peer's node id, claimed chain metadata and latency (see the accessors on
    /// `SyncPeer`, which all delegate to this value).
    peer_metadata: PeerChainMetadata,
38
    /// Timing samples recorded through `add_sample`; read back by `calc_avg_latency` and
    /// `items_per_second`. Created with `RollingAverageTime::new(20)` — presumably a window of
    /// 20 samples; confirm against `RollingAverageTime`.
    avg_latency: RollingAverageTime,
39
    /// Optional pre-dialled connection that travels with this sync peer across header_sync →
40
    /// block_sync → horizon_state_sync. Populated when entering header sync (typically as a
41
    /// [`tari_comms::RefKind::Strong`] handle so it remains pinned for the full sync). Cloning
42
    /// the SyncPeer preserves the underlying connection's strength because
43
    /// [`PeerConnection::clone`] is Arc-like.
44
    connection: Option<PeerConnection>,
45
}
46

47
impl SyncPeer {
48
    pub fn node_id(&self) -> &NodeId {
130✔
49
        self.peer_metadata.node_id()
130✔
50
    }
130✔
51

52
    pub fn claimed_chain_metadata(&self) -> &ChainMetadata {
33✔
53
        self.peer_metadata.claimed_chain_metadata()
33✔
54
    }
33✔
55

56
    pub fn claimed_difficulty(&self) -> U512 {
28✔
57
        self.peer_metadata.claimed_chain_metadata().accumulated_difficulty()
28✔
58
    }
28✔
59

60
    pub fn latency(&self) -> Option<Duration> {
111✔
61
        self.peer_metadata.latency()
111✔
62
    }
111✔
63

64
    pub(super) fn set_latency(&mut self, latency: Duration) -> &mut Self {
27✔
65
        self.peer_metadata.set_latency(latency);
27✔
66
        self
27✔
67
    }
27✔
68

69
    pub fn items_per_second(&self) -> Option<f64> {
×
70
        self.avg_latency.calc_samples_per_second()
×
71
    }
×
72

73
    pub(super) fn add_sample(&mut self, time: Duration) -> &mut Self {
17✔
74
        self.avg_latency.add_sample(time);
17✔
75
        self
17✔
76
    }
17✔
77

78
    pub fn calc_avg_latency(&self) -> Option<Duration> {
×
79
        self.avg_latency.calculate_average()
×
80
    }
×
81

82
    /// Returns the connection held by this peer, if any.
83
    pub fn connection(&self) -> Option<&PeerConnection> {
64✔
84
        self.connection.as_ref()
64✔
85
    }
64✔
86

87
    /// Stores a pre-dialled `PeerConnection` on this sync peer. Typically called once at the
88
    /// entry to header sync with a [`tari_comms::RefKind::Strong`] handle; that strong handle
89
    /// then propagates with the SyncPeer into block_sync and horizon_state_sync, pinning the
90
    /// connection for the full sync cycle.
91
    pub fn set_connection(&mut self, connection: PeerConnection) {
24✔
92
        self.connection = Some(connection);
24✔
93
    }
24✔
94

95
    /// Drops any stored connection for this peer (releasing the strong ref if one was held).
96
    /// Use when a peer is being removed from the sync set, e.g. after a failed attempt.
97
    pub fn clear_connection(&mut self) {
1✔
98
        self.connection = None;
1✔
99
    }
1✔
100

101
    /// If the stored connection is currently a Weak handle (or no connection is attached),
102
    /// upgrade it to Strong in place. If it's already Strong this is a no-op. The upgrade is
103
    /// cheap and never touches the connectivity actor — it just clones the existing handle.
104
    ///
105
    /// Returns `true` if a Strong handle is attached after this call (either pre-existing or
106
    /// newly upgraded), `false` if no connection was attached. Callers that get `false` should
107
    /// dial a fresh Strong connection via the connectivity layer.
108
    pub fn ensure_strong_connection(&mut self) -> bool {
27✔
109
        match self.connection.take() {
27✔
110
            Some(conn) if conn.is_strong() => {
2✔
111
                self.connection = Some(conn);
1✔
112
                true
1✔
113
            },
114
            Some(conn) => {
1✔
115
                // Upgrade: clone_strong bumps the shared strong-counter, then we drop the old
116
                // weak handle. Net result: +1 strong slot held by this SyncPeer.
117
                self.connection = Some(conn.clone_strong());
1✔
118
                true
1✔
119
            },
120
            None => false,
25✔
121
        }
122
    }
27✔
123

124
    /// If the stored connection is currently a Strong handle, replace it with a Weak clone.
125
    /// The connection itself remains attached so later stages can reuse it without redialing,
126
    /// but the strong-count is released — letting background reapers reclaim the peer if no
127
    /// other strong holder exists. No-op when the stored handle is already Weak or absent.
128
    pub fn downgrade_connection(&mut self) {
20✔
129
        match self.connection.take() {
20✔
130
            Some(conn) if conn.is_strong() => {
19✔
131
                // clone_weak does not touch the counter; dropping `conn` (the strong handle)
18✔
132
                // releases its slot. Net result: -1 strong slot, connection still attached.
18✔
133
                self.connection = Some(conn.clone_weak());
18✔
134
            },
18✔
135
            other => {
2✔
136
                self.connection = other;
2✔
137
            },
2✔
138
        }
139
    }
20✔
140
}
141

142
impl From<PeerChainMetadata> for SyncPeer {
143
    fn from(peer_metadata: PeerChainMetadata) -> Self {
65✔
144
        Self {
65✔
145
            peer_metadata,
65✔
146
            avg_latency: RollingAverageTime::new(20),
65✔
147
            connection: None,
65✔
148
        }
65✔
149
    }
65✔
150
}
151

152
impl Display for SyncPeer {
153
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
154
        write!(
×
155
            f,
×
156
            "Node ID: {}, Chain metadata: {}, Latency: {}",
157
            self.node_id(),
×
158
            self.claimed_chain_metadata(),
×
159
            self.latency()
×
160
                .map(|d| format!("{d:.2?}"))
×
161
                .unwrap_or_else(|| "--".to_string())
×
162
        )
163
    }
×
164
}
165

166
impl PartialEq for SyncPeer {
167
    fn eq(&self, other: &Self) -> bool {
17✔
168
        self.node_id() == other.node_id()
17✔
169
    }
17✔
170
}
171
// Marker impl: equality is defined by the `PartialEq` impl, which compares node ids only.
impl Eq for SyncPeer {}
172

173
impl Ord for SyncPeer {
174
    fn cmp(&self, other: &Self) -> Ordering {
40✔
175
        let mut result = other
40✔
176
            .peer_metadata
40✔
177
            .claimed_chain_metadata()
40✔
178
            .accumulated_difficulty()
40✔
179
            .cmp(&self.peer_metadata.claimed_chain_metadata().accumulated_difficulty());
40✔
180
        if result == Ordering::Equal {
40✔
181
            match (self.latency(), other.latency()) {
37✔
182
                (None, None) => result = Ordering::Equal,
2✔
183
                // No latency goes to the end
184
                (Some(_), None) => result = Ordering::Less,
16✔
185
                (None, Some(_)) => result = Ordering::Greater,
×
186
                (Some(la), Some(lb)) => result = la.cmp(&lb),
19✔
187
            }
188
        }
3✔
189
        result
40✔
190
    }
40✔
191
}
192

193
impl PartialOrd for SyncPeer {
194
    fn partial_cmp(&self, other: &SyncPeer) -> Option<Ordering> {
40✔
195
        Some(self.cmp(other))
40✔
196
    }
40✔
197
}
198

199
#[cfg(test)]
200
mod test {
201
    #![allow(clippy::indexing_slicing)]
202
    use std::time::Duration;
203

204
    use tari_common_types::chain_metadata::ChainMetadata;
205

206
    use super::*;
207

208
    mod connection_attachment {
209
        use tari_common_types::types::FixedHash;
210
        use tari_comms::{
211
            RefKind,
212
            test_utils::mocks::create_dummy_peer_connection,
213
            types::{CommsPublicKey, CommsSecretKey},
214
        };
215
        use tari_crypto::keys::SecretKey;
216

217
        use super::*;
218

219
        fn peer_with_id() -> SyncPeer {
5✔
220
            let sk = CommsSecretKey::random(&mut rand::rng());
5✔
221
            let pk = CommsPublicKey::from_secret_key(&sk);
5✔
222
            let node_id = NodeId::from_key(&pk);
5✔
223
            PeerChainMetadata::new(
5✔
224
                node_id,
5✔
225
                ChainMetadata::new(0, FixedHash::zero(), 0, 0, U512::from(1), 0).unwrap(),
5✔
226
                None,
5✔
227
            )
228
            .into()
5✔
229
        }
5✔
230

231
        #[test]
232
        fn set_connection_stores_handle_and_clone_preserves_strength() {
1✔
233
            let mut peer = peer_with_id();
1✔
234
            assert!(peer.connection().is_none());
1✔
235

236
            let (raw_conn, _rx) = create_dummy_peer_connection(peer.node_id().clone());
1✔
237
            let strong = raw_conn.clone_strong();
1✔
238
            assert_eq!(strong.strong_count(), 1);
1✔
239

240
            peer.set_connection(strong);
1✔
241
            assert!(peer.connection().is_some());
1✔
242
            assert_eq!(peer.connection().unwrap().strong_count(), 1);
1✔
243

244
            // Cloning the SyncPeer clones the underlying strong handle (Arc-like): counter bumps.
245
            let peer_clone = peer.clone();
1✔
246
            assert_eq!(peer.connection().unwrap().strong_count(), 2);
1✔
247
            drop(peer_clone);
1✔
248
            assert_eq!(peer.connection().unwrap().strong_count(), 1);
1✔
249

250
            // Weak clone used inside attempt loops does not bump the counter.
251
            let weak = peer.connection().unwrap().clone_with(RefKind::Weak);
1✔
252
            assert_eq!(peer.connection().unwrap().strong_count(), 1);
1✔
253
            drop(weak);
1✔
254
            assert_eq!(peer.connection().unwrap().strong_count(), 1);
1✔
255

256
            // Releasing the SyncPeer entirely drops the only Strong handle.
257
            peer.clear_connection();
1✔
258
            assert!(peer.connection().is_none());
1✔
259
            // raw_conn (weak) observes the release via the shared counter.
260
            assert_eq!(raw_conn.strong_count(), 0);
1✔
261
        }
1✔
262

263
        #[test]
264
        fn downgrade_connection_releases_strong_but_keeps_handle() {
1✔
265
            let mut peer = peer_with_id();
1✔
266
            let (raw_conn, _rx) = create_dummy_peer_connection(peer.node_id().clone());
1✔
267
            peer.set_connection(raw_conn.clone_strong());
1✔
268
            assert_eq!(raw_conn.strong_count(), 1);
1✔
269

270
            peer.downgrade_connection();
1✔
271
            // Connection still attached for reuse, but strong-count is back to zero.
272
            assert!(peer.connection().is_some());
1✔
273
            assert!(!peer.connection().unwrap().is_strong());
1✔
274
            assert_eq!(raw_conn.strong_count(), 0);
1✔
275
        }
1✔
276

277
        #[test]
278
        fn downgrade_connection_is_noop_for_weak_or_none() {
1✔
279
            let mut peer = peer_with_id();
1✔
280
            // None case
281
            peer.downgrade_connection();
1✔
282
            assert!(peer.connection().is_none());
1✔
283

284
            // Weak case
285
            let (raw_conn, _rx) = create_dummy_peer_connection(peer.node_id().clone());
1✔
286
            peer.set_connection(raw_conn.clone_weak());
1✔
287
            assert!(!peer.connection().unwrap().is_strong());
1✔
288
            peer.downgrade_connection();
1✔
289
            assert!(peer.connection().is_some());
1✔
290
            assert!(!peer.connection().unwrap().is_strong());
1✔
291
            assert_eq!(raw_conn.strong_count(), 0);
1✔
292
        }
1✔
293

294
        #[test]
295
        fn ensure_strong_connection_upgrades_weak_in_place() {
1✔
296
            let mut peer = peer_with_id();
1✔
297
            let (raw_conn, _rx) = create_dummy_peer_connection(peer.node_id().clone());
1✔
298
            peer.set_connection(raw_conn.clone_weak());
1✔
299
            assert_eq!(raw_conn.strong_count(), 0);
1✔
300

301
            let upgraded = peer.ensure_strong_connection();
1✔
302
            assert!(upgraded);
1✔
303
            assert!(peer.connection().unwrap().is_strong());
1✔
304
            assert_eq!(raw_conn.strong_count(), 1);
1✔
305

306
            // Calling again is idempotent — already-strong handles aren't double-counted.
307
            let upgraded_again = peer.ensure_strong_connection();
1✔
308
            assert!(upgraded_again);
1✔
309
            assert_eq!(raw_conn.strong_count(), 1);
1✔
310
        }
1✔
311

312
        #[test]
313
        fn ensure_strong_connection_returns_false_when_no_connection() {
1✔
314
            let mut peer = peer_with_id();
1✔
315
            assert!(!peer.ensure_strong_connection());
1✔
316
        }
1✔
317
    }
318

319
    mod sort_by_latency {
320
        use tari_common_types::types::FixedHash;
321
        use tari_comms::types::{CommsPublicKey, CommsSecretKey};
322
        use tari_crypto::keys::SecretKey;
323

324
        use super::*;
325

326
        // Helper function to generate a peer with a given latency
327
        fn generate_peer(latency: Option<usize>, accumulated_difficulty: Option<U512>) -> SyncPeer {
16✔
328
            let sk = CommsSecretKey::random(&mut rand::rng());
16✔
329
            let pk = CommsPublicKey::from_secret_key(&sk);
16✔
330
            let node_id = NodeId::from_key(&pk);
16✔
331
            let latency_option = latency.map(|latency| Duration::from_millis(latency as u64));
16✔
332
            let peer_accumulated_difficulty = match accumulated_difficulty {
16✔
333
                Some(v) => v,
3✔
334
                None => 1.into(),
13✔
335
            };
336
            PeerChainMetadata::new(
16✔
337
                node_id,
16✔
338
                ChainMetadata::new(0, FixedHash::zero(), 0, 0, peer_accumulated_difficulty, 0).unwrap(),
16✔
339
                latency_option,
16✔
340
            )
341
            .into()
16✔
342
        }
16✔
343

344
        #[test]
345
        fn it_sorts_by_latency() {
1✔
346
            const DISTINCT_LATENCY: usize = 5;
347

348
            // Generate a list of peers with latency, adding duplicates
349
            let mut peers = (0..2 * DISTINCT_LATENCY)
1✔
350
                .map(|latency| generate_peer(Some(latency % DISTINCT_LATENCY), None))
10✔
351
                .collect::<Vec<SyncPeer>>();
1✔
352

353
            // Add peers with no latency in a few places
354
            peers.insert(0, generate_peer(None, None));
1✔
355
            peers.insert(DISTINCT_LATENCY, generate_peer(None, None));
1✔
356
            peers.push(generate_peer(None, None));
1✔
357

358
            // Sort the list; because difficulty is identical, it should sort by latency
359
            peers.sort();
1✔
360

361
            // Confirm that the sorted latency is correct: numerical ordering, then `None`
362
            for (i, peer) in peers[..2 * DISTINCT_LATENCY].iter().enumerate() {
10✔
363
                assert_eq!(peer.latency(), Some(Duration::from_millis((i as u64) / 2)));
10✔
364
            }
365
            for _ in 0..3 {
1✔
366
                assert_eq!(peers.pop().unwrap().latency(), None);
3✔
367
            }
368
        }
1✔
369

370
        #[test]
371
        fn it_sorts_by_pow() {
1✔
372
            let mut peers = Vec::new();
1✔
373

374
            let mut pow = U512::from(1);
1✔
375
            let new_peer = generate_peer(Some(1), Some(pow));
1✔
376
            peers.push(new_peer);
1✔
377
            pow = U512::from(100);
1✔
378
            let new_peer = generate_peer(Some(100), Some(pow));
1✔
379
            peers.push(new_peer);
1✔
380
            pow = U512::from(1000);
1✔
381
            let new_peer = generate_peer(Some(1000), Some(pow));
1✔
382
            peers.push(new_peer);
1✔
383

384
            // Sort the list;
385
            peers.sort();
1✔
386

387
            assert_eq!(
1✔
388
                peers[0].peer_metadata.claimed_chain_metadata().accumulated_difficulty(),
1✔
389
                1000.into()
1✔
390
            );
391
            assert_eq!(
1✔
392
                peers[1].peer_metadata.claimed_chain_metadata().accumulated_difficulty(),
1✔
393
                100.into()
1✔
394
            );
395
            assert_eq!(
1✔
396
                peers[2].peer_metadata.claimed_chain_metadata().accumulated_difficulty(),
1✔
397
                1.into()
1✔
398
            );
399
        }
1✔
400
    }
401
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc