• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 23184374285

17 Mar 2026 08:04AM UTC coverage: 60.967% (-0.8%) from 61.722%
23184374285

push

github

SWvheerden
chore: new release v5.3.0-pre.3

69750 of 114406 relevant lines covered (60.97%)

227024.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.3
/comms/dht/src/connectivity/mod.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
//! # DHT Connectivity Actor
24
//!
25
//! Responsible for ensuring DHT network connectivity to a random peer set. This includes joining the
26
//! network when the node has established some peer connections (e.g to seed peers). It maintains a
27
//! random peer pool and instructs the comms `ConnectivityManager` to establish those connections. Once a configured
28
//! percentage of these peers is online, the node is established on the DHT network.
29
//!
30
//! The DHT connectivity actor monitors the connectivity state (using `ConnectivityEvent`s) and attempts
31
//! to maintain connectivity to the network as peers come and go.
32

33
#[cfg(test)]
34
mod test;
35

36
mod metrics;
37
use std::{
38
    sync::Arc,
39
    time::{Duration, Instant},
40
};
41

42
use log::*;
43
pub use metrics::{MetricsCollector, MetricsCollectorHandle};
44
use tari_comms::{
45
    Minimized,
46
    PeerConnection,
47
    PeerManager,
48
    connectivity::{
49
        ConnectivityError,
50
        ConnectivityEvent,
51
        ConnectivityEventRx,
52
        ConnectivityRequester,
53
        ConnectivitySelection,
54
    },
55
    multiaddr,
56
    peer_manager::{NodeId, Peer, PeerManagerError},
57
};
58
use tari_shutdown::ShutdownSignal;
59
use thiserror::Error;
60
use tokio::{sync::broadcast, task, task::JoinHandle, time, time::MissedTickBehavior};
61

62
use crate::{DhtActorError, DhtConfig, DhtRequester, connectivity::metrics::MetricsError, event::DhtEvent};
63

64
const LOG_TARGET: &str = "comms::dht::connectivity";
65

