
tari-project / tari · build 18402812670
10 Oct 2025 09:44AM UTC · coverage: 59.552% (+1.0%) from 58.589%
push · github · web-flow

chore(ci): remove bitnami docker images for debian slim images and minor updates (#7543)

Description
Replace the Bitnami Docker images with Debian slim images.
Minor updates and a Tor version bump.

Motivation and Context
Move away from the Bitnami Docker images, as they are moving to a pay-for-use model.

How Has This Been Tested?
Builds in a local fork; runs in a local container.


## Summary by CodeRabbit

- New Features
  - None
- Bug Fixes
  - None
- Chores
  - Switched build and runtime base images to Debian slim variants for improved consistency.
  - Updated the Rust toolchain to 1.90.0 and moved the OS base to Debian “trixie”.
  - Bumped the Tor package to 0.4.8.19-r0.
- Documentation
  - Refreshed inline comments to reflect the new base image sources and the Tor branch reference.
- Tests
  - None


67582 of 113484 relevant lines covered (59.55%)

304718.09 hits per line

Source File: /comms/dht/src/connectivity/mod.rs (file coverage: 59.47%)

//  Copyright 2020, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! # DHT Connectivity Actor
//!
//! Responsible for ensuring DHT network connectivity to a neighbouring and random peer set. This includes joining the
//! network when the node has established some peer connections (e.g. to seed peers). It maintains neighbouring and
//! random peer pools and instructs the comms `ConnectivityManager` to establish those connections. Once a configured
//! percentage of these peers is online, the node is established on the DHT network.
//!
//! The DHT connectivity actor monitors the connectivity state (using `ConnectivityEvent`s) and attempts
//! to maintain connectivity to the network as peers come and go.

#[cfg(test)]
mod test;

mod metrics;
use std::{
    sync::Arc,
    time::{Duration, Instant},
};

use log::*;
pub use metrics::{MetricsCollector, MetricsCollectorHandle};
use tari_comms::{
    connectivity::{
        ConnectivityError,
        ConnectivityEvent,
        ConnectivityEventRx,
        ConnectivityRequester,
        ConnectivitySelection,
    },
    multiaddr,
    peer_manager::{NodeDistance, NodeId, Peer, PeerFeatures, PeerManagerError, STALE_PEER_THRESHOLD_DURATION},
    Minimized,
    NodeIdentity,
    PeerConnection,
    PeerManager,
};
use tari_shutdown::ShutdownSignal;
use thiserror::Error;
use tokio::{sync::broadcast, task, task::JoinHandle, time, time::MissedTickBehavior};

use crate::{connectivity::metrics::MetricsError, event::DhtEvent, DhtActorError, DhtConfig, DhtRequester};

const LOG_TARGET: &str = "comms::dht::connectivity";

/// Error type for the DHT connectivity actor.
#[derive(Debug, Error)]
pub enum DhtConnectivityError {
    #[error("ConnectivityError: {0}")]
    ConnectivityError(#[from] ConnectivityError),
    #[error("PeerManagerError: {0}")]
    PeerManagerError(#[from] PeerManagerError),
    #[error("Failed to send network Join message: {0}")]
    SendJoinFailed(#[from] DhtActorError),
    #[error("Metrics error: {0}")]
    MetricError(#[from] MetricsError),
}

/// DHT connectivity actor.
pub(crate) struct DhtConnectivity {
    config: Arc<DhtConfig>,
    peer_manager: Arc<PeerManager>,
    node_identity: Arc<NodeIdentity>,
    connectivity: ConnectivityRequester,
    dht_requester: DhtRequester,
    /// List of neighbours managed by DhtConnectivity, ordered by distance from this node
    neighbours: Vec<NodeId>,
    /// A randomly-selected set of peers, excluding neighbouring peers.
    random_pool: Vec<NodeId>,
    /// The random pool history.
    previous_random: Vec<NodeId>,
    /// Used to track when the random peer pool was last refreshed
    random_pool_last_refresh: Option<Instant>,
    /// Holds references to peer connections that should be kept alive
    connection_handles: Vec<PeerConnection>,
    stats: Stats,
    dht_events: broadcast::Receiver<Arc<DhtEvent>>,
    metrics_collector: MetricsCollectorHandle,
    cooldown_in_effect: Option<Instant>,
    shutdown_signal: ShutdownSignal,
}

impl DhtConnectivity {
    pub fn new(
        config: Arc<DhtConfig>,
        peer_manager: Arc<PeerManager>,
        node_identity: Arc<NodeIdentity>,
        connectivity: ConnectivityRequester,
        dht_requester: DhtRequester,
        dht_events: broadcast::Receiver<Arc<DhtEvent>>,
        metrics_collector: MetricsCollectorHandle,
        shutdown_signal: ShutdownSignal,
    ) -> Self {
        Self {
            neighbours: Vec::with_capacity(config.num_neighbouring_nodes),
            random_pool: Vec::with_capacity(config.num_random_nodes),
            connection_handles: Vec::with_capacity(config.num_neighbouring_nodes + config.num_random_nodes),
            config,
            peer_manager,
            node_identity,
            connectivity,
            dht_requester,
            metrics_collector,
            random_pool_last_refresh: None,
            stats: Stats::new(),
            dht_events,
            cooldown_in_effect: None,
            shutdown_signal,
            previous_random: vec![],
        }
    }

    /// Spawn a DhtConnectivity actor. This will immediately subscribe to the connection manager event stream to
    /// prevent unexpected missed events.
    pub fn spawn(mut self) -> JoinHandle<Result<(), DhtConnectivityError>> {
        // Listen to events as early as possible
        let connectivity_events = self.connectivity.get_event_subscription();
        task::spawn(async move {
            debug!(target: LOG_TARGET, "Waiting for connectivity manager to start");
            if let Err(err) = self.connectivity.wait_started().await {
                error!(target: LOG_TARGET, "Comms connectivity failed to start: {err}");
            }
            match self.run(connectivity_events).await {
                Ok(_) => Ok(()),
                Err(err) => {
                    error!(target: LOG_TARGET, "DhtConnectivity exited with error: {err:?}");
                    Err(err)
                },
            }
        })
    }

    pub async fn run(mut self, mut connectivity_events: ConnectivityEventRx) -> Result<(), DhtConnectivityError> {
        // Initial delay before discovery and refreshing sync peers, used when a configured connection needs
        // preference, typically so that a wallet can connect to its own base node first.
        if let Some(delay) = self.config.network_discovery.initial_peer_sync_delay {
            tokio::time::sleep(delay).await;
            debug!(target: LOG_TARGET, "DHT connectivity starting after a delay of {delay:.0?}");
        }
        self.refresh_neighbour_pool(true).await?;

        let mut ticker = time::interval(self.config.connectivity.update_interval);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            tokio::select! {
                Ok(event) = connectivity_events.recv() => {
                    if let Err(err) = self.handle_connectivity_event(event).await {
                        error!(target: LOG_TARGET, "Error handling connectivity event: {err:?}");
                    }
                },

                Ok(event) = self.dht_events.recv() => {
                    if let Err(err) = self.handle_dht_event(&event).await {
                        error!(target: LOG_TARGET, "Error handling DHT event: {err:?}");
                    }
                },

                _ = ticker.tick() => {
                    if let Err(err) = self.check_and_ban_flooding_peers().await {
                        error!(target: LOG_TARGET, "Error checking for peer flooding: {err:?}");
                    }
                    if let Err(err) = self.refresh_neighbour_pool_if_required().await {
                        error!(target: LOG_TARGET, "Error refreshing neighbour peer pool: {err:?}");
                    }
                    if let Err(err) = self.refresh_random_pool_if_required().await {
                        error!(target: LOG_TARGET, "Error refreshing random peer pool: {err:?}");
                    }
                    self.log_status();
                    if let Err(err) = self.check_minimum_required_tcp_nodes().await {
                        error!(target: LOG_TARGET, "Error checking minimum required TCP nodes: {err:?}");
                    }
                },

                _ = self.shutdown_signal.wait() => {
                    info!(target: LOG_TARGET, "DhtConnectivity shutting down because the shutdown signal was received");
                    break;
                }
            }
        }

        Ok(())
    }

    async fn check_minimum_required_tcp_nodes(&mut self) -> Result<(), DhtConnectivityError> {
        let desired_ratio = self.config.connectivity.minimum_desired_tcpv4_node_ratio;
        if desired_ratio == 0.0 {
            return Ok(());
        }

        let conns = self
            .connectivity
            .select_connections(ConnectivitySelection::all_nodes(vec![]))
            .await?;
        if conns.len() <= 1 {
            return Ok(());
        }

        let num_tcp_nodes = conns
            .iter()
            .filter(|conn| {
                let ip = conn.address().iter().next();
                let tcp = conn.address().iter().nth(2);
                matches!(ip, Some(multiaddr::Protocol::Ip4(_))) && matches!(tcp, Some(multiaddr::Protocol::Tcp(_)))
            })
            .count();

        let current_ratio = num_tcp_nodes as f32 / conns.len() as f32;
        if current_ratio < desired_ratio {
            debug!(
                target: LOG_TARGET,
                "{:.1?}% of this node's {} connections are using TCPv4. This node requires at least {:.1?}% of nodes \
                 to be TCP nodes.",
                (current_ratio * 100.0).round(),
                conns.len(),
                (desired_ratio * 100.0).round(),
            );
        }

        Ok(())
    }

    fn log_status(&self) {
        let (neighbour_connected, neighbour_pending) = self
            .neighbours
            .iter()
            .partition::<Vec<_>, _>(|peer| self.connection_handles.iter().any(|c| c.peer_node_id() == *peer));
        let (random_connected, random_pending) = self
            .random_pool
            .iter()
            .partition::<Vec<_>, _>(|peer| self.connection_handles.iter().any(|c| c.peer_node_id() == *peer));
        debug!(
            target: LOG_TARGET,
            "DHT connectivity status: {}neighbour pool: {}/{} ({} connected), random pool: {}/{} ({} connected, last \
             refreshed {}), active DHT connections: {}/{}",
            self.cooldown_in_effect
                .map(|ts| format!(
                    "COOLDOWN({:.2?} remaining) ",
                    self.config
                        .connectivity
                        .high_failure_rate_cooldown
                        .saturating_sub(ts.elapsed())
                ))
                .unwrap_or_default(),
            self.neighbours.len(),
            self.config.num_neighbouring_nodes,
            neighbour_connected.len(),
            self.random_pool.len(),
            self.config.num_random_nodes,
            random_connected.len(),
            self.random_pool_last_refresh
                .map(|i| format!("{:.0?} ago", i.elapsed()))
                .unwrap_or_else(|| "<never>".to_string()),
            self.connection_handles.len(),
            self.config.num_neighbouring_nodes + self.config.num_random_nodes,
        );
        if !neighbour_pending.is_empty() || !random_pending.is_empty() {
            debug!(
                target: LOG_TARGET,
                "Pending connections: neighbouring({}), random({})",
                neighbour_pending
                    .iter()
                    .map(ToString::to_string)
                    .collect::<Vec<_>>()
                    .join(", "),
                random_pending
                    .iter()
                    .map(ToString::to_string)
                    .collect::<Vec<_>>()
                    .join(", ")
            );
        }
    }

    async fn handle_dht_event(&mut self, event: &DhtEvent) -> Result<(), DhtConnectivityError> {
        #[allow(clippy::single_match)]
        match event {
            DhtEvent::NetworkDiscoveryPeersAdded(info) => {
                if info.num_new_peers > 0 {
                    self.refresh_peer_pools(false).await?;
                }
            },
            _ => {},
        }

        Ok(())
    }

    async fn check_and_ban_flooding_peers(&mut self) -> Result<(), DhtConnectivityError> {
        let nodes = self
            .metrics_collector
            .get_message_rates_exceeding(self.config.flood_ban_max_msg_count, self.config.flood_ban_timespan)
            .await?;

        for (peer, mps) in nodes {
            warn!(
                target: LOG_TARGET,
                "Banning peer `{peer}` because of flooding. Message rate: {mps:.2}m/s"
            );
            self.connectivity
                .ban_peer_until(
                    peer,
                    self.config.ban_duration_short,
                    format!(
                        "Exceeded maximum message rate. Config: {}/{:#?}. Rate: {:.2} m/s",
                        self.config.flood_ban_max_msg_count, self.config.flood_ban_timespan, mps
                    ),
                )
                .await?;
        }
        Ok(())
    }

    async fn refresh_peer_pools(&mut self, try_revive_connections: bool) -> Result<(), DhtConnectivityError> {
        info!(
            target: LOG_TARGET,
            "Reinitializing neighbour pool. (size={})",
            self.neighbours.len(),
        );

        self.refresh_neighbour_pool(try_revive_connections).await?;
        self.refresh_random_pool().await?;

        Ok(())
    }

    async fn refresh_neighbour_pool_if_required(&mut self) -> Result<(), DhtConnectivityError> {
        if self.num_connected_neighbours() < self.config.num_neighbouring_nodes {
            self.refresh_neighbour_pool(false).await?;
        }

        Ok(())
    }

    fn num_connected_neighbours(&self) -> usize {
        self.neighbours
            .iter()
            .filter(|peer| self.connection_handles.iter().any(|c| c.peer_node_id() == *peer))
            .count()
    }

    fn connected_pool_peers_iter(&self) -> impl Iterator<Item = &NodeId> {
        self.connection_handles.iter().map(|c| c.peer_node_id())
    }

    async fn refresh_neighbour_pool(&mut self, try_revive_connections: bool) -> Result<(), DhtConnectivityError> {
        self.remove_unmanaged_peers_from_pools().await?;
        let mut new_neighbours = self
            .fetch_neighbouring_peers(self.config.num_neighbouring_nodes, &[], try_revive_connections)
            .await?;

        if new_neighbours.is_empty() {
            debug!(
                target: LOG_TARGET,
                "Unable to refresh neighbouring peer pool because there are insufficient known/online peers",
            );
            self.redial_neighbours_as_required().await?;
            return Ok(());
        }

        let (intersection, difference) = self
            .neighbours
            .iter()
            .cloned()
            .partition::<Vec<_>, _>(|n| new_neighbours.contains(n));
        // Only retain the peers that aren't already added
        new_neighbours.retain(|n| !intersection.contains(n));
        self.neighbours.retain(|n| intersection.contains(n));

        debug!(
            target: LOG_TARGET,
            "Adding {} neighbouring peer(s), removing {} peers: {}",
            new_neighbours.len(),
            difference.len(),
            new_neighbours
                .iter()
                .map(ToString::to_string)
                .collect::<Vec<_>>()
                .join(", ")
        );

        new_neighbours.iter().cloned().for_each(|peer| {
            self.insert_neighbour_ordered_by_distance(peer);
        });
        self.dial_multiple_peers(&new_neighbours).await?;

        Ok(())
    }

    async fn dial_multiple_peers(&self, peers_to_dial: &[NodeId]) -> Result<(), DhtConnectivityError> {
        if !peers_to_dial.is_empty() {
            self.connectivity.request_many_dials(peers_to_dial.to_vec()).await?;
        }

        Ok(())
    }

    async fn redial_neighbours_as_required(&mut self) -> Result<(), DhtConnectivityError> {
        let disconnected = self
            .connection_handles
            .iter()
            .filter(|c| !c.is_connected())
            .collect::<Vec<_>>();
        let to_redial = self
            .neighbours
            .iter()
            .filter(|n| disconnected.iter().any(|c| c.peer_node_id() == *n))
            .cloned()
            .collect::<Vec<_>>();

        if !to_redial.is_empty() {
            debug!(
                target: LOG_TARGET,
                "Redialling {} disconnected peer(s)",
                to_redial.len()
            );
            self.dial_multiple_peers(&to_redial).await?;
        }

        Ok(())
    }

    async fn refresh_random_pool_if_required(&mut self) -> Result<(), DhtConnectivityError> {
        let should_refresh = self.config.num_random_nodes > 0 &&
            self.random_pool_last_refresh
                .map(|instant| instant.elapsed() >= self.config.connectivity.random_pool_refresh_interval)
                .unwrap_or(true);
        if should_refresh {
            self.refresh_random_pool().await?;
        }

        Ok(())
    }

    async fn refresh_random_pool(&mut self) -> Result<(), DhtConnectivityError> {
        self.remove_unmanaged_peers_from_pools().await?;
        let mut exclude = self.neighbours.clone();
        if self.config.minimize_connections {
            exclude.extend(self.previous_random.iter().cloned());
        }
        let mut random_peers = self.fetch_random_peers(self.config.num_random_nodes, &exclude).await?;
        if random_peers.is_empty() {
            info!(
                target: LOG_TARGET,
                "Unable to refresh random peer pool because there are insufficient known peers",
            );
            return Ok(());
        }

        let (intersection, difference) = self
            .random_pool
            .drain(..)
            .partition::<Vec<_>, _>(|n| random_peers.contains(n));
        // Remove the peers that we want to keep from the `random_peers` to be added
        random_peers.retain(|n| !intersection.contains(n));
        self.random_pool = intersection;
        debug!(
            target: LOG_TARGET,
            "Adding new peers to random peer pool (#new = {}, #keeping = {}, #removing = {})",
            random_peers.len(),
            self.random_pool.len(),
            difference.len()
        );
        trace!(
            target: LOG_TARGET,
            "Random peers: Adding = {random_peers:?}, Removing = {difference:?}"
        );
        for peer in &random_peers {
            self.insert_random_peer_ordered_by_distance(peer.clone());
        }
        // Drop any connection handles that were removed from the random pool
        difference.iter().for_each(|peer| {
            self.remove_connection_handle(peer);
        });
        self.dial_multiple_peers(&random_peers).await?;

        self.random_pool_last_refresh = Some(Instant::now());
        Ok(())
    }

    async fn handle_new_peer_connected(&mut self, conn: PeerConnection) -> Result<(), DhtConnectivityError> {
        self.remove_unmanaged_peers_from_pools().await?;
        if conn.peer_features().is_client() {
            debug!(
                target: LOG_TARGET,
                "Client node '{}' connected",
                conn.peer_node_id().short_str()
            );
            return Ok(());
        }

        if self.is_allow_list_peer(conn.peer_node_id()).await? {
            debug!(
                target: LOG_TARGET,
                "Unmanaged peer '{}' connected",
                conn.peer_node_id()
            );
            return Ok(());
        }

        if self.is_pool_peer(conn.peer_node_id()) {
            debug!(
                target: LOG_TARGET,
                "Added pool peer '{}' to connection handles",
                conn.peer_node_id()
            );
            self.insert_connection_handle(conn);
            return Ok(());
        }

        let current_dist = conn.peer_node_id().distance(self.node_identity.node_id());
        let neighbour_distance = self.get_neighbour_max_distance();
        if current_dist < neighbour_distance {
            debug!(
                target: LOG_TARGET,
                "Peer '{}' connected that is closer than any current neighbour. Adding to neighbours.",
                conn.peer_node_id().short_str()
            );

            let peer_to_insert = conn.peer_node_id().clone();
            if let Some(node_id) = self.insert_neighbour_ordered_by_distance(peer_to_insert.clone()) {
                // If we kicked a neighbour out of our neighbour pool, add it to the random pool if
                // it is not full or if it is closer than the furthest random peer.
                debug!(
                    target: LOG_TARGET,
                    "Moving peer '{peer_to_insert}' from neighbouring pool to random pool if not full or closer"
                );
                self.insert_random_peer_ordered_by_distance(node_id)
            }
            self.insert_connection_handle(conn);
        }

        Ok(())
    }

    async fn pool_peers_with_active_connections_by_distance(&self) -> Result<Vec<Peer>, DhtConnectivityError> {
        let peer_list = self
            .connection_handles
            .iter()
            .map(|conn| conn.peer_node_id())
            .cloned()
            .collect::<Vec<_>>();
        let mut peers_by_distance = self.peer_manager.get_peers_by_node_ids(&peer_list).await?;
        peers_by_distance.sort_by_key(|a| a.node_id.distance(self.node_identity.node_id()));

        debug!(
            target: LOG_TARGET,
            "minimize_connections: Filtered peers: {}, Handles: {}",
            peers_by_distance.len(),
            self.connection_handles.len(),
        );
        Ok(peers_by_distance)
    }

    async fn minimize_connections(&mut self) -> Result<(), DhtConnectivityError> {
        // Retrieve all communication node peers with an active connection status
        let mut peers_by_distance = self.pool_peers_with_active_connections_by_distance().await?;
        let peer_allow_list = self.peer_allow_list().await?;
        peers_by_distance.retain(|p| !peer_allow_list.contains(&p.node_id));

        // Disconnect all connections above the threshold
        let threshold = self.config.num_neighbouring_nodes + self.config.num_random_nodes;
        for peer in peers_by_distance.iter_mut().skip(threshold) {
            debug!(
                target: LOG_TARGET,
                "minimize_connections: Disconnecting '{}' because the node is not among the {} closest peers",
                peer.node_id,
                threshold
            );
            self.replace_pool_peer(&peer.node_id).await?;
            self.remove_connection_handle(&peer.node_id);
        }

        Ok(())
    }

    fn insert_connection_handle(&mut self, conn: PeerConnection) {
        // Remove any existing connection for this peer
        self.remove_connection_handle(conn.peer_node_id());
        trace!(target: LOG_TARGET, "Insert new peer connection {conn}");
        self.connection_handles.push(conn);
    }

    fn remove_connection_handle(&mut self, node_id: &NodeId) {
        if let Some(idx) = self.connection_handles.iter().position(|c| c.peer_node_id() == node_id) {
            let conn = self.connection_handles.swap_remove(idx);
            trace!(target: LOG_TARGET, "Removing peer connection {conn}");
        }
    }

    async fn handle_connectivity_event(&mut self, event: ConnectivityEvent) -> Result<(), DhtConnectivityError> {
        #[allow(clippy::enum_glob_use)]
        use ConnectivityEvent::*;
        debug!(target: LOG_TARGET, "Connectivity event: {event}");
        match event {
            PeerConnected(conn) => {
                self.handle_new_peer_connected(*conn.clone()).await?;
                trace!(
                    target: LOG_TARGET,
                    "Peer: node_id '{}', allow_list '{}', connected '{}'",
                    conn.peer_node_id(),
                    self.is_allow_list_peer(conn.peer_node_id()).await?,
                    conn.is_connected(),
                );

                if self.config.minimize_connections {
                    self.minimize_connections().await?;
                }
            },
            PeerConnectFailed(node_id) => {
                self.connection_handles.retain(|c| *c.peer_node_id() != node_id);
                if self.metrics_collector.clear_metrics(node_id.clone()).await.is_err() {
                    debug!(
                        target: LOG_TARGET,
                        "Failed to clear metrics for peer `{node_id}`. Metric collector is shut down."
                    );
                };
                self.remove_unmanaged_peers_from_pools().await?;
                if !self.is_pool_peer(&node_id) {
                    debug!(target: LOG_TARGET, "{node_id} is not managed by the DHT. Ignoring");
                    return Ok(());
                }
                self.replace_pool_peer(&node_id).await?;
                self.log_status();
            },
            PeerDisconnected(node_id, minimized) => {
                debug!(
                    target: LOG_TARGET,
                    "Peer: node_id '{}', allow_list '{}', connected 'false'",
                    node_id,
                    self.is_allow_list_peer(&node_id).await?,
                );
                self.connection_handles.retain(|c| *c.peer_node_id() != node_id);
                if self.metrics_collector.clear_metrics(node_id.clone()).await.is_err() {
                    debug!(
                        target: LOG_TARGET,
                        "Failed to clear metrics for peer `{node_id}`. Metric collector is shut down."
                    );
                };
                self.remove_unmanaged_peers_from_pools().await?;
                if !self.is_pool_peer(&node_id) {
                    debug!(target: LOG_TARGET, "{node_id} is not managed by the DHT. Ignoring");
                    return Ok(());
                }
                if minimized == Minimized::Yes || self.config.minimize_connections {
                    debug!(
                        target: LOG_TARGET,
                        "Peer '{node_id}' was disconnected because it was minimized, will not reconnect."
                    );
                    // Remove from managed pool if applicable
                    self.replace_pool_peer(&node_id).await?;
                    // In case the connection was not managed, remove the connection handle
                    self.remove_connection_handle(&node_id);
                    return Ok(());
                }
                debug!(target: LOG_TARGET, "Pool peer {node_id} disconnected. Redialling...");
                // Attempt to reestablish the lost connection to the pool peer. If reconnection fails,
                // it is replaced with another peer (replace_pool_peer via PeerConnectFailed)
                self.dial_multiple_peers(&[node_id]).await?;
            },
            ConnectivityStateOnline(n) => {
                self.refresh_peer_pools(false).await?;
                if self.config.auto_join && self.should_send_join() {
                    debug!(
                        target: LOG_TARGET,
                        "Node is online ({n} peer(s) connected). Sending network join message."
                    );
                    self.dht_requester
                        .send_join()
                        .await
                        .map_err(DhtConnectivityError::SendJoinFailed)?;

                    self.stats.mark_join_sent();
                }
            },
            ConnectivityStateOffline => {
                debug!(target: LOG_TARGET, "Node is OFFLINE");
                tokio::time::sleep(Duration::from_secs(15)).await;
                self.refresh_peer_pools(true).await?;
            },
            _ => {},
        }

        Ok(())
    }

    async fn peer_allow_list(&mut self) -> Result<Vec<NodeId>, DhtConnectivityError> {
        Ok(self.connectivity.get_allow_list().await?)
    }

    async fn all_connected_comms_nodes(&mut self) -> Result<Vec<NodeId>, DhtConnectivityError> {
        let all_connections = self
            .connectivity
            .select_connections(ConnectivitySelection::closest_to(
                self.node_identity.node_id().clone(),
                usize::MAX,
                vec![],
            ))
            .await?;
        let comms_nodes = all_connections
            .iter()
            .filter(|p| p.peer_features().is_node())
            .map(|p| p.peer_node_id().clone())
            .collect();
        Ok(comms_nodes)
    }

    async fn replace_pool_peer(&mut self, current_peer: &NodeId) -> Result<(), DhtConnectivityError> {
        self.remove_unmanaged_peers_from_pools().await?;
        if self.is_allow_list_peer(current_peer).await? {
            debug!(
                target: LOG_TARGET,
                "Peer '{current_peer}' is on the allow list, ignoring replacement."
            );
            return Ok(());
        }

        if self.random_pool.contains(current_peer) {
            let mut exclude = self.get_pool_peers();
            if self.config.minimize_connections {
                exclude.extend(self.previous_random.iter().cloned());
                self.previous_random.push(current_peer.clone());
            }

            self.random_pool.retain(|n| n != current_peer);
            self.remove_connection_handle(current_peer);

            debug!(
                target: LOG_TARGET,
                "Peer '{current_peer}' in random pool is unavailable. Adding a new random peer if possible"
            );
            match self.fetch_random_peers(1, &exclude).await?.pop() {
                Some(new_peer) => {
                    self.insert_random_peer_ordered_by_distance(new_peer.clone());
                    self.dial_multiple_peers(&[new_peer]).await?;
                },
                None => {
                    debug!(
                        target: LOG_TARGET,
                        "Unable to fetch new random peer to replace disconnected peer '{}' because not enough peers \
                         are known. Random pool size is {}.",
                        current_peer,
                        self.random_pool.len()
                    );
                },
            }
        }

        if self.neighbours.contains(current_peer) {
            let exclude = self.get_pool_peers();

            self.neighbours.retain(|n| n != current_peer);
            self.remove_connection_handle(current_peer);

            debug!(
                target: LOG_TARGET,
                "Peer '{current_peer}' in neighbour pool is offline. Adding a new peer if possible"
            );
            match self.fetch_neighbouring_peers(1, &exclude, false).await?.pop() {
                Some(new_peer) => {
                    self.insert_neighbour_ordered_by_distance(new_peer.clone());
                    self.dial_multiple_peers(&[new_peer]).await?;
                },
                None => {
                    info!(
                        target: LOG_TARGET,
                        "Unable to fetch new neighbouring peer to replace disconnected peer '{}'. Neighbour pool size \
                         is {}.",
                        current_peer,
                        self.neighbours.len()
                    );
                },
            }
        }

        self.log_status();

        Ok(())
    }

    fn insert_neighbour_ordered_by_distance(&mut self, node_id: NodeId) -> Option<NodeId> {
        let dist = node_id.distance(self.node_identity.node_id());
        let pos = self
            .neighbours
            .iter()
            .position(|node_id| node_id.distance(self.node_identity.node_id()) > dist);

        match pos {
            Some(idx) => {
                self.neighbours.insert(idx, node_id);
            },
            None => {
                self.neighbours.push(node_id);
            },
        }

        if self.neighbours.len() > self.config.num_neighbouring_nodes {
            self.neighbours.pop()
        } else {
            None
        }
    }

    fn insert_random_peer_ordered_by_distance(&mut self, node_id: NodeId) {
        let dist = node_id.distance(self.node_identity.node_id());
        let pos = self
            .random_pool
            .iter()
            .position(|node_id| node_id.distance(self.node_identity.node_id()) > dist);

        match pos {
            Some(idx) => {
                self.random_pool.insert(idx, node_id);
            },
            None => {
                self.random_pool.push(node_id);
            },
        }

        if self.random_pool.len() > self.config.num_random_nodes {
            if let Some(removed_peer) = self.random_pool.pop() {
                if self.config.minimize_connections {
                    self.previous_random.push(removed_peer.clone());
                }
            }
        }
    }

    async fn remove_unmanaged_peers_from_pools(&mut self) -> Result<(), DhtConnectivityError> {
259✔
853
        self.remove_allow_list_peers_from_pools().await?;
259✔
854
        self.remove_exlcuded_peers_from_pools().await
256✔
855
    }
259✔
856

857
    async fn remove_allow_list_peers_from_pools(&mut self) -> Result<(), DhtConnectivityError> {
259✔
858
        let allow_list = self.peer_allow_list().await?;
259✔
859
        self.neighbours.retain(|n| !allow_list.contains(n));
261✔
860
        self.random_pool.retain(|n| !allow_list.contains(n));
256✔
861
        Ok(())
256✔
862
    }
259✔
863

864
    async fn remove_exlcuded_peers_from_pools(&mut self) -> Result<(), DhtConnectivityError> {
256✔
865
        if !self.config.excluded_dial_addresses.is_empty() {
256✔
866
            let mut neighbours = Vec::with_capacity(self.neighbours.len());
×
867
            for peer in &self.neighbours {
×
868
                if let Ok(addresses) = self.peer_manager.get_peer_multi_addresses(peer).await {
×
869
                    if !addresses.iter().all(|addr| {
×
870
                        self.config
×
871
                            .excluded_dial_addresses
×
872
                            .iter()
×
873
                            .any(|v| v.contains(addr.address()))
×
874
                    }) {
×
875
                        neighbours.push(peer.clone());
×
876
                    }
×
877
                }
×
878
            }
879
            self.neighbours = neighbours;
×
880

881
            let mut random_pool = Vec::with_capacity(self.random_pool.len());
×
882
            for peer in &self.random_pool {
×
883
                if let Ok(addresses) = self.peer_manager.get_peer_multi_addresses(peer).await {
×
884
                    if !addresses.iter().all(|addr| {
×
885
                        self.config
×
886
                            .excluded_dial_addresses
×
887
                            .iter()
×
888
                            .any(|v| v.contains(addr.address()))
×
889
                    }) {
×
890
                        random_pool.push(peer.clone());
×
891
                    }
×
892
                }
×
893
            }
894
            self.random_pool = random_pool;
×
895
        }
256✔
896
        Ok(())
256✔
897
    }
256✔
898

899
    async fn is_allow_list_peer(&mut self, node_id: &NodeId) -> Result<bool, DhtConnectivityError> {
67✔
900
        Ok(self.peer_allow_list().await?.contains(node_id))
67✔
901
    }
67✔
902

903
    fn is_pool_peer(&self, node_id: &NodeId) -> bool {
80✔
904
        self.neighbours.contains(node_id) || self.random_pool.contains(node_id)
80✔
905
    }
80✔
906

907
    fn get_pool_peers(&self) -> Vec<NodeId> {
7✔
908
        self.neighbours.iter().chain(self.random_pool.iter()).cloned().collect()
7✔
909
    }
7✔
910

911
    fn get_neighbour_max_distance(&self) -> NodeDistance {
127✔
912
        assert!(
127✔
913
            self.config.num_neighbouring_nodes > 0,
127✔
914
            "DhtConfig::num_neighbouring_nodes must be greater than zero"
×
915
        );
916

917
        if self.neighbours.len() < self.config.num_neighbouring_nodes {
127✔
918
            return NodeDistance::max_distance();
124✔
919
        }
3✔
920

921
        self.neighbours
3✔
922
            .last()
3✔
923
            .map(|node_id| node_id.distance(self.node_identity.node_id()))
3✔
924
            .expect("already checked")
3✔
925
    }
127✔
926

927
    async fn max_neighbour_distance_all_conncetions(&mut self) -> Result<NodeDistance, DhtConnectivityError> {
105✔
928
        let mut distance = self.get_neighbour_max_distance();
105✔
929
        if self.config.minimize_connections {
105✔
930
            let all_connected_comms_nodes = self.all_connected_comms_nodes().await?;
×
931
            if let Some(node_id) = all_connected_comms_nodes.get(self.config.num_neighbouring_nodes - 1) {
×
932
                let node_distance = self.node_identity.node_id().distance(node_id);
×
933
                if node_distance < distance {
×
934
                    distance = node_distance;
×
935
                }
×
936
            }
×
937
        }
105✔
938
        Ok(distance)
105✔
939
    }
105✔
940

941
    async fn fetch_neighbouring_peers(
106✔
942
        &mut self,
106✔
943
        n: usize,
106✔
944
        excluded: &[NodeId],
106✔
945
        try_revive_connections: bool,
106✔
946
    ) -> Result<Vec<NodeId>, DhtConnectivityError> {
106✔
947
        let peer_allow_list = self.peer_allow_list().await?;
106✔
948
        let neighbour_distance = self.max_neighbour_distance_all_conncetions().await?;
105✔
949
        let self_node_id = self.node_identity.node_id();
105✔
950
        let mut excluded = excluded.to_vec();
105✔
951

952
        // Exclude allowlist and already-connected peers at DB level
953
        excluded.extend(peer_allow_list);
105✔
954
        excluded.extend(self.connected_pool_peers_iter().cloned());
105✔
955

956
        // Set query options based on whether we're reviving
957
        let (stale_peer_threshold, exclude_if_all_address_failed) = if try_revive_connections {
105✔
958
            (Some(STALE_PEER_THRESHOLD_DURATION), false)
35✔
959
        } else {
960
            (Some(self.config.offline_peer_cooldown), true)
70✔
961
        };
962

963
        // Apply optional distance constraint only if minimize_connections is enabled
964
        let exclusion_distance = if self.config.minimize_connections {
105✔
965
            Some(neighbour_distance)
×
966
        } else {
967
            None
105✔
968
        };
969

970
        // Fetch n nearest neighbour Communication Nodes which are eligible for connection.
971
        // Currently, that means:
972
        // - the peer isn't banned;
973
        // - it has the required feature;
974
        // - it didn't recently fail to connect; and
975
        // - it is not in the exclusion list in closest_request.
976
        let peers = self
105✔
977
            .peer_manager
105✔
978
            .closest_n_active_peers(
105✔
979
                self_node_id,
105✔
980
                n,
105✔
981
                &excluded,
105✔
982
                Some(PeerFeatures::COMMUNICATION_NODE),
105✔
983
                None,
105✔
984
                stale_peer_threshold,
105✔
985
                exclude_if_all_address_failed,
105✔
986
                exclusion_distance,
105✔
987
                true,
105✔
988
            )
105✔
989
            .await?;
105✔
990

991
        // Return up to n node IDs
992
        Ok(peers.into_iter().map(|p| p.node_id).collect())
105✔
993
    }
106✔
994

995
    async fn fetch_random_peers(&mut self, n: usize, excluded: &[NodeId]) -> Result<Vec<NodeId>, DhtConnectivityError> {
68✔
996
        let mut excluded = excluded.to_vec();
68✔
997
        excluded.extend(self.peer_allow_list().await?);
68✔
998
        let peers = self.peer_manager.random_peers(n, &excluded, None).await?;
68✔
999
        Ok(peers.into_iter().map(|p| p.node_id).collect())
68✔
1000
    }
68✔
1001

1002
    fn should_send_join(&self) -> bool {
×
1003
        let cooldown = self.config.join_cooldown_interval;
×
1004
        self.stats
×
1005
            .join_last_sent_at()
×
1006
            .map(|at| at.elapsed() > cooldown)
×
1007
            .unwrap_or(true)
×
1008
    }
×
1009
}
1010

1011
/// Basic connectivity stats. Right now, it is only used to track the last time a join message was sent to prevent the
1012
/// node spamming the network if local connectivity changes.
1013
#[derive(Debug, Default)]
1014
struct Stats {
1015
    join_last_sent_at: Option<Instant>,
1016
}
1017

1018
impl Stats {
1019
    pub fn new() -> Self {
39✔
1020
        Default::default()
39✔
1021
    }
39✔
1022

1023
    pub fn join_last_sent_at(&self) -> Option<Instant> {
×
1024
        self.join_last_sent_at
×
1025
    }
×
1026

1027
    pub fn mark_join_sent(&mut self) {
×
1028
        self.join_last_sent_at = Some(Instant::now());
×
1029
    }
×
1030
}
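
For orientation, `DhtConnectivity::new` and `spawn` above are the actor's only entry points: `new` just stores the handles and pre-sizes the peer pools, while `spawn` subscribes to connectivity events before the task starts so that no events are missed. The sketch below shows how the pieces might be wired together; it is a minimal, hypothetical helper (the `start_dht_connectivity` name is not from the source), and since `DhtConnectivity` is `pub(crate)` it would only compile inside this crate:

```rust
use std::sync::Arc;

use tari_comms::{connectivity::ConnectivityRequester, NodeIdentity, PeerManager};
use tari_shutdown::ShutdownSignal;
use tokio::{sync::broadcast, task::JoinHandle};

use crate::{
    connectivity::{DhtConnectivity, DhtConnectivityError, MetricsCollectorHandle},
    event::DhtEvent,
    DhtConfig,
    DhtRequester,
};

/// Hypothetical crate-internal helper: wires up and starts the DHT connectivity actor.
#[allow(clippy::too_many_arguments)]
fn start_dht_connectivity(
    config: Arc<DhtConfig>,
    peer_manager: Arc<PeerManager>,
    node_identity: Arc<NodeIdentity>,
    connectivity: ConnectivityRequester,
    dht_requester: DhtRequester,
    dht_events: broadcast::Receiver<Arc<DhtEvent>>,
    metrics_collector: MetricsCollectorHandle,
    shutdown_signal: ShutdownSignal,
) -> JoinHandle<Result<(), DhtConnectivityError>> {
    DhtConnectivity::new(
        config,
        peer_manager,
        node_identity,
        connectivity,
        dht_requester,
        dht_events,
        metrics_collector,
        shutdown_signal,
    )
    // `spawn` drives `run`'s select loop (connectivity events, DHT events,
    // the periodic refresh ticker, and the shutdown signal) on a tokio task.
    .spawn()
}
```

The returned `JoinHandle` resolves when the shutdown signal fires or the actor exits with a `DhtConnectivityError`.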