
tari-project / tari · build 17297453740 (push, via GitHub, committed by web-flow)

28 Aug 2025 01:33PM UTC coverage: 61.046% (+0.9%) from 60.14%
chore(ci): add a wasm build step in ci (#7448)

Description
Add a wasm build step to CI.

Motivation and Context
Test the wasm builds.

How Has This Been Tested?
Builds successfully in a local fork.



## Summary by CodeRabbit

* **Chores**
  * Added a CI workflow to build WebAssembly targets with optimized caching on both hosted and self-hosted runners, improving build consistency and speed.
* **Tests**
  * Expanded automated checks to include WebAssembly build verification for multiple modules, increasing coverage and early detection of build issues.


72582 of 118897 relevant lines covered (61.05%)

301536.67 hits per line

Source File (55.21% covered): /comms/core/src/connectivity/manager.rs
//  Copyright 2020, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
use std::{
    collections::HashMap,
    fmt,
    sync::Arc,
    time::{Duration, Instant},
};

use log::*;
use nom::lib::std::collections::hash_map::Entry;
use tari_shutdown::ShutdownSignal;
use tokio::{
    sync::{mpsc, oneshot},
    task::JoinHandle,
    time,
    time::MissedTickBehavior,
};
use tracing::{span, Instrument, Level};

use super::{
    config::ConnectivityConfig,
    connection_pool::{ConnectionPool, ConnectionStatus},
    connection_stats::PeerConnectionStats,
    error::ConnectivityError,
    proactive_dialer::ProactiveDialer,
    requester::{ConnectivityEvent, ConnectivityRequest},
    selection::ConnectivitySelection,
    ConnectivityEventTx,
};
use crate::{
    connection_manager::{
        ConnectionDirection,
        ConnectionManagerError,
        ConnectionManagerEvent,
        ConnectionManagerRequester,
    },
    peer_manager::NodeId,
    utils::datetime::format_duration,
    Minimized,
    NodeIdentity,
    PeerConnection,
    PeerConnectionError,
    PeerManager,
};

const LOG_TARGET: &str = "comms::connectivity::manager";

// Maximum time allowed for refreshing the connection pool
const POOL_REFRESH_TIMEOUT: Duration = Duration::from_millis(2500);
// Maximum time allowed to disconnect a single peer
const PEER_DISCONNECT_TIMEOUT: Duration = Duration::from_millis(250);
// Warning threshold for request processing time
const ACCEPTABLE_CONNECTIVITY_REQUEST_PROCESSING_TIME: Duration = Duration::from_millis(500);
// Warning threshold for event processing time
const ACCEPTABLE_EVENT_PROCESSING_TIME: Duration = Duration::from_millis(500);

/// # Connectivity Manager
///
/// The ConnectivityManager actor is responsible for tracking the state of all peer
/// connections in the system and maintaining a _pool_ of peer connections.
///
/// It emits [ConnectivityEvent](crate::connectivity::ConnectivityEvent)s that can keep client components
/// in the loop with the state of the node's connectivity.
pub struct ConnectivityManager {
    pub config: ConnectivityConfig,
    pub request_rx: mpsc::Receiver<ConnectivityRequest>,
    pub event_tx: ConnectivityEventTx,
    pub connection_manager: ConnectionManagerRequester,
    pub peer_manager: Arc<PeerManager>,
    pub node_identity: Arc<NodeIdentity>,
    pub shutdown_signal: ShutdownSignal,
}

impl ConnectivityManager {
    pub fn spawn(self) -> JoinHandle<()> {
        let proactive_dialer = ProactiveDialer::new(
            self.config,
            self.connection_manager.clone(),
            self.peer_manager.clone(),
            self.node_identity.clone(),
        );

        ConnectivityManagerActor {
            config: self.config,
            status: ConnectivityStatus::Initializing,
            request_rx: self.request_rx,
            connection_manager: self.connection_manager,
            peer_manager: self.peer_manager.clone(),
            event_tx: self.event_tx,
            connection_stats: HashMap::new(),
            node_identity: self.node_identity,
            pool: ConnectionPool::new(),
            shutdown_signal: self.shutdown_signal,
            #[cfg(feature = "metrics")]
            uptime: Some(Instant::now()),
            allow_list: vec![],
            proactive_dialer,
            seeds: vec![],
        }
        .spawn()
    }
}

/// Node connectivity status.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectivityStatus {
    /// Initial connectivity status before the Connectivity actor has initialized.
    #[default]
    Initializing,
    /// Connectivity is online.
    Online(usize),
    /// Connectivity is less than the required minimum, but some connections are still active.
    Degraded(usize),
    /// There are no active connections.
    Offline,
}

impl ConnectivityStatus {
    is_fn!(is_initializing, ConnectivityStatus::Initializing);

    is_fn!(is_online, ConnectivityStatus::Online(_));

    is_fn!(is_offline, ConnectivityStatus::Offline);

    is_fn!(is_degraded, ConnectivityStatus::Degraded(_));

    pub fn num_connected_nodes(&self) -> usize {
        use ConnectivityStatus::{Degraded, Initializing, Offline, Online};
        match self {
            Initializing | Offline => 0,
            Online(n) | Degraded(n) => *n,
        }
    }
}

impl fmt::Display for ConnectivityStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{self:?}")
    }
}
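
// ---------------------------------------------------------------------------
// Editor's illustrative sketch (not part of manager.rs): one way client code
// might interpret the ConnectivityStatus defined above before relying on the
// network. `required` is an assumed stand-in for the node's configured
// `min_connectivity`; only the enum and its helpers above are real APIs.
#[allow(dead_code)]
fn ready_for_broadcast(status: ConnectivityStatus, required: usize) -> bool {
    // Online(n)/Degraded(n) report n via num_connected_nodes(); Initializing
    // and Offline report zero, so they never satisfy the threshold.
    status.is_online() || status.num_connected_nodes() >= required
}
// ---------------------------------------------------------------------------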

struct ConnectivityManagerActor {
    config: ConnectivityConfig,
    status: ConnectivityStatus,
    request_rx: mpsc::Receiver<ConnectivityRequest>,
    connection_manager: ConnectionManagerRequester,
    node_identity: Arc<NodeIdentity>,
    peer_manager: Arc<PeerManager>,
    event_tx: ConnectivityEventTx,
    connection_stats: HashMap<NodeId, PeerConnectionStats>,
    pool: ConnectionPool,
    shutdown_signal: ShutdownSignal,
    #[cfg(feature = "metrics")]
    uptime: Option<Instant>,
    allow_list: Vec<NodeId>,
    proactive_dialer: ProactiveDialer,
    seeds: Vec<NodeId>,
}

impl ConnectivityManagerActor {
    pub fn spawn(self) -> JoinHandle<()> {
        tokio::spawn(async { Self::run(self).await })
    }

    pub async fn run(mut self) {
        debug!(target: LOG_TARGET, "ConnectivityManager started");

        let mut connection_manager_events = self.connection_manager.get_event_subscription();

        let interval = self.config.connection_pool_refresh_interval;
        let mut connection_pool_timer = time::interval_at(
            Instant::now()
                .checked_add(interval)
                .expect("connection_pool_refresh_interval cause overflow")
                .into(),
            interval,
        );
        connection_pool_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);

        self.publish_event(ConnectivityEvent::ConnectivityStateInitialized);

        self.seeds = self
            .peer_manager
            .get_seed_peers()
            .await
            .unwrap_or({
                warn!(target: LOG_TARGET, "Failed to get seed peers from PeerManager, using empty list");
                vec![]
            })
            .iter()
            .map(|s| s.node_id.clone())
            .collect();

        loop {
            tokio::select! {
                Some(req) = self.request_rx.recv() => {
                    let timer = Instant::now();
                    let task_id = rand::random::<u64>();
                    trace!(target: LOG_TARGET, "Request ({task_id}): {req:?}");
                    self.handle_request(req).await;
                    if timer.elapsed() > ACCEPTABLE_CONNECTIVITY_REQUEST_PROCESSING_TIME {
                        warn!(
                            target: LOG_TARGET,
                            "Request ({}) took too long to process: {:.2?}",
                            task_id,
                            format_duration(timer.elapsed())
                        );
                    }
                    trace!(target: LOG_TARGET, "Request ({task_id}) done");
                },

                Ok(event) = connection_manager_events.recv() => {
                    let timer = Instant::now();
                    let task_id = rand::random::<u64>();
                    trace!(target: LOG_TARGET, "Event ({task_id}): {event:?}");
                    if let Err(err) = self.handle_connection_manager_event(&event).await {
                        error!(target:LOG_TARGET, "Error handling connection manager event ({task_id}): {err:?}");
                    }
                    if timer.elapsed() > ACCEPTABLE_EVENT_PROCESSING_TIME {
                        warn!(
                            target: LOG_TARGET,
                            "Event ({}) took too long to process: {:.2?}",
                            task_id,
                            format_duration(timer.elapsed())
                        );
                    }
                    trace!(target: LOG_TARGET, "Event ({task_id}) done");
                },

                _ = connection_pool_timer.tick() => {
                    let task_id = rand::random::<u64>();
                    trace!(target: LOG_TARGET, "Pool refresh peers task ({task_id})");
                    self.cleanup_connection_stats();
                    match tokio::time::timeout(POOL_REFRESH_TIMEOUT, self.refresh_connection_pool(task_id)).await {
                        Ok(res) => {
                            if let Err(err) = res {
                                error!(target: LOG_TARGET, "Error refreshing connection pools ({task_id}): {err:?}");
                            }
                        },
                        Err(_) => {
                            warn!(
                                target: LOG_TARGET,
                                "Pool refresh task ({task_id}) timeout",
                            );
                        },
                    }
                    trace!(target: LOG_TARGET, "Pool refresh task ({task_id}) done" );
                },

                _ = self.shutdown_signal.wait() => {
                    info!(
                        target: LOG_TARGET,
                        "ConnectivityManager is shutting down because it received the shutdown signal"
                    );
                    self.disconnect_all().await;
                    break;
                }
            }
        }
    }

    async fn handle_request(&mut self, req: ConnectivityRequest) {
        #[allow(clippy::enum_glob_use)]
        use ConnectivityRequest::*;
        match req {
            WaitStarted(reply) => {
                let _ = reply.send(());
            },
            GetConnectivityStatus(reply) => {
                let _ = reply.send(self.status);
            },
            DialPeer { node_id, reply_tx } => {
                let tracing_id = tracing::Span::current().id();
                let span = span!(Level::TRACE, "handle_dial_peer");
                span.follows_from(tracing_id);
                self.handle_dial_peer(node_id.clone(), reply_tx).instrument(span).await;
            },
            SelectConnections(selection, reply) => {
                let _result = reply.send(self.select_connections(selection));
            },
            GetConnection(node_id, reply) => {
                let _result = reply.send(
                    self.pool
                        .get(&node_id)
                        .filter(|c| c.status() == ConnectionStatus::Connected)
                        .and_then(|c| c.connection())
                        .filter(|conn| conn.is_connected())
                        .cloned(),
                );
            },
            GetPeerStats(node_id, reply) => {
                let peer = match self.peer_manager.find_by_node_id(&node_id).await {
                    Ok(v) => v,
                    Err(e) => {
                        error!(target: LOG_TARGET, "Error when retrieving peer: {e:?}");
                        None
                    },
                };
                let _result = reply.send(peer);
            },
            GetAllConnectionStates(reply) => {
                let states = self.pool.all().into_iter().cloned().collect();
                let _result = reply.send(states);
            },
            GetMinimizeConnectionsThreshold(reply) => {
                let minimize_connections_threshold = self.config.maintain_n_closest_connections_only;
                let _result = reply.send(minimize_connections_threshold);
            },
            BanPeer(node_id, duration, reason) => {
                if self.allow_list.contains(&node_id) {
                    info!(
                        target: LOG_TARGET,
                        "Peer is excluded from being banned as it was found in the AllowList, NodeId: {node_id:?}"
                    );
                } else if let Err(err) = self.ban_peer(&node_id, duration, reason).await {
                    error!(target: LOG_TARGET, "Error when banning peer: {err:?}");
                } else {
                    // we banned the peer
                }
            },
            AddPeerToAllowList(node_id) => {
                if !self.allow_list.contains(&node_id) {
                    self.allow_list.push(node_id.clone());
                }
            },
            RemovePeerFromAllowList(node_id) => {
                if let Some(index) = self.allow_list.iter().position(|x| *x == node_id) {
                    self.allow_list.remove(index);
                }
            },
            GetAllowList(reply) => {
                let allow_list = self.allow_list.clone();
                let _result = reply.send(allow_list);
            },
            GetSeeds(reply) => {
                let seeds = self.peer_manager.get_seed_peers().await.unwrap_or_else(|e| {
                    error!(target: LOG_TARGET, "Error when retrieving seed peers: {e:?}");
                    vec![]
                });
                let _result = reply.send(seeds);
            },
            GetActiveConnections(reply) => {
                let _result = reply.send(
                    self.pool
                        .filter_connection_states(|s| s.is_connected())
                        .into_iter()
                        .cloned()
                        .collect(),
                );
            },
            GetNodeIdentity(reply) => {
                let identity = self.node_identity.as_ref();
                let _result = reply.send(identity.clone());
            },
        }
    }

    async fn handle_dial_peer(
        &mut self,
        node_id: NodeId,
        reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
    ) {
        match self.peer_manager.is_peer_banned(&node_id).await {
            Ok(true) => {
                if let Some(reply) = reply_tx {
                    let _result = reply.send(Err(ConnectionManagerError::PeerBanned));
                }
                return;
            },
            Ok(false) => {},
            Err(err) => {
                if let Some(reply) = reply_tx {
                    let _result = reply.send(Err(err.into()));
                }
                return;
            },
        }
        match self.pool.get(&node_id) {
            // The connection pool may temporarily contain a connection that is not connected so we need to check this.
            Some(state) if state.is_connected() => {
                if let Some(reply_tx) = reply_tx {
                    let _result = reply_tx.send(Ok(state.connection().cloned().expect("Already checked")));
                }
            },
            maybe_state => {
                match maybe_state {
                    Some(state) => {
                        info!(
                            target: LOG_TARGET,
                            "Connection was previously attempted for peer {}. Current status is '{}'. Dialing again...",
                            node_id.short_str(),
                            state.status()
                        );
                    },
                    None => {
                        info!(
                            target: LOG_TARGET,
                            "No connection for peer {}. Dialing...",
                            node_id.short_str(),
                        );
                    },
                }

                if let Err(err) = self.connection_manager.send_dial_peer(node_id, reply_tx).await {
                    error!(
                        target: LOG_TARGET,
                        "Failed to send dial request to connection manager: {err:?}"
                    );
                }
            },
        }
    }

    async fn disconnect_all(&mut self) {
        let mut node_ids = Vec::with_capacity(self.pool.count_connected());
        for mut state in self.pool.filter_drain(|_| true) {
            if let Some(conn) = state.connection_mut() {
                if !conn.is_connected() {
                    continue;
                }
                match disconnect_silent_with_timeout(
                    conn,
                    Minimized::No,
                    None,
                    "ConnectivityManagerActor disconnect all",
                )
                .await
                {
                    Ok(_) => {
                        node_ids.push(conn.peer_node_id().clone());
                    },
                    Err(err) => {
                        debug!(
                            target: LOG_TARGET,
                            "In disconnect_all: Error when disconnecting peer '{}' because '{:?}'",
                            conn.peer_node_id().short_str(),
                            err
                        );
                    },
                }
            }
        }

        for node_id in node_ids {
            self.publish_event(ConnectivityEvent::PeerDisconnected(node_id, Minimized::No));
        }
    }

    async fn refresh_connection_pool(&mut self, task_id: u64) -> Result<(), ConnectivityError> {
        info!(
            target: LOG_TARGET,
            "CONNECTIVITY_REFRESH: Performing connection pool cleanup/refresh ({}). (#Peers = {}, #Connected={}, #Failed={}, #Disconnected={}, \
             #Clients={})",
            task_id,
            self.pool.count_entries(),
            self.pool.count_connected_nodes(),
            self.pool.count_failed(),
            self.pool.count_disconnected(),
            self.pool.count_connected_clients()
        );

        self.clean_connection_pool();
        if self.config.is_connection_reaping_enabled {
            self.reap_inactive_connections(task_id).await;
        }
        if let Some(threshold) = self.config.maintain_n_closest_connections_only {
            self.maintain_n_closest_peer_connections_only(threshold, task_id).await;
        }

        // Execute proactive dialing logic (if enabled)
        debug!(
            target: LOG_TARGET,
            "({}) Proactive dialing config check: enabled={}, target_connections={}",
            task_id,
            self.config.proactive_dialing_enabled,
            self.config.target_connection_count
        );

        if self.config.proactive_dialing_enabled {
            debug!(
                target: LOG_TARGET,
                "({task_id}) Executing proactive dialing logic"
            );
            if let Err(err) = self.execute_proactive_dialing(task_id).await {
                warn!(
                    target: LOG_TARGET,
                    "({task_id}) Proactive dialing failed: {err:?}"
                );
            }
        } else {
            debug!(
                target: LOG_TARGET,
                "({task_id}) Proactive dialing disabled in configuration"
            );
        }

        self.update_connectivity_status();
        self.update_connectivity_metrics();
        Ok(())
    }

    async fn maintain_n_closest_peer_connections_only(&mut self, threshold: usize, task_id: u64) {
        let start = Instant::now();
        // Select all active peer connections (that are communication nodes) with health-aware selection
        let selection = ConnectivitySelection::healthy_closest_to(
            self.node_identity.node_id().clone(),
            self.pool.count_connected_nodes(),
            vec![],
        );
        let mut connections = match self.select_connections_with_health(selection) {
            Ok(peers) => peers,
            Err(e) => {
                warn!(
                    target: LOG_TARGET,
                    "Connectivity error trying to maintain {threshold} closest peers ({task_id}) ({e:?})",
                );
                return;
            },
        };
        let num_connections = connections.len();

        // Remove peers that are on the allow list
        connections.retain(|conn| !self.allow_list.contains(conn.peer_node_id()));
        debug!(
            target: LOG_TARGET,
            "minimize_connections: ({}) Filtered peers: {}, Handles: {}",
            task_id,
            connections.len(),
            num_connections,
        );

        // Disconnect all remaining peers above the threshold
        let len = connections.len();
        for conn in connections.iter_mut().skip(threshold) {
            debug!(
                target: LOG_TARGET,
                "minimize_connections: ({}) Disconnecting '{}' because the node is not among the {} closest peers",
                task_id,
                conn.peer_node_id(),
                threshold
            );
            match disconnect_if_unused_with_timeout(
                conn,
                Minimized::Yes,
                Some(task_id),
                "ConnectivityManagerActor maintain closest",
            )
            .await
            {
                Ok(_) => {
                    self.pool.remove(conn.peer_node_id());
                },
                Err(err) => {
                    debug!(
                        target: LOG_TARGET,
                        "Peer '{}' already disconnected ({:?}). Error: {:?}",
                        conn.peer_node_id().short_str(),
                        task_id,
                        err
                    );
                },
            }
        }
        if len > 0 {
            debug!(
                "minimize_connections: ({}) Minimized {} connections in {:.2?}",
                task_id,
                len,
                start.elapsed()
            );
        }
    }

    async fn reap_inactive_connections(&mut self, task_id: u64) {
        let start = Instant::now();
        let excess_connections = self
            .pool
            .count_connected()
            .saturating_sub(self.config.reaper_min_connection_threshold);
        if excess_connections == 0 {
            return;
        }

        let mut connections = self
            .pool
            .get_inactive_outbound_connections_mut(self.config.reaper_min_inactive_age);
        connections.truncate(excess_connections);
        let mut nodes_to_remove = Vec::new();
        for conn in &mut connections {
            if !conn.is_connected() {
                continue;
            }

            debug!(
                target: LOG_TARGET,
                "({}) Disconnecting '{}' because connection was inactive ({} handles)",
                task_id,
                conn.peer_node_id().short_str(),
                conn.handle_count()
            );
            match disconnect_with_timeout(
                conn,
                Minimized::Yes,
                Some(task_id),
                "ConnectivityManagerActor reap inactive",
            )
            .await
            {
                Ok(_) => {
                    nodes_to_remove.push(conn.peer_node_id().clone());
                },
                Err(err) => {
                    debug!(
                        target: LOG_TARGET,
                        "Peer '{}' already disconnected ({:?}). Error: {:?}",
                        conn.peer_node_id().short_str(),
                        task_id,
                        err
                    );
                },
            }
        }
        let len = nodes_to_remove.len();
        if len > 0 {
            for node_id in nodes_to_remove {
                self.pool.remove(&node_id);
            }
            debug!(
                "({}) Reaped {} inactive connections in {:.2?}",
                task_id,
                len,
                start.elapsed()
            );
        }
    }

    fn clean_connection_pool(&mut self) {
        let cleared_states = self.pool.filter_drain(|state| {
            matches!(
                state.status(),
                ConnectionStatus::Failed | ConnectionStatus::Disconnected(_)
            )
        });

        if !cleared_states.is_empty() {
            debug!(
                target: LOG_TARGET,
                "Cleared connection states: {}",
                cleared_states
                    .iter()
                    .map(ToString::to_string)
                    .collect::<Vec<_>>()
                    .join(",")
            )
        }
    }

    fn select_connections(&self, selection: ConnectivitySelection) -> Result<Vec<PeerConnection>, ConnectivityError> {
        trace!(target: LOG_TARGET, "Selection query: {selection:?}");
        trace!(
            target: LOG_TARGET,
            "Selecting from {} connected node peers",
            self.pool.count_connected_nodes()
        );

        let conns = selection.select(&self.pool);
        debug!(target: LOG_TARGET, "Selected {} connections(s)", conns.len());

        Ok(conns.into_iter().cloned().collect())
    }

    fn select_connections_with_health(
        &self,
        selection: ConnectivitySelection,
    ) -> Result<Vec<PeerConnection>, ConnectivityError> {
        trace!(target: LOG_TARGET, "Health-aware selection query: {selection:?}");
        trace!(
            target: LOG_TARGET,
            "Selecting from {} connected node peers with health metrics",
            self.pool.count_connected_nodes()
        );

        let conns = selection.select_with_health(
            &self.pool,
            &self.connection_stats,
            self.config.success_rate_tracking_window,
        );
        debug!(target: LOG_TARGET, "Selected {} healthy connections(s)", conns.len());

        Ok(conns.into_iter().cloned().collect())
    }

    fn get_connection_stat_mut(&mut self, node_id: NodeId) -> &mut PeerConnectionStats {
        match self.connection_stats.entry(node_id) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => entry.insert(PeerConnectionStats::new()),
        }
    }

    fn mark_connection_success(&mut self, node_id: NodeId) {
        let entry = self.get_connection_stat_mut(node_id);
        entry.set_connection_success();

        // Update proactive dialing success metrics
    }

    fn mark_peer_failed(&mut self, node_id: NodeId) -> usize {
        let threshold = self.config.circuit_breaker_failure_threshold;
        let entry = self.get_connection_stat_mut(node_id);
        entry.set_connection_failed_with_threshold(threshold);

        entry.failed_attempts()
    }

    async fn on_peer_connection_failure(&mut self, node_id: &NodeId) -> Result<(), ConnectivityError> {
        if self.status.is_offline() {
            info!(
                target: LOG_TARGET,
                "Node is offline. Ignoring connection failure event for peer '{node_id}'."
            );
            self.publish_event(ConnectivityEvent::ConnectivityStateOffline);
            return Ok(());
        }

        let num_failed = self.mark_peer_failed(node_id.clone());

        if num_failed >= self.config.max_failures_mark_offline {
            debug!(
                target: LOG_TARGET,
                "Marking peer '{}' as offline because this node failed to connect to them {} times",
                node_id.short_str(),
                num_failed
            );

            if let Some(peer) = self.peer_manager.find_by_node_id(node_id).await? {
                if !peer.is_banned() &&
                    peer.last_seen_since()
                        // Haven't seen them in expire_peer_last_seen_duration
                        .map(|t| t > self.config.expire_peer_last_seen_duration)
                        // Or don't delete if never seen
                        .unwrap_or(false)
                {
                    debug!(
                        target: LOG_TARGET,
                        "Peer `{}` was marked as offline after {} attempts (last seen: {}). Removing peer from peer \
                         list",
                        node_id,
                        num_failed,
                        peer.last_seen_since()
                            .map(|d| format!("{}s ago", d.as_secs()))
                            .unwrap_or_else(|| "Never".to_string()),
                    );
                    self.peer_manager.soft_delete_peer(node_id).await?;
                }
            }
        }

        Ok(())
    }

    async fn handle_connection_manager_event(
        &mut self,
        event: &ConnectionManagerEvent,
    ) -> Result<(), ConnectivityError> {
        self.update_state_on_connectivity_event(event).await?;
        self.update_connectivity_status();
        self.update_connectivity_metrics();
        Ok(())
    }

    #[allow(clippy::too_many_lines)]
    async fn update_state_on_connectivity_event(
        &mut self,
        event: &ConnectionManagerEvent,
    ) -> Result<(), ConnectivityError> {
        use ConnectionManagerEvent::*;
        match event {
            PeerConnected(new_conn) => {
                match self.on_new_connection(new_conn).await {
                    TieBreak::KeepExisting => {
                        debug!(
                            target: LOG_TARGET,
                            "Discarding new connection to peer '{}' because we already have an existing connection",
                            new_conn.peer_node_id().short_str()
                        );
                        // Ignore event, we discarded the new connection and keeping the current one
                        return Ok(());
                    },
                    TieBreak::UseNew | TieBreak::None => {},
                }
            },
            PeerDisconnected(id, node_id, _minimized) => {
                if let Some(conn) = self.pool.get_connection(node_id) {
                    if conn.id() != *id {
                        debug!(
                            target: LOG_TARGET,
                            "Ignoring peer disconnected event for stale peer connection (id: {id}) for peer '{node_id}'"
                        );
                        return Ok(());
                    }
                }
            },
            PeerViolation { peer_node_id, details } => {
                self.ban_peer(
                    peer_node_id,
                    Duration::from_secs(2 * 60 * 60),
                    format!("Peer violation: {details}"),
                )
                .await?;
                return Ok(());
            },
            _ => {},
        }

        let (node_id, mut new_status, connection) = match event {
            PeerDisconnected(_, node_id, minimized) => (node_id, ConnectionStatus::Disconnected(*minimized), None),
            PeerConnected(conn) => (conn.peer_node_id(), ConnectionStatus::Connected, Some(conn.clone())),
            PeerConnectFailed(node_id, ConnectionManagerError::AllPeerAddressesAreExcluded(msg)) => {
                debug!(
                    target: LOG_TARGET,
                    "Peer '{node_id}' contains only excluded addresses ({msg})"
                );
                (node_id, ConnectionStatus::Failed, None)
            },
            PeerConnectFailed(node_id, ConnectionManagerError::NoiseHandshakeError(msg)) => {
                if let Some(conn) = self.pool.get_connection(node_id) {
                    debug!(
                        target: LOG_TARGET,
                        "Handshake error to peer '{node_id}', disconnecting for a fresh retry ({msg})"
                    );
                    let mut conn = conn.clone();
                    disconnect_with_timeout(
                        &mut conn,
                        Minimized::No,
                        None,
                        "ConnectivityManagerActor peer connect failed",
                    )
                    .await?;
                }
                (node_id, ConnectionStatus::Failed, None)
            },
            PeerConnectFailed(node_id, ConnectionManagerError::DialCancelled) => {
                if let Some(conn) = self.pool.get_connection(node_id) {
                    if conn.is_connected() && conn.direction().is_inbound() {
                        debug!(
                            target: LOG_TARGET,
                            "Ignoring DialCancelled({node_id}) event because an inbound connection already exists"
                        );

                        return Ok(());
                    }
                }
                debug!(
                    target: LOG_TARGET,
                    "Dial was cancelled before connection completed to peer '{node_id}'"
                );
                (node_id, ConnectionStatus::Failed, None)
            },
            PeerConnectFailed(node_id, err) => {
                debug!(
                    target: LOG_TARGET,
                    "Connection to peer '{node_id}' failed because '{err:?}'"
                );
                self.on_peer_connection_failure(node_id).await?;
                (node_id, ConnectionStatus::Failed, None)
            },
            _ => return Ok(()),
        };

        let old_status = self.pool.set_status(node_id, new_status);
        if let Some(conn) = connection {
            new_status = self.pool.insert_connection(*conn);
        }
        if old_status != new_status {
            debug!(
                target: LOG_TARGET,
                "Peer connection for node '{node_id}' transitioned from {old_status} to {new_status}"
            );
        }

        let node_id = node_id.clone();

        use ConnectionStatus::{Connected, Disconnected, Failed};
        match (old_status, new_status) {
            (_, Connected) => match self.pool.get_connection_mut(&node_id).cloned() {
                Some(conn) => {
                    self.mark_connection_success(conn.peer_node_id().clone());
                    self.publish_event(ConnectivityEvent::PeerConnected(conn.into()));
                },
                None => unreachable!(
                    "Connection transitioning to CONNECTED state must always have a connection set i.e. \
                     ConnectionPool::get_connection is Some"
                ),
            },
            (Connected, Disconnected(..)) => {
                self.publish_event(ConnectivityEvent::PeerDisconnected(node_id, match new_status {
                    ConnectionStatus::Disconnected(reason) => reason,
                    _ => Minimized::No,
                }));
            },
            // Was not connected so don't broadcast event
            (_, Disconnected(..)) => {},
            (_, Failed) => {
                self.publish_event(ConnectivityEvent::PeerConnectFailed(node_id));
            },
            _ => {
                error!(
                    target: LOG_TARGET,
                    "Unexpected connection status transition ({old_status} to {new_status}) for peer '{node_id}'"
                );
            },
        }

        Ok(())
    }

    async fn on_new_connection(&mut self, new_conn: &PeerConnection) -> TieBreak {
        match self.pool.get(new_conn.peer_node_id()).cloned() {
            Some(existing_state) if !existing_state.is_connected() => {
                debug!(
                    target: LOG_TARGET,
                    "Tie break: Existing connection (id: {}, peer: {}, direction: {}) was not connected, resolving \
                     tie break by using the new connection. (New: id: {}, peer: {}, direction: {})",
                    existing_state.connection().map(|c| c.id()).unwrap_or_default(),
                    existing_state.node_id(),
                    existing_state.connection().map(|c| c.direction().as_str()).unwrap_or("--"),
                    new_conn.id(),
                    new_conn.peer_node_id(),
                    new_conn.direction(),
                );
                self.pool.remove(existing_state.node_id());
                TieBreak::UseNew
            },
            Some(mut existing_state) => {
                let Some(existing_conn) = existing_state.connection_mut() else {
                    error!(
                        target: LOG_TARGET,
                        "INVARIANT ERROR in Tie break: PeerConnection is None but state is CONNECTED: Existing \
                        connection (id: {}, peer: {}, direction: {}), new connection. (id: {}, peer: {}, direction: {})",
                        existing_state.connection().map(|c| c.id()).unwrap_or_default(),
                        existing_state.node_id(),
                        existing_state.connection().map(|c| c.direction().as_str()).unwrap_or("--"),
                        new_conn.id(),
                        new_conn.peer_node_id(),
                        new_conn.direction(),
                    );
                    return TieBreak::UseNew;
                };
                if self.tie_break_existing_connection(existing_conn, new_conn) {
                    info!(
                        target: LOG_TARGET,
                        "Tie break: Keep new connection (id: {}, peer: {}, direction: {}). Disconnect existing \
                         connection (id: {}, peer: {}, direction: {})",
                        new_conn.id(),
                        new_conn.peer_node_id(),
                        new_conn.direction(),
                        existing_conn.id(),
                        existing_conn.peer_node_id(),
                        existing_conn.direction(),
                    );

                    let _result = disconnect_silent_with_timeout(
                        existing_conn,
                        Minimized::Yes,
                        None,
                        "ConnectivityManagerActor tie break",
                    )
                    .await;
                    self.pool.remove(existing_conn.peer_node_id());
                    TieBreak::UseNew
                } else {
                    debug!(
                        target: LOG_TARGET,
                        "Tie break: Keeping existing connection (id: {}, peer: {}, direction: {}). Disconnecting new \
                         connection (id: {}, peer: {}, direction: {})",
                        new_conn.id(),
                        new_conn.peer_node_id(),
                        new_conn.direction(),
                        existing_conn.id(),
                        existing_conn.peer_node_id(),
                        existing_conn.direction(),
                    );

                    let _result = disconnect_silent_with_timeout(
                        &mut new_conn.clone(),
                        Minimized::Yes,
                        None,
                        "ConnectivityManagerActor tie break",
                    )
                    .await;
                    TieBreak::KeepExisting
                }
            },

            None => TieBreak::None,
        }
    }

    /// Two connections to the same peer have been created. This function deterministically determines which peer
    /// connection to close. It does this by comparing our NodeId to that of the peer. This rule enables both sides to
    /// agree which connection to disconnect
    ///
    /// Returns true if the existing connection should close, otherwise false if the new connection should be closed.
    fn tie_break_existing_connection(&self, existing_conn: &PeerConnection, new_conn: &PeerConnection) -> bool {
        debug_assert_eq!(existing_conn.peer_node_id(), new_conn.peer_node_id());
        let peer_node_id = existing_conn.peer_node_id();
        let our_node_id = self.node_identity.node_id();

        debug!(
            target: LOG_TARGET,
            "Tie-break: (Existing = {}, New = {})",
            existing_conn.direction(),
            new_conn.direction()
        );
        use ConnectionDirection::{Inbound, Outbound};
        match (existing_conn.direction(), new_conn.direction()) {
            // They connected to us twice for some reason. Drop the older connection
            (Inbound, Inbound) => true,
            // They connected to us at the same time we connected to them
            (Inbound, Outbound) => peer_node_id > our_node_id,
            // We connected to them at the same time as they connected to us
            (Outbound, Inbound) => our_node_id > peer_node_id,
            // We connected to them twice for some reason. Drop the older connection.
            (Outbound, Outbound) => true,
        }
    }

    fn update_connectivity_status(&mut self) {
        // The contract we are making with online/degraded status transitions is as follows:
        // - If min_connectivity peers are connected we MUST transition to ONLINE
        // - Clients SHOULD tolerate entering a DEGRADED/OFFLINE status
        // - If a number of peers disconnect or the local system's network goes down, the status MAY transition to
        //   DEGRADED
        let min_peers = self.config.min_connectivity;
        let num_connected_nodes = self.pool.count_connected_nodes();
        let num_connected_clients = self.pool.count_connected_clients();
        debug!(
            target: LOG_TARGET,
            "#min_peers = {min_peers}, #nodes = {num_connected_nodes}, #clients = {num_connected_clients}"
        );

        match num_connected_nodes {
            n if n >= min_peers => {
                self.transition(ConnectivityStatus::Online(n), min_peers);
            },
            n if n > 0 && n < min_peers => {
                self.transition(ConnectivityStatus::Degraded(n), min_peers);
            },
            n if n == 0 => {
                if num_connected_clients == 0 {
                    self.transition(ConnectivityStatus::Offline, min_peers);
                } else {
                    self.transition(ConnectivityStatus::Degraded(n), min_peers);
                }
            },
            _ => unreachable!("num_connected is unsigned and only negative pattern covered on this branch"),
        }
    }

    #[cfg(not(feature = "metrics"))]
    fn update_connectivity_metrics(&mut self) {}

    #[allow(clippy::cast_possible_wrap)]
    #[cfg(feature = "metrics")]
    fn update_connectivity_metrics(&mut self) {
        use std::convert::TryFrom;

        use super::metrics;

        let total = self.pool.count_connected() as i64;
        let num_inbound = self.pool.count_filtered(|state| match state.connection() {
            Some(conn) => conn.is_connected() && conn.direction().is_inbound(),
            None => false,
        }) as i64;

        metrics::connections(ConnectionDirection::Inbound).set(num_inbound);
        metrics::connections(ConnectionDirection::Outbound).set(total - num_inbound);

        let uptime = self
            .uptime
            .map(|ts| i64::try_from(ts.elapsed().as_secs()).unwrap_or(i64::MAX))
            .unwrap_or(0);
        metrics::uptime().set(uptime);
    }

    fn transition(&mut self, next_status: ConnectivityStatus, required_num_peers: usize) {
        use ConnectivityStatus::{Degraded, Offline, Online};
        if self.status != next_status {
            debug!(
                target: LOG_TARGET,
                "Connectivity status transitioning from {} to {}", self.status, next_status
            );
        }

        match (self.status, next_status) {
            (Online(_), Online(_)) => {},
            (_, Online(n)) => {
                info!(
                    target: LOG_TARGET,
                    "Connectivity is ONLINE ({n}/{required_num_peers} connections)"
                );

                #[cfg(feature = "metrics")]
                if self.uptime.is_none() {
                    self.uptime = Some(Instant::now());
                }
                self.publish_event(ConnectivityEvent::ConnectivityStateOnline(n));
            },
            (Degraded(m), Degraded(n)) => {
                info!(
                    target: LOG_TARGET,
                    "Connectivity is DEGRADED ({n}/{required_num_peers} connections)"
                );
                if m != n {
                    self.publish_event(ConnectivityEvent::ConnectivityStateDegraded(n));
                }
            },
            (_, Degraded(n)) => {
                info!(
                    target: LOG_TARGET,
                    "Connectivity is DEGRADED ({n}/{required_num_peers} connections)"
                );
                self.publish_event(ConnectivityEvent::ConnectivityStateDegraded(n));
            },
            (Offline, Offline) => {},
            (_, Offline) => {
                warn!(
                    target: LOG_TARGET,
                    "Connectivity is OFFLINE (0/{required_num_peers} connections)"
                );
                #[cfg(feature = "metrics")]
                {
                    self.uptime = None;
                }
                self.publish_event(ConnectivityEvent::ConnectivityStateOffline);
            },
            (status, next_status) => unreachable!("Unexpected status transition ({status} to {next_status})"),
        }
        self.status = next_status;
    }

    fn publish_event(&mut self, event: ConnectivityEvent) {
        // A send operation can only fail if there are no subscribers, so it is safe to ignore the error
        let _result = self.event_tx.send(event);
    }

    async fn ban_peer(
        &mut self,
        node_id: &NodeId,
        duration: Duration,
        reason: String,
    ) -> Result<(), ConnectivityError> {
        info!(
            target: LOG_TARGET,
            "Banning peer {} for {} because: {}",
            node_id,
            format_duration(duration),
            reason
        );
        let ban_result = self.peer_manager.ban_peer_by_node_id(node_id, duration, reason).await;

        #[cfg(feature = "metrics")]
        super::metrics::banned_peers_counter().inc();

        self.publish_event(ConnectivityEvent::PeerBanned(node_id.clone()));

        if let Some(conn) = self.pool.get_connection_mut(node_id) {
            disconnect_with_timeout(conn, Minimized::Yes, None, "ConnectivityManagerActor ban peer").await?;
            let status = self.pool.get_connection_status(node_id);
            debug!(
                target: LOG_TARGET,
                "Disconnected banned peer {node_id}. The peer connection status is {status}"
            );
        }
        ban_result?;
        Ok(())
    }

    async fn execute_proactive_dialing(&mut self, task_id: u64) -> Result<(), ConnectivityError> {
        debug!(
            target: LOG_TARGET,
            "({}) Starting proactive dialing execution - current connections: {}, target: {}",
            task_id,
            self.pool.count_connected_nodes(),
            self.config.target_connection_count
        );

        // First, clean up old health data to keep metrics accurate
        for stats in self.connection_stats.values_mut() {
            stats.cleanup_old_health_data(self.config.success_rate_tracking_window);
        }

        // Update circuit breaker metrics
        self.update_circuit_breaker_metrics();

        // Execute proactive dialing logic
        match self
            .proactive_dialer
            .execute_proactive_dialing(&self.pool, &self.connection_stats, &self.seeds, task_id)
            .await
        {
            Ok(dialed_count) => {
                if dialed_count > 0 {
                    debug!(
                        target: LOG_TARGET,
                        "({task_id}) Proactive dialing initiated {dialed_count} peer connections"
                    );
                }
                Ok(())
            },
            Err(err) => {
                error!(
                    target: LOG_TARGET,
                    "({task_id}) Proactive dialing failed: {err:?}"
                );
                Err(err)
            },
        }
    }

    fn update_circuit_breaker_metrics(&self) {
        let _circuit_breaker_open_count = self
            .connection_stats
            .values()
            .filter(|stats| stats.health_metrics().circuit_breaker_state().is_open())
            .count();

        // Calculate average peer health score
        if !self.connection_stats.is_empty() {
            let total_health: f32 = self
                .connection_stats
                .values()
                .map(|stats| stats.health_score(self.config.success_rate_tracking_window))
                .sum();
            let _avg_health = total_health / self.connection_stats.len() as f32;
        }
    }

    fn cleanup_connection_stats(&mut self) {
        let mut to_remove = Vec::new();
        for node_id in self.connection_stats.keys() {
            let status = self.pool.get_connection_status(node_id);
            if matches!(
                status,
                ConnectionStatus::NotConnected | ConnectionStatus::Failed | ConnectionStatus::Disconnected(_)
            ) {
                to_remove.push(node_id.clone());
            }
        }
        for node_id in to_remove {
            self.connection_stats.remove(&node_id);
        }
    }
}

enum TieBreak {
    None,
    UseNew,
    KeepExisting,
}

async fn disconnect_with_timeout(
    connection: &mut PeerConnection,
    minimized: Minimized,
    task_id: Option<u64>,
    requester: &str,
) -> Result<(), PeerConnectionError> {
    match tokio::time::timeout(PEER_DISCONNECT_TIMEOUT, connection.disconnect(minimized, requester)).await {
        Ok(res) => res,
        Err(_) => {
            warn!(
                target: LOG_TARGET,
                "Timeout disconnecting peer ({:?}) '{}'",
                task_id,
                connection.peer_node_id().short_str(),
            );
            Err(PeerConnectionError::DisconnectTimeout)
        },
    }
}

async fn disconnect_if_unused_with_timeout(
    connection: &mut PeerConnection,
    minimized: Minimized,
    task_id: Option<u64>,
    requester: &str,
) -> Result<(), PeerConnectionError> {
    match tokio::time::timeout(
        PEER_DISCONNECT_TIMEOUT,
        connection.disconnect_if_unused(minimized, 0, 0, requester),
    )
    .await
    {
        Ok(res) => res,
        Err(_) => {
            warn!(
                target: LOG_TARGET,
                "Timeout disconnecting peer ({:?}) '{}'",
                task_id,
                connection.peer_node_id().short_str(),
            );
            Err(PeerConnectionError::DisconnectTimeout)
        },
    }
}

async fn disconnect_silent_with_timeout(
    connection: &mut PeerConnection,
    minimized: Minimized,
    task_id: Option<u64>,
    requester: &str,
) -> Result<(), PeerConnectionError> {
    match tokio::time::timeout(
        PEER_DISCONNECT_TIMEOUT,
        connection.disconnect_silent(minimized, requester),
    )
    .await
    {
        Ok(res) => res,
        Err(_) => {
            warn!(
                target: LOG_TARGET,
                "Timeout disconnecting peer ({:?}) '{}'",
                task_id,
                connection.peer_node_id().short_str(),
            );
            Err(PeerConnectionError::DisconnectTimeout)
        },
    }
}
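
Two editor-added sketches follow; neither is part of manager.rs, and the helper names are illustrative only.

The rule in tie_break_existing_connection is symmetric, so both ends of a simultaneous dial reach the same conclusion about which connection to drop. A minimal sketch, using plain integers as stand-ins for NodeId ordering and mirroring only the (Outbound, Inbound) arm:

    /// Returns true when this node should drop its own outbound dial.
    fn existing_outbound_should_close(our_id: u8, peer_id: u8) -> bool {
        our_id > peer_id
    }

    // With ids 7 and 3: node 7 drops its outbound dial (7 > 3) while node 3 keeps
    // its own (3 > 7 is false), so exactly one connection survives on both sides.

The status contract documented in update_connectivity_status can also be read as a pure function of the connection counts. A sketch, assuming a min_connectivity of 3 for the examples in the comment:

    fn status_for(nodes: usize, clients: usize, min_peers: usize) -> ConnectivityStatus {
        match nodes {
            n if n >= min_peers => ConnectivityStatus::Online(n),
            n if n > 0 => ConnectivityStatus::Degraded(n),
            _ if clients > 0 => ConnectivityStatus::Degraded(0),
            _ => ConnectivityStatus::Offline,
        }
    }

    // status_for(5, 0, 3) == Online(5); status_for(2, 0, 3) == Degraded(2);
    // status_for(0, 1, 3) == Degraded(0); status_for(0, 0, 3) == Offline.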