tari-project / tari — build 17297453740 (push via github, committed by web-flow)

28 Aug 2025 01:33PM UTC coverage: 61.046% (+0.9%) from 60.14%
chore(ci): add a wasm build step in ci (#7448)

Description
Add a WebAssembly (wasm) build step to CI (a sketch of such a step follows the summary below).

Motivation and Context
Test the wasm builds

How Has This Been Tested?
Verified that the builds pass in a local fork.



## Summary by CodeRabbit

* **Chores**
  * Added CI workflow to build WebAssembly targets with optimized caching on both hosted and self-hosted runners, improving build consistency and speed.
* **Tests**
  * Expanded automated checks to include WebAssembly build verification for multiple modules, increasing coverage and early detection of build issues.

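For context, a minimal GitHub Actions step for building a wasm target might look like the sketch below. This is an illustrative assumption, not the actual workflow added by this PR: the workflow name, runner, cache action, crate name, and target are all hypothetical stand-ins.

```yaml
# Hypothetical sketch only -- not the workflow added in this PR.
name: wasm-builds
on: [push, pull_request]

jobs:
  wasm-build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # Install a Rust toolchain with the wasm target available
      - uses: dtolnay/rust-toolchain@stable
        with:
          targets: wasm32-unknown-unknown
      # Cache cargo registry and build artifacts between runs
      - uses: Swatinem/rust-cache@v2
      # Verify the module builds for wasm ("tari_crypto" is an assumed example)
      - run: cargo build --locked --target wasm32-unknown-unknown -p tari_crypto
```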

72582 of 118897 relevant lines covered (61.05%)

301536.67 hits per line

Source File

/comms/dht/src/connectivity/mod.rs — 60.21% covered
//  Copyright 2020, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! # DHT Connectivity Actor
//!
//! Responsible for ensuring DHT network connectivity to a neighbouring and random peer set. This includes joining the
//! network when the node has established some peer connections (e.g. to seed peers). It maintains neighbouring and
//! random peer pools and instructs the comms `ConnectivityManager` to establish those connections. Once a configured
//! percentage of these peers is online, the node is established on the DHT network.
//!
//! The DHT connectivity actor monitors the connectivity state (using `ConnectivityEvent`s) and attempts
//! to maintain connectivity to the network as peers come and go.

#[cfg(test)]
mod test;

mod metrics;
use std::{
    sync::Arc,
    time::{Duration, Instant},
};

use log::*;
pub use metrics::{MetricsCollector, MetricsCollectorHandle};
use tari_comms::{
    connectivity::{
        ConnectivityError,
        ConnectivityEvent,
        ConnectivityEventRx,
        ConnectivityRequester,
        ConnectivitySelection,
    },
    multiaddr,
    peer_manager::{NodeDistance, NodeId, Peer, PeerFeatures, PeerManagerError, STALE_PEER_THRESHOLD_DURATION},
    Minimized,
    NodeIdentity,
    PeerConnection,
    PeerManager,
};
use tari_shutdown::ShutdownSignal;
use thiserror::Error;
use tokio::{sync::broadcast, task, task::JoinHandle, time, time::MissedTickBehavior};

use crate::{connectivity::metrics::MetricsError, event::DhtEvent, DhtActorError, DhtConfig, DhtRequester};

const LOG_TARGET: &str = "comms::dht::connectivity";

/// Error type for the DHT connectivity actor.
#[derive(Debug, Error)]
pub enum DhtConnectivityError {
    #[error("ConnectivityError: {0}")]
    ConnectivityError(#[from] ConnectivityError),
    #[error("PeerManagerError: {0}")]
    PeerManagerError(#[from] PeerManagerError),
    #[error("Failed to send network Join message: {0}")]
    SendJoinFailed(#[from] DhtActorError),
    #[error("Metrics error: {0}")]
    MetricError(#[from] MetricsError),
}

/// DHT connectivity actor.
pub(crate) struct DhtConnectivity {
    config: Arc<DhtConfig>,
    peer_manager: Arc<PeerManager>,
    node_identity: Arc<NodeIdentity>,
    connectivity: ConnectivityRequester,
    dht_requester: DhtRequester,
    /// List of neighbours managed by DhtConnectivity, ordered by distance from this node
    neighbours: Vec<NodeId>,
    /// A randomly-selected set of peers, excluding neighbouring peers.
    random_pool: Vec<NodeId>,
    /// The random pool history.
    previous_random: Vec<NodeId>,
    /// Used to track when the random peer pool was last refreshed
    random_pool_last_refresh: Option<Instant>,
    /// Holds references to peer connections that should be kept alive
    connection_handles: Vec<PeerConnection>,
    stats: Stats,
    dht_events: broadcast::Receiver<Arc<DhtEvent>>,
    metrics_collector: MetricsCollectorHandle,
    cooldown_in_effect: Option<Instant>,
    shutdown_signal: ShutdownSignal,
}

impl DhtConnectivity {
    pub fn new(
        config: Arc<DhtConfig>,
        peer_manager: Arc<PeerManager>,
        node_identity: Arc<NodeIdentity>,
        connectivity: ConnectivityRequester,
        dht_requester: DhtRequester,
        dht_events: broadcast::Receiver<Arc<DhtEvent>>,
        metrics_collector: MetricsCollectorHandle,
        shutdown_signal: ShutdownSignal,
    ) -> Self {
        Self {
            neighbours: Vec::with_capacity(config.num_neighbouring_nodes),
            random_pool: Vec::with_capacity(config.num_random_nodes),
            connection_handles: Vec::with_capacity(config.num_neighbouring_nodes + config.num_random_nodes),
            config,
            peer_manager,
            node_identity,
            connectivity,
            dht_requester,
            metrics_collector,
            random_pool_last_refresh: None,
            stats: Stats::new(),
            dht_events,
            cooldown_in_effect: None,
            shutdown_signal,
            previous_random: vec![],
        }
    }

    /// Spawn a DhtConnectivity actor. This will immediately subscribe to the connection manager event stream to
    /// prevent unexpected missed events.
    pub fn spawn(mut self) -> JoinHandle<Result<(), DhtConnectivityError>> {
        // Listen to events as early as possible
        let connectivity_events = self.connectivity.get_event_subscription();
        task::spawn(async move {
            debug!(target: LOG_TARGET, "Waiting for connectivity manager to start");
            if let Err(err) = self.connectivity.wait_started().await {
                error!(target: LOG_TARGET, "Comms connectivity failed to start: {err}");
            }
            match self.run(connectivity_events).await {
                Ok(_) => Ok(()),
                Err(err) => {
                    error!(target: LOG_TARGET, "DhtConnectivity exited with error: {err:?}");
                    Err(err)
                },
            }
        })
    }

    /// Run the main event loop until the shutdown signal is received.
    pub async fn run(mut self, mut connectivity_events: ConnectivityEventRx) -> Result<(), DhtConnectivityError> {
        // Initial discovery and refresh sync peers delay period, when a configured connection needs preference,
        // usually needed for the wallet to connect to its own base node first.
        if let Some(delay) = self.config.network_discovery.initial_peer_sync_delay {
            tokio::time::sleep(delay).await;
            debug!(target: LOG_TARGET, "DHT connectivity starting after a delay of {delay:.0?}");
        }
        self.refresh_neighbour_pool(true).await?;

        let mut ticker = time::interval(self.config.connectivity.update_interval);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            tokio::select! {
                Ok(event) = connectivity_events.recv() => {
                    if let Err(err) = self.handle_connectivity_event(event).await {
                        error!(target: LOG_TARGET, "Error handling connectivity event: {err:?}");
                    }
                },

                Ok(event) = self.dht_events.recv() => {
                    if let Err(err) = self.handle_dht_event(&event).await {
                        error!(target: LOG_TARGET, "Error handling DHT event: {err:?}");
                    }
                },

                _ = ticker.tick() => {
                    if let Err(err) = self.check_and_ban_flooding_peers().await {
                        error!(target: LOG_TARGET, "Error checking for peer flooding: {err:?}");
                    }
                    if let Err(err) = self.refresh_neighbour_pool_if_required().await {
                        error!(target: LOG_TARGET, "Error refreshing neighbour peer pool: {err:?}");
                    }
                    if let Err(err) = self.refresh_random_pool_if_required().await {
                        error!(target: LOG_TARGET, "Error refreshing random peer pool: {err:?}");
                    }
                    self.log_status();
                    if let Err(err) = self.check_minimum_required_tcp_nodes().await {
                        error!(target: LOG_TARGET, "Error checking minimum required TCP nodes: {err:?}");
                    }
                },

                _ = self.shutdown_signal.wait() => {
                    info!(target: LOG_TARGET, "DhtConnectivity shutting down because the shutdown signal was received");
                    break;
                }
            }
        }

        Ok(())
    }

    async fn check_minimum_required_tcp_nodes(&mut self) -> Result<(), DhtConnectivityError> {
        let desired_ratio = self.config.connectivity.minimum_desired_tcpv4_node_ratio;
        if desired_ratio == 0.0 {
            return Ok(());
        }

        let conns = self
            .connectivity
            .select_connections(ConnectivitySelection::all_nodes(vec![]))
            .await?;
        if conns.len() <= 1 {
            return Ok(());
        }

        let num_tcp_nodes = conns
            .iter()
            .filter(|conn| {
                let ip = conn.address().iter().next();
                let tcp = conn.address().iter().nth(2);
                matches!(ip, Some(multiaddr::Protocol::Ip4(_))) && matches!(tcp, Some(multiaddr::Protocol::Tcp(_)))
            })
            .count();

        let current_ratio = num_tcp_nodes as f32 / conns.len() as f32;
        if current_ratio < desired_ratio {
            debug!(
                target: LOG_TARGET,
                "{:.1?}% of this node's {} connections are using TCPv4. This node requires at least {:.1?}% of nodes \
                 to be TCP nodes.",
                (current_ratio * 100.0).round(),
                conns.len(),
                (desired_ratio * 100.0).round(),
            );
        }

        Ok(())
    }

    fn log_status(&self) {
        let (neighbour_connected, neighbour_pending) = self
            .neighbours
            .iter()
            .partition::<Vec<_>, _>(|peer| self.connection_handles.iter().any(|c| c.peer_node_id() == *peer));
        let (random_connected, random_pending) = self
            .random_pool
            .iter()
            .partition::<Vec<_>, _>(|peer| self.connection_handles.iter().any(|c| c.peer_node_id() == *peer));
        debug!(
            target: LOG_TARGET,
            "DHT connectivity status: {}neighbour pool: {}/{} ({} connected), random pool: {}/{} ({} connected, last \
             refreshed {}), active DHT connections: {}/{}",
            self.cooldown_in_effect
                .map(|ts| format!(
                    "COOLDOWN({:.2?} remaining) ",
                    self.config
                        .connectivity
                        .high_failure_rate_cooldown
                        .saturating_sub(ts.elapsed())
                ))
                .unwrap_or_default(),
            self.neighbours.len(),
            self.config.num_neighbouring_nodes,
            neighbour_connected.len(),
            self.random_pool.len(),
            self.config.num_random_nodes,
            random_connected.len(),
            self.random_pool_last_refresh
                .map(|i| format!("{:.0?} ago", i.elapsed()))
                .unwrap_or_else(|| "<never>".to_string()),
            self.connection_handles.len(),
            self.config.num_neighbouring_nodes + self.config.num_random_nodes,
        );
        if !neighbour_pending.is_empty() || !random_pending.is_empty() {
            debug!(
                target: LOG_TARGET,
                "Pending connections: neighbouring({}), random({})",
                neighbour_pending
                    .iter()
                    .map(ToString::to_string)
                    .collect::<Vec<_>>()
                    .join(", "),
                random_pending
                    .iter()
                    .map(ToString::to_string)
                    .collect::<Vec<_>>()
                    .join(", ")
            );
        }
    }

    async fn handle_dht_event(&mut self, event: &DhtEvent) -> Result<(), DhtConnectivityError> {
        #[allow(clippy::single_match)]
        match event {
            DhtEvent::NetworkDiscoveryPeersAdded(info) => {
                if info.num_new_peers > 0 {
                    self.refresh_peer_pools(false).await?;
                }
            },
            _ => {},
        }

        Ok(())
    }

    async fn check_and_ban_flooding_peers(&mut self) -> Result<(), DhtConnectivityError> {
        let nodes = self
            .metrics_collector
            .get_message_rates_exceeding(self.config.flood_ban_max_msg_count, self.config.flood_ban_timespan)
            .await?;

        for (peer, mps) in nodes {
            warn!(
                target: LOG_TARGET,
                "Banning peer `{peer}` because of flooding. Message rate: {mps:.2}m/s"
            );
            self.connectivity
                .ban_peer_until(
                    peer,
                    self.config.ban_duration_short,
                    format!(
                        "Exceeded maximum message rate. Config: {}/{:#?}. Rate: {:.2} m/s",
                        self.config.flood_ban_max_msg_count, self.config.flood_ban_timespan, mps
                    ),
                )
                .await?;
        }
        Ok(())
    }

    async fn refresh_peer_pools(&mut self, try_revive_connections: bool) -> Result<(), DhtConnectivityError> {
        info!(
            target: LOG_TARGET,
            "Reinitializing neighbour pool. (size={})",
            self.neighbours.len(),
        );

        self.refresh_neighbour_pool(try_revive_connections).await?;
        self.refresh_random_pool().await?;

        Ok(())
    }

    async fn refresh_neighbour_pool_if_required(&mut self) -> Result<(), DhtConnectivityError> {
        if self.num_connected_neighbours() < self.config.num_neighbouring_nodes {
            self.refresh_neighbour_pool(false).await?;
        }

        Ok(())
    }

    fn num_connected_neighbours(&self) -> usize {
        self.neighbours
            .iter()
            .filter(|peer| self.connection_handles.iter().any(|c| c.peer_node_id() == *peer))
            .count()
    }

    fn connected_pool_peers_iter(&self) -> impl Iterator<Item = &NodeId> {
        self.connection_handles.iter().map(|c| c.peer_node_id())
    }

    async fn refresh_neighbour_pool(&mut self, try_revive_connections: bool) -> Result<(), DhtConnectivityError> {
        self.remove_unmanaged_peers_from_pools().await?;
        let mut new_neighbours = self
            .fetch_neighbouring_peers(self.config.num_neighbouring_nodes, &[], try_revive_connections)
            .await?;

        if new_neighbours.is_empty() {
            debug!(
                target: LOG_TARGET,
                "Unable to refresh neighbouring peer pool because there are insufficient known/online peers",
            );
            self.redial_neighbours_as_required().await?;
            return Ok(());
        }

        let (intersection, difference) = self
            .neighbours
            .iter()
            .cloned()
            .partition::<Vec<_>, _>(|n| new_neighbours.contains(n));
        // Only retain the peers that aren't already added
        new_neighbours.retain(|n| !intersection.contains(n));
        self.neighbours.retain(|n| intersection.contains(n));

        debug!(
            target: LOG_TARGET,
            "Adding {} neighbouring peer(s), removing {} peers: {}",
            new_neighbours.len(),
            difference.len(),
            new_neighbours
                .iter()
                .map(ToString::to_string)
                .collect::<Vec<_>>()
                .join(", ")
        );

        new_neighbours.iter().cloned().for_each(|peer| {
            self.insert_neighbour_ordered_by_distance(peer);
        });
        self.dial_multiple_peers(&new_neighbours).await?;

        Ok(())
    }

    async fn dial_multiple_peers(&self, peers_to_dial: &[NodeId]) -> Result<(), DhtConnectivityError> {
        if !peers_to_dial.is_empty() {
            self.connectivity.request_many_dials(peers_to_dial.to_vec()).await?;
        }

        Ok(())
    }

    async fn redial_neighbours_as_required(&mut self) -> Result<(), DhtConnectivityError> {
        let disconnected = self
            .connection_handles
            .iter()
            .filter(|c| !c.is_connected())
            .collect::<Vec<_>>();
        let to_redial = self
            .neighbours
            .iter()
            .filter(|n| disconnected.iter().any(|c| c.peer_node_id() == *n))
            .cloned()
            .collect::<Vec<_>>();

        if !to_redial.is_empty() {
            debug!(
                target: LOG_TARGET,
                "Redialling {} disconnected peer(s)",
                to_redial.len()
            );
            self.dial_multiple_peers(&to_redial).await?;
        }

        Ok(())
    }

    async fn refresh_random_pool_if_required(&mut self) -> Result<(), DhtConnectivityError> {
        let should_refresh = self.config.num_random_nodes > 0 &&
            self.random_pool_last_refresh
                .map(|instant| instant.elapsed() >= self.config.connectivity.random_pool_refresh_interval)
                .unwrap_or(true);
        if should_refresh {
            self.refresh_random_pool().await?;
        }

        Ok(())
    }

    async fn refresh_random_pool(&mut self) -> Result<(), DhtConnectivityError> {
        self.remove_unmanaged_peers_from_pools().await?;
        let mut exclude = self.neighbours.clone();
        if self.config.minimize_connections {
            exclude.extend(self.previous_random.iter().cloned());
        }
        let mut random_peers = self.fetch_random_peers(self.config.num_random_nodes, &exclude).await?;
        if random_peers.is_empty() {
            info!(
                target: LOG_TARGET,
                "Unable to refresh random peer pool because there are insufficient known peers",
            );
            return Ok(());
        }

        let (intersection, difference) = self
            .random_pool
            .drain(..)
            .partition::<Vec<_>, _>(|n| random_peers.contains(n));
        // Remove the peers that we want to keep from the `random_peers` to be added
        random_peers.retain(|n| !intersection.contains(n));
        self.random_pool = intersection;
        debug!(
            target: LOG_TARGET,
            "Adding new peers to random peer pool (#new = {}, #keeping = {}, #removing = {})",
            random_peers.len(),
            self.random_pool.len(),
            difference.len()
        );
        trace!(
            target: LOG_TARGET,
            "Random peers: Adding = {random_peers:?}, Removing = {difference:?}"
        );
        for peer in &random_peers {
            self.insert_random_peer_ordered_by_distance(peer.clone());
        }
        // Drop any connection handles that were removed from the random pool
        difference.iter().for_each(|peer| {
            self.remove_connection_handle(peer);
        });
        self.dial_multiple_peers(&random_peers).await?;

        self.random_pool_last_refresh = Some(Instant::now());
        Ok(())
    }

    async fn handle_new_peer_connected(&mut self, conn: PeerConnection) -> Result<(), DhtConnectivityError> {
        self.remove_unmanaged_peers_from_pools().await?;
        if conn.peer_features().is_client() {
            debug!(
                target: LOG_TARGET,
                "Client node '{}' connected",
                conn.peer_node_id().short_str()
            );
            return Ok(());
        }

        if self.is_allow_list_peer(conn.peer_node_id()).await? {
            debug!(
                target: LOG_TARGET,
                "Unmanaged peer '{}' connected",
                conn.peer_node_id()
            );
            return Ok(());
        }

        if self.is_pool_peer(conn.peer_node_id()) {
            debug!(
                target: LOG_TARGET,
                "Added pool peer '{}' to connection handles",
                conn.peer_node_id()
            );
            self.insert_connection_handle(conn);
            return Ok(());
        }

        let current_dist = conn.peer_node_id().distance(self.node_identity.node_id());
        let neighbour_distance = self.get_neighbour_max_distance();
        if current_dist < neighbour_distance {
            debug!(
                target: LOG_TARGET,
                "Peer '{}' connected that is closer than any current neighbour. Adding to neighbours.",
                conn.peer_node_id().short_str()
            );

            let peer_to_insert = conn.peer_node_id().clone();
            if let Some(node_id) = self.insert_neighbour_ordered_by_distance(peer_to_insert.clone()) {
                // If we kicked a neighbour out of our neighbour pool, add it to the random pool if
                // it is not full or if it is closer than the furthest random peer.
                debug!(
                    target: LOG_TARGET,
                    "Moving peer '{peer_to_insert}' from neighbouring pool to random pool if not full or closer"
                );
                self.insert_random_peer_ordered_by_distance(node_id)
            }
            self.insert_connection_handle(conn);
        }

        Ok(())
    }

    async fn pool_peers_with_active_connections_by_distance(&self) -> Result<Vec<Peer>, DhtConnectivityError> {
        let peer_list = self
            .connection_handles
            .iter()
            .map(|conn| conn.peer_node_id())
            .cloned()
            .collect::<Vec<_>>();
        let mut peers_by_distance = self.peer_manager.get_peers_by_node_ids(&peer_list).await?;
        peers_by_distance.sort_by_key(|a| a.node_id.distance(self.node_identity.node_id()));

        debug!(
            target: LOG_TARGET,
            "minimize_connections: Filtered peers: {}, Handles: {}",
            peers_by_distance.len(),
            self.connection_handles.len(),
        );
        Ok(peers_by_distance)
    }

    async fn minimize_connections(&mut self) -> Result<(), DhtConnectivityError> {
        // Retrieve all communication node peers with an active connection status
        let mut peers_by_distance = self.pool_peers_with_active_connections_by_distance().await?;
        let peer_allow_list = self.peer_allow_list().await?;
        peers_by_distance.retain(|p| !peer_allow_list.contains(&p.node_id));

        // Disconnect all connections above the threshold
        let threshold = self.config.num_neighbouring_nodes + self.config.num_random_nodes;
        for peer in peers_by_distance.iter_mut().skip(threshold) {
            debug!(
                target: LOG_TARGET,
                "minimize_connections: Disconnecting '{}' because the node is not among the {} closest peers",
                peer.node_id,
                threshold
            );
            self.replace_pool_peer(&peer.node_id).await?;
            self.remove_connection_handle(&peer.node_id);
        }

        Ok(())
    }

    fn insert_connection_handle(&mut self, conn: PeerConnection) {
        // Remove any existing connection for this peer
        self.remove_connection_handle(conn.peer_node_id());
        trace!(target: LOG_TARGET, "Insert new peer connection {conn}");
        self.connection_handles.push(conn);
    }

    fn remove_connection_handle(&mut self, node_id: &NodeId) {
        if let Some(idx) = self.connection_handles.iter().position(|c| c.peer_node_id() == node_id) {
            let conn = self.connection_handles.swap_remove(idx);
            trace!(target: LOG_TARGET, "Removing peer connection {conn}");
        }
    }

    async fn handle_connectivity_event(&mut self, event: ConnectivityEvent) -> Result<(), DhtConnectivityError> {
        #[allow(clippy::enum_glob_use)]
        use ConnectivityEvent::*;
        debug!(target: LOG_TARGET, "Connectivity event: {event}");
        match event {
            PeerConnected(conn) => {
                self.handle_new_peer_connected(*conn.clone()).await?;
                trace!(
                    target: LOG_TARGET,
                    "Peer: node_id '{}', allow_list '{}', connected '{}'",
                    conn.peer_node_id(),
                    self.is_allow_list_peer(conn.peer_node_id()).await?,
                    conn.is_connected(),
                );

                if self.config.minimize_connections {
                    self.minimize_connections().await?;
                }
            },
            PeerConnectFailed(node_id) => {
                self.connection_handles.retain(|c| *c.peer_node_id() != node_id);
                if self.metrics_collector.clear_metrics(node_id.clone()).await.is_err() {
                    debug!(
                        target: LOG_TARGET,
                        "Failed to clear metrics for peer `{node_id}`. Metric collector is shut down."
                    );
                };
                self.remove_unmanaged_peers_from_pools().await?;
                if !self.is_pool_peer(&node_id) {
                    debug!(target: LOG_TARGET, "{node_id} is not managed by the DHT. Ignoring");
                    return Ok(());
                }
                self.replace_pool_peer(&node_id).await?;
                self.log_status();
            },
            PeerDisconnected(node_id, minimized) => {
                debug!(
                    target: LOG_TARGET,
                    "Peer: node_id '{}', allow_list '{}', connected 'false'",
                    node_id,
                    self.is_allow_list_peer(&node_id).await?,
                );
                self.connection_handles.retain(|c| *c.peer_node_id() != node_id);
                if self.metrics_collector.clear_metrics(node_id.clone()).await.is_err() {
                    debug!(
                        target: LOG_TARGET,
                        "Failed to clear metrics for peer `{node_id}`. Metric collector is shut down."
                    );
                };
                self.remove_unmanaged_peers_from_pools().await?;
                if !self.is_pool_peer(&node_id) {
                    debug!(target: LOG_TARGET, "{node_id} is not managed by the DHT. Ignoring");
                    return Ok(());
                }
                if minimized == Minimized::Yes || self.config.minimize_connections {
                    debug!(
                        target: LOG_TARGET,
                        "Peer '{node_id}' was disconnected because it was minimized, will not reconnect."
                    );
                    // Remove from managed pool if applicable
                    self.replace_pool_peer(&node_id).await?;
                    // In case the connection was not managed, remove the connection handle
                    self.remove_connection_handle(&node_id);
                    return Ok(());
                }
                debug!(target: LOG_TARGET, "Pool peer {node_id} disconnected. Redialling...");
                // Attempt to reestablish the lost connection to the pool peer. If reconnection fails,
                // it is replaced with another peer (replace_pool_peer via PeerConnectFailed)
                self.dial_multiple_peers(&[node_id]).await?;
            },
            ConnectivityStateOnline(n) => {
                self.refresh_peer_pools(false).await?;
                if self.config.auto_join && self.should_send_join() {
                    debug!(
                        target: LOG_TARGET,
                        "Node is online ({n} peer(s) connected). Sending network join message."
                    );
                    self.dht_requester
                        .send_join()
                        .await
                        .map_err(DhtConnectivityError::SendJoinFailed)?;

                    self.stats.mark_join_sent();
                }
            },
            ConnectivityStateOffline => {
                debug!(target: LOG_TARGET, "Node is OFFLINE");
                tokio::time::sleep(Duration::from_secs(15)).await;
                self.refresh_peer_pools(true).await?;
            },
            _ => {},
        }

        Ok(())
    }

    async fn peer_allow_list(&mut self) -> Result<Vec<NodeId>, DhtConnectivityError> {
        Ok(self.connectivity.get_allow_list().await?)
    }

    async fn all_connected_comms_nodes(&mut self) -> Result<Vec<NodeId>, DhtConnectivityError> {
        let all_connections = self
            .connectivity
            .select_connections(ConnectivitySelection::closest_to(
                self.node_identity.node_id().clone(),
                usize::MAX,
                vec![],
            ))
            .await?;
        let comms_nodes = all_connections
            .iter()
            .filter(|p| p.peer_features().is_node())
            .map(|p| p.peer_node_id().clone())
            .collect();
        Ok(comms_nodes)
    }

    /// Removes `current_peer` from the neighbour/random pools and attempts to source and dial a replacement.
    async fn replace_pool_peer(&mut self, current_peer: &NodeId) -> Result<(), DhtConnectivityError> {
        self.remove_unmanaged_peers_from_pools().await?;
        if self.is_allow_list_peer(current_peer).await? {
            debug!(
                target: LOG_TARGET,
                "Peer '{current_peer}' is on the allow list, ignoring replacement."
            );
            return Ok(());
        }

        if self.random_pool.contains(current_peer) {
            let mut exclude = self.get_pool_peers();
            if self.config.minimize_connections {
                exclude.extend(self.previous_random.iter().cloned());
                self.previous_random.push(current_peer.clone());
            }

            self.random_pool.retain(|n| n != current_peer);
            self.remove_connection_handle(current_peer);

            debug!(
                target: LOG_TARGET,
                "Peer '{current_peer}' in random pool is unavailable. Adding a new random peer if possible"
            );
            match self.fetch_random_peers(1, &exclude).await?.pop() {
                Some(new_peer) => {
                    self.insert_random_peer_ordered_by_distance(new_peer.clone());
                    self.dial_multiple_peers(&[new_peer]).await?;
                },
                None => {
                    debug!(
                        target: LOG_TARGET,
                        "Unable to fetch new random peer to replace disconnected peer '{}' because not enough peers \
                         are known. Random pool size is {}.",
                        current_peer,
                        self.random_pool.len()
                    );
                },
            }
        }

        if self.neighbours.contains(current_peer) {
            let exclude = self.get_pool_peers();

            self.neighbours.retain(|n| n != current_peer);
            self.remove_connection_handle(current_peer);

            debug!(
                target: LOG_TARGET,
                "Peer '{current_peer}' in neighbour pool is offline. Adding a new peer if possible"
            );
            match self.fetch_neighbouring_peers(1, &exclude, false).await?.pop() {
                Some(new_peer) => {
                    self.insert_neighbour_ordered_by_distance(new_peer.clone());
                    self.dial_multiple_peers(&[new_peer]).await?;
                },
                None => {
                    info!(
                        target: LOG_TARGET,
                        "Unable to fetch new neighbouring peer to replace disconnected peer '{}'. Neighbour pool size \
                         is {}.",
                        current_peer,
                        self.neighbours.len()
                    );
                },
            }
        }

        self.log_status();

        Ok(())
    }

    fn insert_neighbour_ordered_by_distance(&mut self, node_id: NodeId) -> Option<NodeId> {
        let dist = node_id.distance(self.node_identity.node_id());
        let pos = self
            .neighbours
            .iter()
            .position(|node_id| node_id.distance(self.node_identity.node_id()) > dist);

        match pos {
            Some(idx) => {
                self.neighbours.insert(idx, node_id);
            },
            None => {
                self.neighbours.push(node_id);
            },
        }

        if self.neighbours.len() > self.config.num_neighbouring_nodes {
            self.neighbours.pop()
        } else {
            None
        }
    }

    fn insert_random_peer_ordered_by_distance(&mut self, node_id: NodeId) {
        let dist = node_id.distance(self.node_identity.node_id());
        let pos = self
            .random_pool
            .iter()
            .position(|node_id| node_id.distance(self.node_identity.node_id()) > dist);

        match pos {
            Some(idx) => {
                self.random_pool.insert(idx, node_id);
            },
            None => {
                self.random_pool.push(node_id);
            },
        }

        if self.random_pool.len() > self.config.num_random_nodes {
            if let Some(removed_peer) = self.random_pool.pop() {
                if self.config.minimize_connections {
                    self.previous_random.push(removed_peer.clone());
                }
            }
        }
    }

    async fn remove_unmanaged_peers_from_pools(&mut self) -> Result<(), DhtConnectivityError> {
        self.remove_allow_list_peers_from_pools().await?;
        self.remove_excluded_peers_from_pools().await
    }

    async fn remove_allow_list_peers_from_pools(&mut self) -> Result<(), DhtConnectivityError> {
        let allow_list = self.peer_allow_list().await?;
        self.neighbours.retain(|n| !allow_list.contains(n));
        self.random_pool.retain(|n| !allow_list.contains(n));
        Ok(())
    }

    async fn remove_excluded_peers_from_pools(&mut self) -> Result<(), DhtConnectivityError> {
        if !self.config.excluded_dial_addresses.is_empty() {
            // Keep only peers that have at least one address outside the excluded dial addresses
            let mut neighbours = Vec::with_capacity(self.neighbours.len());
            for peer in &self.neighbours {
                if let Ok(addresses) = self.peer_manager.get_peer_multi_addresses(peer).await {
                    if !addresses.iter().all(|addr| {
                        self.config
                            .excluded_dial_addresses
                            .iter()
                            .any(|v| v.contains(addr.address()))
                    }) {
                        neighbours.push(peer.clone());
                    }
                }
            }
            self.neighbours = neighbours;

            let mut random_pool = Vec::with_capacity(self.random_pool.len());
            for peer in &self.random_pool {
                if let Ok(addresses) = self.peer_manager.get_peer_multi_addresses(peer).await {
                    if !addresses.iter().all(|addr| {
                        self.config
                            .excluded_dial_addresses
                            .iter()
                            .any(|v| v.contains(addr.address()))
                    }) {
                        random_pool.push(peer.clone());
                    }
                }
            }
            self.random_pool = random_pool;
        }
        Ok(())
    }

    async fn is_allow_list_peer(&mut self, node_id: &NodeId) -> Result<bool, DhtConnectivityError> {
        Ok(self.peer_allow_list().await?.contains(node_id))
    }

    fn is_pool_peer(&self, node_id: &NodeId) -> bool {
        self.neighbours.contains(node_id) || self.random_pool.contains(node_id)
    }

    fn get_pool_peers(&self) -> Vec<NodeId> {
        self.neighbours.iter().chain(self.random_pool.iter()).cloned().collect()
    }

    fn get_neighbour_max_distance(&self) -> NodeDistance {
        assert!(
            self.config.num_neighbouring_nodes > 0,
            "DhtConfig::num_neighbouring_nodes must be greater than zero"
        );

        if self.neighbours.len() < self.config.num_neighbouring_nodes {
            return NodeDistance::max_distance();
        }

        self.neighbours
            .last()
            .map(|node_id| node_id.distance(self.node_identity.node_id()))
            .expect("already checked")
    }

    async fn max_neighbour_distance_all_connections(&mut self) -> Result<NodeDistance, DhtConnectivityError> {
        let mut distance = self.get_neighbour_max_distance();
        if self.config.minimize_connections {
            let all_connected_comms_nodes = self.all_connected_comms_nodes().await?;
            if let Some(node_id) = all_connected_comms_nodes.get(self.config.num_neighbouring_nodes - 1) {
                let node_distance = self.node_identity.node_id().distance(node_id);
                if node_distance < distance {
                    distance = node_distance;
                }
            }
        }
        Ok(distance)
    }

    async fn fetch_neighbouring_peers(
        &mut self,
        n: usize,
        excluded: &[NodeId],
        try_revive_connections: bool,
    ) -> Result<Vec<NodeId>, DhtConnectivityError> {
        let peer_allow_list = self.peer_allow_list().await?;
        let neighbour_distance = self.max_neighbour_distance_all_connections().await?;
        let self_node_id = self.node_identity.node_id();
        let mut excluded = excluded.to_vec();

        // Exclude allowlist and already-connected peers at DB level
        excluded.extend(peer_allow_list);
        excluded.extend(self.connected_pool_peers_iter().cloned());

        // Set query options based on whether we're reviving
        let (stale_peer_threshold, exclude_if_all_address_failed) = if try_revive_connections {
            (Some(STALE_PEER_THRESHOLD_DURATION), false)
        } else {
            (Some(self.config.offline_peer_cooldown), true)
        };

        // Apply optional distance constraint only if minimize_connections is enabled
        let exclusion_distance = if self.config.minimize_connections {
            Some(neighbour_distance)
        } else {
            None
        };

        // Fetch n nearest neighbour Communication Nodes which are eligible for connection.
        // Currently, that means:
        // - the peer isn't banned;
        // - it has the required feature;
        // - it didn't recently fail to connect; and
        // - it is not in the exclusion list in closest_request.
        let peers = self
            .peer_manager
            .closest_n_active_peers(
                self_node_id,
                n,
                &excluded,
                Some(PeerFeatures::COMMUNICATION_NODE),
                None,
                stale_peer_threshold,
                exclude_if_all_address_failed,
                exclusion_distance,
                true,
            )
            .await?;

        // Return up to n node IDs
        Ok(peers.into_iter().map(|p| p.node_id).collect())
    }

    async fn fetch_random_peers(&mut self, n: usize, excluded: &[NodeId]) -> Result<Vec<NodeId>, DhtConnectivityError> {
        let mut excluded = excluded.to_vec();
        excluded.extend(self.peer_allow_list().await?);
        let peers = self.peer_manager.random_peers(n, &excluded, None).await?;
        Ok(peers.into_iter().map(|p| p.node_id).collect())
    }

    fn should_send_join(&self) -> bool {
        let cooldown = self.config.join_cooldown_interval;
        self.stats
            .join_last_sent_at()
            .map(|at| at.elapsed() > cooldown)
            .unwrap_or(true)
    }
}

/// Basic connectivity stats. Right now, it is only used to track the last time a join message was sent to prevent the
/// node spamming the network if local connectivity changes.
#[derive(Debug, Default)]
struct Stats {
    join_last_sent_at: Option<Instant>,
}

impl Stats {
    pub fn new() -> Self {
        Default::default()
    }

    pub fn join_last_sent_at(&self) -> Option<Instant> {
        self.join_last_sent_at
    }

    pub fn mark_join_sent(&mut self) {
        self.join_last_sent_at = Some(Instant::now());
    }
}
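The module docs above describe how the actor keeps its neighbour pool ordered by distance from this node and evicts the furthest peer when the pool overflows (`insert_neighbour_ordered_by_distance`). The toy sketch below, with `u64` standing in for `NodeDistance` and all names hypothetical, illustrates that insert-ordered/evict-furthest pattern in isolation:

```rust
// Standalone toy sketch (not part of mod.rs): the "insert ordered by
// distance, evict the furthest" pattern used for the neighbour pool.
// `u64` stands in for NodeDistance; all names here are hypothetical.
fn insert_ordered(pool: &mut Vec<u64>, max_len: usize, dist: u64) -> Option<u64> {
    // Insert before the first entry that is further away, keeping the
    // pool sorted from nearest to furthest.
    let pos = pool.iter().position(|&d| d > dist).unwrap_or(pool.len());
    pool.insert(pos, dist);
    // On overflow, evict and return the furthest entry, mirroring how an
    // evicted neighbour may be demoted to the random pool.
    if pool.len() > max_len {
        pool.pop()
    } else {
        None
    }
}

fn main() {
    let mut pool = vec![2, 5, 9];
    // 4 is closer than 5 and 9, so it displaces the furthest entry (9).
    assert_eq!(insert_ordered(&mut pool, 3, 4), Some(9));
    assert_eq!(pool, vec![2, 4, 5]);
}
```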