66
/// Error type for the DHT connectivity actor.
67
#[derive(Debug, Error)]
68
pub enum DhtConnectivityError {
69
    #[error("ConnectivityError: {0}")]
70
    ConnectivityError(#[from] ConnectivityError),
71
    #[error("PeerManagerError: {0}")]
72
    PeerManagerError(#[from] PeerManagerError),
73
    #[error("Failed to send network Join message: {0}")]
74
    SendJoinFailed(#[from] DhtActorError),
75
    #[error("Metrics error: {0}")]
76
    MetricError(#[from] MetricsError),
77
}
78

79
/// DHT connectivity actor.
80
pub(crate) struct DhtConnectivity {
81
    config: Arc<DhtConfig>,
82
    peer_manager: Arc<PeerManager>,
83
    connectivity: ConnectivityRequester,
84
    dht_requester: DhtRequester,
85
    /// A randomly-selected set of peers managed by this actor.
86
    random_pool: Vec<NodeId>,
87
    /// The random pool history.
88
    previous_random: Vec<NodeId>,
89
    /// Used to track when the random peer pool was last refreshed
90
    random_pool_last_refresh: Option<Instant>,
91
    /// Holds references to peer connections that should be kept alive
92
    connection_handles: Vec<PeerConnection>,
93
    stats: Stats,
94
    dht_events: broadcast::Receiver<Arc<DhtEvent>>,
95
    metrics_collector: MetricsCollectorHandle,
96
    cooldown_in_effect: Option<Instant>,
97
    shutdown_signal: ShutdownSignal,
98
}
99

100
impl DhtConnectivity {
101
    pub fn new(
86✔
102
        config: Arc<DhtConfig>,
86✔
103
        peer_manager: Arc<PeerManager>,
86✔
104
        connectivity: ConnectivityRequester,
86✔
105
        dht_requester: DhtRequester,
86✔
106
        dht_events: broadcast::Receiver<Arc<DhtEvent>>,
86✔
107
        metrics_collector: MetricsCollectorHandle,
86✔
108
        shutdown_signal: ShutdownSignal,
86✔
109
    ) -> Self {
86✔
110
        let pool_size = config.num_neighbouring_nodes + config.num_random_nodes;
86✔
111
        Self {
86✔
112
            random_pool: Vec::with_capacity(pool_size),
86✔
113
            connection_handles: Vec::with_capacity(pool_size),
86✔
114
            config,
86✔
115
            peer_manager,
86✔
116
            connectivity,
86✔
117
            dht_requester,
86✔
118
            metrics_collector,
86✔
119
            random_pool_last_refresh: None,
86✔
120
            stats: Stats::new(),
86✔
121
            dht_events,
86✔
122
            cooldown_in_effect: None,
86✔
123
            shutdown_signal,
86✔
124
            previous_random: vec![],
86✔
125
        }
86✔
126
    }
86✔
127

128
    /// Spawn a DhtConnectivity actor. This will immediately subscribe to the connection manager event stream to
129
    /// prevent unexpected missed events.
130
    pub fn spawn(mut self) -> JoinHandle<Result<(), DhtConnectivityError>> {
85✔
131
        // Listen to events as early as possible
132
        let connectivity_events = self.connectivity.get_event_subscription();
85✔
133
        task::spawn(async move {
85✔
134
            debug!(target: LOG_TARGET, "Waiting for connectivity manager to start");
85✔
135
            if let Err(err) = self.connectivity.wait_started().await {
85✔
136
                error!(target: LOG_TARGET, "Comms connectivity failed to start: {err}");
3✔
137
            }
82✔
138
            match self.run(connectivity_events).await {
85✔
139
                Ok(_) => Ok(()),
61✔
140
                Err(err) => {
3✔
141
                    error!(target: LOG_TARGET, "DhtConnectivity exited with error: {err:?}");
3✔
142
                    Err(err)
3✔
143
                },
144
            }
145
        })
64✔
146
    }
85✔
147

148
    pub async fn run(mut self, mut connectivity_events: ConnectivityEventRx) -> Result<(), DhtConnectivityError> {
85✔
149
        // Initial discovery and refresh sync peers delay period, when a configured connection needs preference,
150
        // usually needed for the wallet to connect to its own base node first.
151
        if let Some(delay) = self.config.network_discovery.initial_peer_sync_delay {
85✔
152
            tokio::time::sleep(delay).await;
×
153
            debug!(target: LOG_TARGET, "DHT connectivity starting after delayed for {delay:.0?}");
×
154
        }
85✔
155
        self.refresh_random_pool().await?;
85✔
156

157
        let mut ticker = time::interval(self.config.connectivity.update_interval);
82✔
158
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
82✔
159
        loop {
160
            tokio::select! {
484✔
161
                Ok(event) = connectivity_events.recv() => {
484✔
162
                    if let Err(err) = self.handle_connectivity_event(event).await {
335✔
163
                        error!(target: LOG_TARGET, "Error handling connectivity event: {err:?}");
2✔
164
                    }
318✔
165
                    if self.connection_handles.is_empty() {
320✔
166
                        debug!(target: LOG_TARGET, "No active peer connections detected. Triggering aggressive peer pool refresh.");
98✔
167
                        if let Err(err) = self.refresh_peer_pools(true).await {
98✔
168
                             error!(target: LOG_TARGET, "Error during aggressive peer pool refresh: {err:?}");
1✔
169
                        }
97✔
170
                    }
222✔
171
               },
172

173
               Ok(event) = self.dht_events.recv() => {
484✔
174
                    if let Err(err) = self.handle_dht_event(&event).await {
×
175
                        error!(target: LOG_TARGET, "Error handling DHT event: {err:?}");
×
176
                    }
×
177
               },
178

179
               _ = ticker.tick() => {
484✔
180
                    if let Err(err) = self.check_and_ban_flooding_peers().await {
82✔
181
                        error!(target: LOG_TARGET, "Error checking for peer flooding: {err:?}");
×
182
                    }
82✔
183
                    if let Err(err) = self.refresh_random_pool_if_required().await {
82✔
184
                        error!(target: LOG_TARGET, "Error refreshing random peer pool: {err:?}");
×
185
                    }
82✔
186
                    self.log_status();
82✔
187
                    if let Err(err) = self.check_minimum_required_tcp_nodes().await {
82✔
188
                        error!(target: LOG_TARGET, "Error checking minimum required TCP nodes: {err:?}");
×
189
                    }
82✔
190
               },
191

192
               _ = self.shutdown_signal.wait() => {
484✔
193
                    info!(target: LOG_TARGET, "DhtConnectivity shutting down because the shutdown signal was received");
61✔
194
                    break;
61✔
195
               }
196
            }
197
        }
198

199
        Ok(())
61✔
200
    }
64✔
201

202
    async fn check_minimum_required_tcp_nodes(&mut self) -> Result<(), DhtConnectivityError> {
82✔
203
        let desired_ratio = self.config.connectivity.minimum_desired_tcpv4_node_ratio;
82✔
204
        if desired_ratio == 0.0 {
82✔
205
            return Ok(());
×
206
        }
82✔
207

208
        let conns = self
82✔
209
            .connectivity
82✔
210
            .select_connections(ConnectivitySelection::all_nodes(vec![]))
82✔
211
            .await?;
82✔
212
        if conns.len() <= 1 {
82✔
213
            return Ok(());
82✔
214
        }
×
215

216
        let num_tcp_nodes = conns
×
217
            .iter()
×
218
            .filter(|conn| {
×
219
                let ip = conn.address().iter().next();
×
220
                let tcp = conn.address().iter().nth(2);
×
221
                matches!(ip, Some(multiaddr::Protocol::Ip4(_))) && matches!(tcp, Some(multiaddr::Protocol::Tcp(_)))
×
222
            })
×
223
            .count();
×
224

225
        let current_ratio = num_tcp_nodes as f32 / conns.len() as f32;
×
226
        if current_ratio < desired_ratio {
×
227
            debug!(
×
228
                target: LOG_TARGET,
×
229
                "{:.1?}% of this node's {} connections are using TCPv4. This node requires at least {:.1?}% of nodes \
230
                 to be TCP nodes.",
231
                (current_ratio * 100.0).round(),
×
232
                conns.len(),
×
233
                (desired_ratio * 100.0).round(),
×
234
            );
235
        }
×
236

237
        Ok(())
×
238
    }
82✔
239

240
    fn log_status(&self) {
100✔
241
        let pool_size = self.config.num_neighbouring_nodes + self.config.num_random_nodes;
100✔
242
        let (pool_connected, pool_pending) = self
100✔
243
            .random_pool
100✔
244
            .iter()
100✔
245
            .partition::<Vec<_>, _>(|peer| self.connection_handles.iter().any(|c| c.peer_node_id() == *peer));
100✔
246
        debug!(
100✔
247
            target: LOG_TARGET,
×
248
            "DHT connectivity status: {}peer pool: {}/{} ({} connected, last refreshed {}), active DHT connections: \
249
             {}/{}",
250
            self.cooldown_in_effect
×
251
                .map(|ts| format!(
×
252
                    "COOLDOWN({:.2?} remaining) ",
253
                    self.config
×
254
                        .connectivity
×
255
                        .high_failure_rate_cooldown
×
256
                        .saturating_sub(ts.elapsed())
×
257
                ))
258
                .unwrap_or_default(),
×
259
            self.random_pool.len(),
×
260
            pool_size,
261
            pool_connected.len(),
×
262
            self.random_pool_last_refresh
×
263
                .map(|i| format!("{:.0?} ago", i.elapsed()))
×
264
                .unwrap_or_else(|| "<never>".to_string()),
×
265
            self.connection_handles.len(),
×
266
            pool_size,
267
        );
268
        if !pool_pending.is_empty() {
100✔
269
            debug!(
55✔
270
                target: LOG_TARGET,
×
271
                "Pending connections: {}",
272
                pool_pending
×
273
                    .iter()
×
274
                    .map(ToString::to_string)
×
275
                    .collect::<Vec<_>>()
×
276
                    .join(", "),
×
277
            );
278
        }
45✔
279
    }
100✔
280

281
    async fn handle_dht_event(&mut self, event: &DhtEvent) -> Result<(), DhtConnectivityError> {
×
282
        #[allow(clippy::single_match)]
283
        match event {
×
284
            DhtEvent::NetworkDiscoveryPeersAdded(info) => {
×
285
                if info.num_new_peers > 0 {
×
286
                    self.refresh_peer_pools(false).await?;
×
287
                }
×
288
            },
289
            _ => {},
×
290
        }
291

292
        Ok(())
×
293
    }
×
294

295
    async fn check_and_ban_flooding_peers(&mut self) -> Result<(), DhtConnectivityError> {
82✔
296
        let nodes = self
82✔
297
            .metrics_collector
82✔
298
            .get_message_rates_exceeding(self.config.flood_ban_max_msg_count, self.config.flood_ban_timespan)
82✔
299
            .await?;
82✔
300

301
        for (peer, mps) in nodes {
82✔
302
            warn!(
×
303
                target: LOG_TARGET,
×
304
                "Banning peer `{peer}` because of flooding. Message rate: {mps:.2}m/s"
305
            );
306
            self.connectivity
×
307
                .ban_peer_until(
×
308
                    peer,
×
309
                    self.config.ban_duration_short,
×
310
                    format!(
×
311
                        "Exceeded maximum message rate. Config: {}/{:#?}. Rate: {:.2} m/s",
×
312
                        self.config.flood_ban_max_msg_count, self.config.flood_ban_timespan, mps
×
313
                    ),
×
314
                )
×
315
                .await?;
×
316
        }
317
        Ok(())
82✔
318
    }
82✔
319

320
    async fn refresh_peer_pools(&mut self, try_revive_connections: bool) -> Result<(), DhtConnectivityError> {
170✔
321
        info!(
170✔
322
            target: LOG_TARGET,
×
323
            "Reinitializing peer pool. (size={})",
324
            self.random_pool.len(),
×
325
        );
326

327
        if try_revive_connections {
170✔
328
            self.redial_pool_peers_as_required().await?;
98✔
329
        }
72✔
330
        self.refresh_random_pool().await?;
170✔
331

332
        Ok(())
169✔
333
    }
170✔
334

335
    async fn redial_pool_peers_as_required(&mut self) -> Result<(), DhtConnectivityError> {
98✔
336
        let disconnected = self
98✔
337
            .connection_handles
98✔
338
            .iter()
98✔
339
            .filter(|c| !c.is_connected())
98✔
340
            .collect::<Vec<_>>();
98✔
341
        let to_redial = self
98✔
342
            .random_pool
98✔
343
            .iter()
98✔
344
            .filter(|n| disconnected.iter().any(|c| c.peer_node_id() == *n))
98✔
345
            .cloned()
98✔
346
            .collect::<Vec<_>>();
98✔
347

348
        if !to_redial.is_empty() {
98✔
349
            debug!(
×
350
                target: LOG_TARGET,
×
351
                "Redialling {} disconnected peer(s)",
352
                to_redial.len()
×
353
            );
354
            self.dial_multiple_peers(&to_redial).await?;
×
355
        }
98✔
356

357
        Ok(())
98✔
358
    }
98✔
359

360
    async fn dial_multiple_peers(&self, peers_to_dial: &[NodeId]) -> Result<(), DhtConnectivityError> {
210✔
361
        if !peers_to_dial.is_empty() {
210✔
362
            self.connectivity.request_many_dials(peers_to_dial.to_vec()).await?;
85✔
363
        }
125✔
364

365
        Ok(())
209✔
366
    }
210✔
367

368
    async fn refresh_random_pool_if_required(&mut self) -> Result<(), DhtConnectivityError> {
82✔
369
        let pool_size = self.config.num_neighbouring_nodes + self.config.num_random_nodes;
82✔
370
        let should_refresh = pool_size > 0 &&
82✔
371
            self.random_pool_last_refresh
82✔
372
                .map(|instant| instant.elapsed() >= self.config.connectivity.random_pool_refresh_interval)
82✔
373
                .unwrap_or(true);
82✔
374
        if should_refresh {
82✔
375
            self.refresh_random_pool().await?;
32✔
376
        }
50✔
377

378
        Ok(())
82✔
379
    }
82✔
380

381
    async fn refresh_random_pool(&mut self) -> Result<(), DhtConnectivityError> {
287✔
382
        self.remove_unmanaged_peers_from_pools().await?;
287✔
383
        let pool_size = self.config.num_neighbouring_nodes + self.config.num_random_nodes;
283✔
384
        let mut exclude = Vec::new();
283✔
385
        if self.config.minimize_connections {
283✔
386
            exclude.extend(self.previous_random.iter().cloned());
×
387
        }
283✔
388
        let mut new_peers = self.fetch_random_peers(pool_size, &exclude).await?;
283✔
389
        if new_peers.is_empty() {
283✔
390
            info!(
96✔
391
                target: LOG_TARGET,
×
392
                "Unable to refresh peer pool because there are insufficient known peers",
393
            );
394
            return Ok(());
96✔
395
        }
187✔
396

397
        let (intersection, difference) = self
187✔
398
            .random_pool
187✔
399
            .drain(..)
187✔
400
            .partition::<Vec<_>, _>(|n| new_peers.contains(n));
187✔
401
        // Remove the peers that we want to keep from the `new_peers` to be added
402
        new_peers.retain(|n| !intersection.contains(n));
250✔
403
        self.random_pool = intersection;
187✔
404
        debug!(
187✔
405
            target: LOG_TARGET,
×
406
            "Adding new peers to peer pool (#new = {}, #keeping = {}, #removing = {})",
407
            new_peers.len(),
×
408
            self.random_pool.len(),
×
409
            difference.len()
×
410
        );
411
        trace!(
187✔
412
            target: LOG_TARGET,
×
413
            "Pool peers: Adding = {new_peers:?}, Removing = {difference:?}"
414

415
        );
416
        for peer in &new_peers {
187✔
417
            self.insert_random_peer(peer.clone());
84✔
418
        }
84✔
419
        // Drop any connection handles that removed from the pool
420
        difference.iter().for_each(|peer| {
187✔
421
            self.remove_connection_handle(peer);
2✔
422
        });
2✔
423
        self.dial_multiple_peers(&new_peers).await?;
187✔
424

425
        self.random_pool_last_refresh = Some(Instant::now());
187✔
426
        Ok(())
187✔
427
    }
287✔
428

429
    async fn handle_new_peer_connected(&mut self, conn: PeerConnection) -> Result<(), DhtConnectivityError> {
122✔
430
        self.remove_unmanaged_peers_from_pools().await?;
122✔
431
        if conn.peer_features().is_client() {
122✔
432
            debug!(
2✔
433
                target: LOG_TARGET,
×
434
                "Client node '{}' connected",
435
                conn.peer_node_id().short_str()
×
436
            );
437
            return Ok(());
2✔
438
        }
120✔
439

440
        if self.is_allow_list_peer(conn.peer_node_id()).await? {
120✔
441
            debug!(
×
442
                target: LOG_TARGET,
×
443
                "Unmanaged peer '{}' connected",
444
                conn.peer_node_id()
×
445
            );
446
            return Ok(());
×
447
        }
120✔
448

449
        if self.is_pool_peer(conn.peer_node_id()) {
120✔
450
            debug!(
68✔
451
                target: LOG_TARGET,
×
452
                "Added pool peer '{}' to connection handles",
453
                conn.peer_node_id()
×
454
            );
455
            self.insert_connection_handle(conn);
68✔
456
            return Ok(());
68✔
457
        }
52✔
458

459
        let pool_size = self.config.num_neighbouring_nodes + self.config.num_random_nodes;
52✔
460
        if self.random_pool.len() < pool_size {
52✔
461
            debug!(
51✔
462
                target: LOG_TARGET,
×
463
                "Peer '{}' connected. Adding to peer pool.",
464
                conn.peer_node_id().short_str()
×
465
            );
466
            self.random_pool.push(conn.peer_node_id().clone());
51✔
467
            self.insert_connection_handle(conn);
51✔
468
        }
1✔
469

470
        Ok(())
52✔
471
    }
122✔
472

473
    async fn pool_peers_with_active_connections(&self) -> Result<Vec<Peer>, DhtConnectivityError> {
×
474
        let peer_list = self
×
475
            .connection_handles
×
476
            .iter()
×
477
            .map(|conn| conn.peer_node_id())
×
478
            .cloned()
×
479
            .collect::<Vec<_>>();
×
480
        let peers = self.peer_manager.get_peers_by_node_ids(&peer_list).await?;
×
481
        debug!(
×
482
            target: LOG_TARGET,
×
483
            "minimize_connections: Filtered peers: {}, Handles: {}",
484
            peers.len(),
×
485
            self.connection_handles.len(),
×
486
        );
487
        Ok(peers)
×
488
    }
×
489

490
    async fn minimize_connections(&mut self) -> Result<(), DhtConnectivityError> {
×
491
        // Retrieve all communication node peers with an active connection status
492
        let mut peers_by_distance = self.pool_peers_with_active_connections().await?;
×
493
        let peer_allow_list = self.peer_allow_list().await?;
×
494
        peers_by_distance.retain(|p| !peer_allow_list.contains(&p.node_id));
×
495

496
        // Remove all above threshold connections
497
        let threshold = self.config.num_neighbouring_nodes + self.config.num_random_nodes;
×
498
        for peer in peers_by_distance.iter_mut().skip(threshold) {
×
499
            debug!(
×
500
                target: LOG_TARGET,
×
501
                "minimize_connections: Disconnecting '{}' because the node is not among the {} managed peers",
502
                peer.node_id,
503
                threshold
504
            );
505
            self.replace_pool_peer(&peer.node_id).await?;
×
506
            self.remove_connection_handle(&peer.node_id);
×
507
        }
508

509
        Ok(())
×
510
    }
×
511

512
    fn insert_connection_handle(&mut self, conn: PeerConnection) {
119✔
513
        // Remove any existing connection for this peer
514
        self.remove_connection_handle(conn.peer_node_id());
119✔
515
        trace!(target: LOG_TARGET, "Insert new peer connection {conn}" );
119✔
516
        self.connection_handles.push(conn);
119✔
517
    }
119✔
518

519
    fn remove_connection_handle(&mut self, node_id: &NodeId) {
145✔
520
        if let Some(idx) = self.connection_handles.iter().position(|c| c.peer_node_id() == node_id) {
145✔
521
            let conn = self.connection_handles.swap_remove(idx);
8✔
522
            trace!(target: LOG_TARGET, "Removing peer connection {conn}" );
8✔
523
        }
137✔
524
    }
145✔
525

526
    async fn handle_connectivity_event(&mut self, event: ConnectivityEvent) -> Result<(), DhtConnectivityError> {
335✔
527
        #[allow(clippy::enum_glob_use)]
528
        use ConnectivityEvent::*;
529
        debug!(target: LOG_TARGET, "Connectivity event: {event}");
335✔
530
        match event {
335✔
531
            PeerConnected(conn) => {
122✔
532
                self.handle_new_peer_connected(*conn.clone()).await?;
122✔
533
                trace!(
122✔
534
                    target: LOG_TARGET,
×
535
                    "Peer: node_id '{}', allow_list '{}', connected '{}'",
536
                    conn.peer_node_id(),
×
537
                    self.is_allow_list_peer(conn.peer_node_id()).await?,
×
538
                    conn.is_connected(),
×
539
                );
540

541
                if self.config.minimize_connections {
122✔
542
                    self.minimize_connections().await?;
×
543
                }
122✔
544
            },
545
            PeerConnectFailed(node_id) => {
4✔
546
                self.connection_handles.retain(|c| *c.peer_node_id() != node_id);
4✔
547
                if self.metrics_collector.clear_metrics(node_id.clone()).await.is_err() {
4✔
548
                    debug!(
×
549
                        target: LOG_TARGET,
×
550
                        "Failed to clear metrics for peer `{node_id}`. Metric collector is shut down."
551
                    );
552
                };
4✔
553
                self.remove_unmanaged_peers_from_pools().await?;
4✔
554
                if !self.is_pool_peer(&node_id) {
4✔
555
                    debug!(target: LOG_TARGET, "{node_id} is not managed by the DHT. Ignoring");
×
556
                    return Ok(());
×
557
                }
4✔
558
                self.replace_pool_peer(&node_id).await?;
4✔
559
                self.log_status();
4✔
560
            },
561
            PeerDisconnected(node_id, minimized) => {
33✔
562
                debug!(
33✔
563
                    target: LOG_TARGET,
×
564
                    "Peer: node_id '{}', allow_list '{}', connected 'false'",
565
                    node_id,
566
                    self.is_allow_list_peer(&node_id).await?,
×
567
                );
568
                self.connection_handles.retain(|c| *c.peer_node_id() != node_id);
50✔
569
                if self.metrics_collector.clear_metrics(node_id.clone()).await.is_err() {
33✔
570
                    debug!(
×
571
                        target: LOG_TARGET,
×
572
                        "Failed to clear metrics for peer `{node_id}`. Metric collector is shut down."
573
                    );
574
                };
33✔
575
                self.remove_unmanaged_peers_from_pools().await?;
33✔
576
                if !self.is_pool_peer(&node_id) {
32✔
577
                    debug!(target: LOG_TARGET, "{node_id} is not managed by the DHT. Ignoring");
×
578
                    return Ok(());
×
579
                }
32✔
580
                if minimized == Minimized::Yes || self.config.minimize_connections {
32✔
581
                    debug!(
10✔
582
                        target: LOG_TARGET,
×
583
                        "Peer '{node_id}' was disconnected because it was minimized, will not reconnect."
584

585
                    );
586
                    // Remove from managed pool if applicable
587
                    self.replace_pool_peer(&node_id).await?;
10✔
588
                    // In case the connections was not managed, remove the connection handle
589
                    self.remove_connection_handle(&node_id);
10✔
590
                    return Ok(());
10✔
591
                }
22✔
592
                debug!(target: LOG_TARGET, "Pool peer {node_id} disconnected. Redialling...");
22✔
593
                // Attempt to reestablish the lost connection to the pool peer. If reconnection fails,
594
                // it is replaced with another peer (replace_pool_peer via PeerConnectFailed)
595
                self.dial_multiple_peers(&[node_id]).await?;
22✔
596
            },
597
            ConnectivityStateOnline(n) => {
72✔
598
                self.refresh_peer_pools(false).await?;
72✔
599
                if self.config.auto_join && self.should_send_join() {
72✔
600
                    debug!(
×
601
                        target: LOG_TARGET,
×
602
                        "Node is online ({n} peer(s) connected). Sending network join message."
603
                    );
604
                    self.dht_requester
×
605
                        .send_join()
×
606
                        .await
×
607
                        .map_err(DhtConnectivityError::SendJoinFailed)?;
×
608

609
                    self.stats.mark_join_sent();
×
610
                }
72✔
611
            },
612
            ConnectivityStateOffline => {
613
                debug!(target: LOG_TARGET, "Node is OFFLINE");
15✔
614
                tokio::time::sleep(Duration::from_secs(15)).await;
15✔
615
                self.refresh_peer_pools(true).await?;
×
616
            },
617
            _ => {},
89✔
618
        }
619

620
        Ok(())
308✔
621
    }
320✔
622

623
    async fn peer_allow_list(&mut self) -> Result<Vec<NodeId>, DhtConnectivityError> {
891✔
624
        Ok(self.connectivity.get_allow_list().await?)
891✔
625
    }
891✔
626

627
    async fn replace_pool_peer(&mut self, current_peer: &NodeId) -> Result<(), DhtConnectivityError> {
14✔
628
        self.remove_unmanaged_peers_from_pools().await?;
14✔
629
        if self.is_allow_list_peer(current_peer).await? {
14✔
630
            debug!(
×
631
                target: LOG_TARGET,
×
632
                "Peer '{current_peer}' is on the allow list, ignoring replacement."
633

634
            );
635
            return Ok(());
×
636
        }
14✔
637

638
        if self.random_pool.contains(current_peer) {
14✔
639
            let mut exclude = self.get_pool_peers();
14✔
640
            if self.config.minimize_connections {
14✔
641
                exclude.extend(self.previous_random.iter().cloned());
×
642
                self.previous_random.push(current_peer.clone());
×
643
            }
14✔
644

645
            self.random_pool.retain(|n| n != current_peer);
28✔
646
            self.remove_connection_handle(current_peer);
14✔
647

648
            debug!(
14✔
649
                target: LOG_TARGET,
×
650
                "Peer '{current_peer}' in peer pool is unavailable. Adding a new peer if possible"
651
            );
652
            match self.fetch_random_peers(1, &exclude).await?.pop() {
14✔
653
                Some(new_peer) => {
1✔
654
                    self.insert_random_peer(new_peer.clone());
1✔
655
                    self.dial_multiple_peers(&[new_peer]).await?;
1✔
656
                },
657
                None => {
658
                    debug!(
13✔
659
                        target: LOG_TARGET,
×
660
                        "Unable to fetch new peer to replace disconnected peer '{}' because not enough peers \
661
                         are known. Pool size is {}.",
662
                        current_peer,
663
                        self.random_pool.len()
×
664
                    );
665
                },
666
            }
667
        }
×
668

669
        self.log_status();
14✔
670

671
        Ok(())
14✔
672
    }
14✔
673

674
    fn insert_random_peer(&mut self, node_id: NodeId) {
95✔
675
        let pool_size = self.config.num_neighbouring_nodes + self.config.num_random_nodes;
95✔
676
        self.random_pool.push(node_id);
95✔
677
        if self.random_pool.len() > pool_size &&
95✔
678
            let Some(removed_peer) = self.random_pool.pop() &&
2✔
679
            self.config.minimize_connections
2✔
680
        {
×
681
            self.previous_random.push(removed_peer.clone());
×
682
        }
95✔
683
    }
95✔
684

685
    async fn remove_unmanaged_peers_from_pools(&mut self) -> Result<(), DhtConnectivityError> {
460✔
686
        self.remove_allow_list_peers_from_pools().await?;
460✔
687
        self.remove_exlcuded_peers_from_pools().await
455✔
688
    }
460✔
689

690
    async fn remove_allow_list_peers_from_pools(&mut self) -> Result<(), DhtConnectivityError> {
460✔
691
        let allow_list = self.peer_allow_list().await?;
460✔
692
        self.random_pool.retain(|n| !allow_list.contains(n));
455✔
693
        Ok(())
455✔
694
    }
460✔
695

696
    async fn remove_exlcuded_peers_from_pools(&mut self) -> Result<(), DhtConnectivityError> {
455✔
697
        if !self.config.excluded_dial_addresses.is_empty() {
455✔
698
            let mut pool = Vec::with_capacity(self.random_pool.len());
×
699
            for peer in &self.random_pool {
×
700
                if let Ok(addresses) = self.peer_manager.get_peer_multi_addresses(peer).await &&
×
701
                    !addresses.iter().all(|addr| {
×
702
                        self.config
×
703
                            .excluded_dial_addresses
×
704
                            .iter()
×
705
                            .any(|v| v.contains(addr.address()))
×
706
                    })
×
707
                {
×
708
                    pool.push(peer.clone());
×
709
                }
×
710
            }
711
            self.random_pool = pool;
×
712
        }
455✔
713
        Ok(())
455✔
714
    }
455✔
715

716
    async fn is_allow_list_peer(&mut self, node_id: &NodeId) -> Result<bool, DhtConnectivityError> {
134✔
717
        Ok(self.peer_allow_list().await?.contains(node_id))
134✔
718
    }
134✔
719

720
    fn is_pool_peer(&self, node_id: &NodeId) -> bool {
156✔
721
        self.random_pool.contains(node_id)
156✔
722
    }
156✔
723

724
    fn get_pool_peers(&self) -> Vec<NodeId> {
14✔
725
        self.random_pool.clone()
14✔
726
    }
14✔
727

728
    async fn fetch_random_peers(&mut self, n: usize, excluded: &[NodeId]) -> Result<Vec<NodeId>, DhtConnectivityError> {
297✔
729
        let mut excluded = excluded.to_vec();
297✔
730
        excluded.extend(self.peer_allow_list().await?);
297✔
731
        let peers = self.peer_manager.random_peers(n, &excluded, None).await?;
297✔
732
        Ok(peers.into_iter().map(|p| p.node_id).collect())
297✔
733
    }
297✔
734

735
    fn should_send_join(&self) -> bool {
×
736
        let cooldown = self.config.join_cooldown_interval;
×
737
        self.stats
×
738
            .join_last_sent_at()
×
739
            .map(|at| at.elapsed() > cooldown)
×
740
            .unwrap_or(true)
×
741
    }
×
742
}
743

744
/// Basic connectivity stats. Right now, it is only used to track the last time a join message was sent to prevent the
745
/// node spamming the network if local connectivity changes.
746
#[derive(Debug, Default)]
747
struct Stats {
748
    join_last_sent_at: Option<Instant>,
749
}
750

751
impl Stats {
752
    pub fn new() -> Self {
86✔
753
        Default::default()
86✔
754
    }
86✔
755

756
    pub fn join_last_sent_at(&self) -> Option<Instant> {
×
757
        self.join_last_sent_at
×
758
    }
×
759

760
    pub fn mark_join_sent(&mut self) {
×
761
        self.join_last_sent_at = Some(Instant::now());
×
762
    }
×
763
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc