• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 23184374285

17 Mar 2026 08:04AM UTC coverage: 60.967% (-0.8%) from 61.722%
23184374285

push

github

SWvheerden
chore: new release v5.3.0-pre.3

69750 of 114406 relevant lines covered (60.97%)

227024.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.71
/comms/core/src/connectivity/manager.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22
use std::{
23
    collections::HashMap,
24
    fmt,
25
    sync::Arc,
26
    time::{Duration, Instant},
27
};
28

29
use log::*;
30
use nom::lib::std::collections::hash_map::Entry;
31
use tari_shutdown::ShutdownSignal;
32
use tokio::{
33
    sync::{mpsc, oneshot},
34
    task::JoinHandle,
35
    time,
36
    time::MissedTickBehavior,
37
};
38
use tracing::{Instrument, Level, span};
39

40
use super::{
41
    ConnectivityEventTx,
42
    config::ConnectivityConfig,
43
    connection_pool::{ConnectionPool, ConnectionStatus},
44
    connection_stats::PeerConnectionStats,
45
    error::ConnectivityError,
46
    proactive_dialer::ProactiveDialer,
47
    requester::{ConnectivityEvent, ConnectivityRequest},
48
    selection::ConnectivitySelection,
49
};
50
use crate::{
51
    Minimized,
52
    NodeIdentity,
53
    PeerConnection,
54
    PeerConnectionError,
55
    PeerManager,
56
    connection_manager::{
57
        ConnectionDirection,
58
        ConnectionManagerError,
59
        ConnectionManagerEvent,
60
        ConnectionManagerRequester,
61
    },
62
    peer_manager::NodeId,
63
    utils::datetime::format_duration,
64
};
65

66
/// `log` target used by every log statement in this module.
const LOG_TARGET: &str = "comms::connectivity::manager";

/// Upper bound on a single connection-pool refresh pass (enforced with `tokio::time::timeout`).
const POOL_REFRESH_TIMEOUT: Duration = Duration::from_millis(2_500);
/// Upper bound on disconnecting one peer.
const PEER_DISCONNECT_TIMEOUT: Duration = Duration::from_millis(250);
/// A request whose handling exceeds this duration is reported with a warning.
const ACCEPTABLE_CONNECTIVITY_REQUEST_PROCESSING_TIME: Duration = Duration::from_millis(500);
/// A connection-manager event whose handling exceeds this duration is reported with a warning.
const ACCEPTABLE_EVENT_PROCESSING_TIME: Duration = Duration::from_millis(500);
76

77
/// # Connectivity Manager
78
///
79
/// The ConnectivityManager actor is responsible for tracking the state of all peer
80
/// connections in the system and maintaining a _pool_ of peer connections.
81
///
82
/// It emits [ConnectivityEvent](crate::connectivity::ConnectivityEvent)s that can keep client components
83
/// in the loop with the state of the node's connectivity.
84
pub struct ConnectivityManager {
85
    pub config: ConnectivityConfig,
86
    pub request_rx: mpsc::Receiver<ConnectivityRequest>,
87
    pub event_tx: ConnectivityEventTx,
88
    pub connection_manager: ConnectionManagerRequester,
89
    pub peer_manager: Arc<PeerManager>,
90
    pub node_identity: Arc<NodeIdentity>,
91
    pub shutdown_signal: ShutdownSignal,
92
}
93

94
impl ConnectivityManager {
95
    pub fn spawn(self) -> JoinHandle<()> {
119✔
96
        let proactive_dialer =
119✔
97
            ProactiveDialer::new(self.config, self.connection_manager.clone(), self.peer_manager.clone());
119✔
98

99
        ConnectivityManagerActor {
119✔
100
            config: self.config,
119✔
101
            status: ConnectivityStatus::Initializing,
119✔
102
            request_rx: self.request_rx,
119✔
103
            connection_manager: self.connection_manager,
119✔
104
            peer_manager: self.peer_manager.clone(),
119✔
105
            event_tx: self.event_tx,
119✔
106
            connection_stats: HashMap::new(),
119✔
107
            node_identity: self.node_identity,
119✔
108
            pool: ConnectionPool::new(),
119✔
109
            shutdown_signal: self.shutdown_signal,
119✔
110
            #[cfg(feature = "metrics")]
119✔
111
            uptime: Some(Instant::now()),
119✔
112
            allow_list: vec![],
119✔
113
            proactive_dialer,
119✔
114
            seeds: vec![],
119✔
115
        }
119✔
116
        .spawn()
119✔
117
    }
119✔
118
}
119

120
/// Node connectivity status.
121
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
122
pub enum ConnectivityStatus {
123
    /// Initial connectivity status before the Connectivity actor has initialized.
124
    #[default]
125
    Initializing,
126
    /// Connectivity is online.
127
    Online(usize),
128
    /// Connectivity is less than the required minimum, but some connections are still active.
129
    Degraded(usize),
130
    /// There are no active connections.
131
    Offline,
132
}
133

134
impl ConnectivityStatus {
135
    is_fn!(is_initializing, ConnectivityStatus::Initializing);
136

137
    is_fn!(is_online, ConnectivityStatus::Online(_));
138

139
    is_fn!(is_offline, ConnectivityStatus::Offline);
140

141
    is_fn!(is_degraded, ConnectivityStatus::Degraded(_));
142

143
    pub fn num_connected_nodes(&self) -> usize {
30✔
144
        use ConnectivityStatus::{Degraded, Initializing, Offline, Online};
145
        match self {
30✔
146
            Initializing | Offline => 0,
29✔
147
            Online(n) | Degraded(n) => *n,
1✔
148
        }
149
    }
30✔
150
}
151

152
impl fmt::Display for ConnectivityStatus {
153
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
154
        write!(f, "{self:?}")
×
155
    }
×
156
}
157

158
struct ConnectivityManagerActor {
159
    config: ConnectivityConfig,
160
    status: ConnectivityStatus,
161
    request_rx: mpsc::Receiver<ConnectivityRequest>,
162
    connection_manager: ConnectionManagerRequester,
163
    node_identity: Arc<NodeIdentity>,
164
    peer_manager: Arc<PeerManager>,
165
    event_tx: ConnectivityEventTx,
166
    connection_stats: HashMap<NodeId, PeerConnectionStats>,
167
    pool: ConnectionPool,
168
    shutdown_signal: ShutdownSignal,
169
    #[cfg(feature = "metrics")]
170
    uptime: Option<Instant>,
171
    allow_list: Vec<NodeId>,
172
    proactive_dialer: ProactiveDialer,
173
    seeds: Vec<NodeId>,
174
}
175

176
impl ConnectivityManagerActor {
177
    pub fn spawn(self) -> JoinHandle<()> {
119✔
178
        tokio::spawn(async { Self::run(self).await })
119✔
179
    }
119✔
180

181
    pub async fn run(mut self) {
119✔
182
        debug!(target: LOG_TARGET, "ConnectivityManager started");
119✔
183

184
        let mut connection_manager_events = self.connection_manager.get_event_subscription();
119✔
185

186
        let interval = self.config.connection_pool_refresh_interval;
119✔
187
        let mut connection_pool_timer = time::interval_at(
119✔
188
            Instant::now()
119✔
189
                .checked_add(interval)
119✔
190
                .expect("connection_pool_refresh_interval cause overflow")
119✔
191
                .into(),
119✔
192
            interval,
119✔
193
        );
194
        connection_pool_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
119✔
195

196
        self.publish_event(ConnectivityEvent::ConnectivityStateInitialized);
119✔
197

198
        loop {
199
            tokio::select! {
16,266✔
200
                Some(req) = self.request_rx.recv() => {
16,266✔
201
                    let timer = Instant::now();
15,798✔
202
                    let task_id = rand::random::<u64>();
15,798✔
203
                    trace!(target: LOG_TARGET, "Request ({task_id}): {req:?}");
15,798✔
204
                    self.handle_request(req).await;
15,798✔
205
                    if timer.elapsed() > ACCEPTABLE_CONNECTIVITY_REQUEST_PROCESSING_TIME {
15,798✔
206
                        warn!(
×
207
                            target: LOG_TARGET,
×
208
                            "Request ({}) took too long to process: {:.2?}",
209
                            task_id,
210
                            format_duration(timer.elapsed())
×
211
                        );
212
                    }
15,798✔
213
                    trace!(target: LOG_TARGET, "Request ({task_id}) done");
15,798✔
214
                },
215

216
                Ok(event) = connection_manager_events.recv() => {
16,266✔
217
                    let timer = Instant::now();
302✔
218
                    let task_id = rand::random::<u64>();
302✔
219
                    trace!(target: LOG_TARGET, "Event ({task_id}): {event:?}");
302✔
220
                    if let Err(err) = self.handle_connection_manager_event(&event).await {
302✔
221
                        error!(target:LOG_TARGET, "Error handling connection manager event ({task_id}): {err:?}");
×
222
                    }
302✔
223
                    if timer.elapsed() > ACCEPTABLE_EVENT_PROCESSING_TIME {
302✔
224
                        warn!(
×
225
                            target: LOG_TARGET,
×
226
                            "Event ({}) took too long to process: {:.2?}",
227
                            task_id,
228
                            format_duration(timer.elapsed())
×
229
                        );
230
                    }
302✔
231
                    trace!(target: LOG_TARGET, "Event ({task_id}) done");
302✔
232
                },
233

234
                _ = connection_pool_timer.tick() => {
16,266✔
235
                    let task_id = rand::random::<u64>();
47✔
236
                    trace!(target: LOG_TARGET, "Pool refresh peers task ({task_id})");
47✔
237
                    self.cleanup_connection_stats();
47✔
238
                    match tokio::time::timeout(POOL_REFRESH_TIMEOUT, self.refresh_connection_pool(task_id)).await {
47✔
239
                        Ok(res) => {
47✔
240
                            if let Err(err) = res {
47✔
241
                                error!(target: LOG_TARGET, "Error refreshing connection pools ({task_id}): {err:?}");
×
242
                            }
47✔
243
                        },
244
                        Err(_) => {
245
                            warn!(
×
246
                                target: LOG_TARGET,
×
247
                                "Pool refresh task ({task_id}) timeout",
248
                            );
249
                        },
250
                    }
251
                    trace!(target: LOG_TARGET, "Pool refresh task ({task_id}) done" );
47✔
252
                },
253

254
                _ = self.shutdown_signal.wait() => {
16,266✔
255
                    info!(
91✔
256
                        target: LOG_TARGET,
×
257
                        "ConnectivityManager is shutting down because it received the shutdown signal"
258
                    );
259
                    self.disconnect_all().await;
91✔
260
                    break;
77✔
261
                }
262
            }
263
        }
264
    }
77✔
265

266
    async fn handle_request(&mut self, req: ConnectivityRequest) {
15,798✔
267
        #[allow(clippy::enum_glob_use)]
268
        use ConnectivityRequest::*;
269
        match req {
15,798✔
270
            WaitStarted(reply) => {
79✔
271
                let _ = reply.send(());
79✔
272
            },
79✔
273
            GetConnectivityStatus(reply) => {
60✔
274
                let _ = reply.send(self.status);
60✔
275
            },
60✔
276
            DialPeer { node_id, reply_tx } => {
233✔
277
                let tracing_id = tracing::Span::current().id();
233✔
278
                let span = span!(Level::TRACE, "handle_dial_peer");
233✔
279
                span.follows_from(tracing_id);
233✔
280
                self.handle_dial_peer(node_id.clone(), reply_tx).instrument(span).await;
233✔
281
            },
282
            SelectConnections(selection, reply) => {
14,453✔
283
                let _result = reply.send(self.select_connections(selection));
14,453✔
284
            },
14,453✔
285
            GetConnection(node_id, reply) => {
90✔
286
                let _result = reply.send(
90✔
287
                    self.pool
90✔
288
                        .get(&node_id)
90✔
289
                        .filter(|c| c.status() == ConnectionStatus::Connected)
90✔
290
                        .and_then(|c| c.connection())
90✔
291
                        .filter(|conn| conn.is_connected())
90✔
292
                        .cloned(),
90✔
293
                );
294
            },
295
            GetPeerStats(node_id, reply) => {
×
296
                let peer = match self.peer_manager.find_by_node_id(&node_id).await {
×
297
                    Ok(v) => v,
×
298
                    Err(e) => {
×
299
                        error!(target: LOG_TARGET, "Error when retrieving peer: {e:?}");
×
300
                        None
×
301
                    },
302
                };
303
                let _result = reply.send(peer);
×
304
            },
305
            GetAllConnectionStates(reply) => {
1✔
306
                let states = self.pool.all().into_iter().cloned().collect();
1✔
307
                let _result = reply.send(states);
1✔
308
            },
1✔
309
            GetMinimizeConnectionsThreshold(reply) => {
×
310
                let minimize_connections_threshold = self.config.maintain_n_closest_connections_only;
×
311
                let _result = reply.send(minimize_connections_threshold);
×
312
            },
×
313
            BanPeer(node_id, duration, reason) => {
11✔
314
                if self.allow_list.contains(&node_id) {
11✔
315
                    info!(
×
316
                        target: LOG_TARGET,
×
317
                        "Peer is excluded from being banned as it was found in the AllowList, NodeId: {node_id:?}"
318
                    );
319
                } else if let Err(err) = self.ban_peer(&node_id, duration, reason).await {
11✔
320
                    error!(target: LOG_TARGET, "Error when banning peer: {err:?}");
×
321
                } else {
11✔
322
                    // we banned the peer
11✔
323
                }
11✔
324
            },
325
            AddPeerToAllowList(node_id) => {
×
326
                if !self.allow_list.contains(&node_id) {
×
327
                    self.allow_list.push(node_id.clone());
×
328
                }
×
329
            },
330
            RemovePeerFromAllowList(node_id) => {
×
331
                if let Some(index) = self.allow_list.iter().position(|x| *x == node_id) {
×
332
                    self.allow_list.remove(index);
×
333
                }
×
334
            },
335
            GetAllowList(reply) => {
867✔
336
                let allow_list = self.allow_list.clone();
867✔
337
                let _result = reply.send(allow_list);
867✔
338
            },
867✔
339
            GetSeeds(reply) => {
×
340
                let seeds = self.peer_manager.get_seed_peers().await.unwrap_or_else(|e| {
×
341
                    error!(target: LOG_TARGET, "Error when retrieving seed peers: {e:?}");
×
342
                    vec![]
×
343
                });
×
344
                let _result = reply.send(seeds);
×
345
            },
346
            GetActiveConnections(reply) => {
4✔
347
                let _result = reply.send(
4✔
348
                    self.pool
4✔
349
                        .filter_connection_states(|s| s.is_connected())
23✔
350
                        .into_iter()
4✔
351
                        .cloned()
4✔
352
                        .collect(),
4✔
353
                );
354
            },
355
            GetNodeIdentity(reply) => {
×
356
                let identity = self.node_identity.as_ref();
×
357
                let _result = reply.send(identity.clone());
×
358
            },
×
359
        }
360
    }
15,798✔
361

362
    async fn handle_dial_peer(
233✔
363
        &mut self,
233✔
364
        node_id: NodeId,
233✔
365
        reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
233✔
366
    ) {
233✔
367
        match self.peer_manager.is_peer_banned(&node_id).await {
233✔
368
            Ok(true) => {
369
                if let Some(reply) = reply_tx {
×
370
                    let _result = reply.send(Err(ConnectionManagerError::PeerBanned));
×
371
                }
×
372
                return;
×
373
            },
374
            Ok(false) => {},
233✔
375
            Err(err) => {
×
376
                if let Some(reply) = reply_tx {
×
377
                    let _result = reply.send(Err(err.into()));
×
378
                }
×
379
                return;
×
380
            },
381
        }
382
        match self.pool.get(&node_id) {
233✔
383
            // The connection pool may temporarily contain a connection that is not connected so we need to check this.
384
            Some(state) if state.is_connected() => {
123✔
385
                if let Some(reply_tx) = reply_tx {
101✔
386
                    let _result = reply_tx.send(Ok(state.connection().cloned().expect("Already checked")));
99✔
387
                }
99✔
388
            },
389
            maybe_state => {
132✔
390
                match maybe_state {
132✔
391
                    Some(state) => {
22✔
392
                        info!(
22✔
393
                            target: LOG_TARGET,
×
394
                            "Connection was previously attempted for peer {}. Current status is '{}'. Dialing again...",
395
                            node_id.short_str(),
×
396
                            state.status()
×
397
                        );
398
                    },
399
                    None => {
400
                        info!(
110✔
401
                            target: LOG_TARGET,
×
402
                            "No connection for peer {}. Dialing...",
403
                            node_id.short_str(),
×
404
                        );
405
                    },
406
                }
407

408
                if let Err(err) = self.connection_manager.send_dial_peer(node_id, reply_tx).await {
132✔
409
                    error!(
×
410
                        target: LOG_TARGET,
×
411
                        "Failed to send dial request to connection manager: {err:?}"
412
                    );
413
                }
132✔
414
            },
415
        }
416
    }
233✔
417

418
    async fn disconnect_all(&mut self) {
91✔
419
        let mut node_ids = Vec::with_capacity(self.pool.count_connected());
91✔
420
        for mut state in self.pool.filter_drain(|_| true) {
124✔
421
            if let Some(conn) = state.connection_mut() {
124✔
422
                if !conn.is_connected() {
124✔
423
                    continue;
45✔
424
                }
79✔
425
                match disconnect_silent_with_timeout(
79✔
426
                    conn,
79✔
427
                    Minimized::No,
79✔
428
                    None,
79✔
429
                    "ConnectivityManagerActor disconnect all",
79✔
430
                )
431
                .await
79✔
432
                {
433
                    Ok(_) => {
65✔
434
                        node_ids.push(conn.peer_node_id().clone());
65✔
435
                    },
65✔
436
                    Err(err) => {
×
437
                        debug!(
×
438
                            target: LOG_TARGET,
×
439
                            "In disconnect_all: Error when disconnecting peer '{}' because '{:?}'",
440
                            conn.peer_node_id().short_str(),
×
441
                            err
442
                        );
443
                    },
444
                }
445
            }
×
446
        }
447

448
        for node_id in node_ids {
77✔
449
            self.publish_event(ConnectivityEvent::PeerDisconnected(node_id, Minimized::No));
65✔
450
        }
65✔
451
    }
77✔
452

453
    async fn refresh_connection_pool(&mut self, task_id: u64) -> Result<(), ConnectivityError> {
47✔
454
        info!(
47✔
455
            target: LOG_TARGET,
×
456
            "CONNECTIVITY_REFRESH: Performing connection pool cleanup/refresh ({}). (#Peers = {}, #Connected={}, #Failed={}, #Disconnected={}, \
457
             #Clients={})",
458
            task_id,
459
            self.pool.count_entries(),
×
460
            self.pool.count_connected_nodes(),
×
461
            self.pool.count_failed(),
×
462
            self.pool.count_disconnected(),
×
463
            self.pool.count_connected_clients()
×
464
        );
465

466
        self.clean_connection_pool();
47✔
467
        self.disconnect_seed_peers(task_id).await;
47✔
468

469
        if self.config.is_connection_reaping_enabled {
47✔
470
            self.reap_inactive_connections(task_id).await;
47✔
471
        }
×
472
        if let Some(threshold) = self.config.maintain_n_closest_connections_only {
47✔
473
            self.maintain_n_closest_peer_connections_only(threshold, task_id).await;
×
474
        }
47✔
475

476
        // Execute proactive dialing logic (if enabled)
477
        debug!(
47✔
478
            target: LOG_TARGET,
×
479
            "({}) Proactive dialing config check: enabled={}, target_connections={}",
480
            task_id,
481
            self.config.proactive_dialing_enabled,
482
            self.config.target_connection_count
483
        );
484

485
        if self.config.proactive_dialing_enabled {
47✔
486
            debug!(
47✔
487
                target: LOG_TARGET,
×
488
                "({task_id}) Executing proactive dialing logic"
489
            );
490
            if let Err(err) = self.execute_proactive_dialing(task_id).await {
47✔
491
                warn!(
×
492
                    target: LOG_TARGET,
×
493
                    "({task_id}) Proactive dialing failed: {err:?}"
494
                );
495
            }
47✔
496
        } else {
497
            debug!(
×
498
                target: LOG_TARGET,
×
499
                "({task_id}) Proactive dialing disabled in configuration"
500

501
            );
502
        }
503

504
        self.update_connectivity_status();
47✔
505
        self.update_connectivity_metrics();
47✔
506
        Ok(())
47✔
507
    }
47✔
508

509
    async fn maintain_n_closest_peer_connections_only(&mut self, threshold: usize, task_id: u64) {
×
510
        let start = Instant::now();
×
511
        // Select all active peer connections (that are communication nodes) with health-aware selection
512
        let selection = ConnectivitySelection::random_nodes(self.pool.count_connected_nodes(), vec![]);
×
513
        let mut connections = match self.select_connections_with_health(selection) {
×
514
            Ok(peers) => peers,
×
515
            Err(e) => {
×
516
                warn!(
×
517
                    target: LOG_TARGET,
×
518
                    "Connectivity error trying to maintain {threshold} peer connections ({task_id}) ({e:?})",
519
                );
520
                return;
×
521
            },
522
        };
523
        let num_connections = connections.len();
×
524

525
        // Remove peers that are on the allow list
526
        connections.retain(|conn| !self.allow_list.contains(conn.peer_node_id()));
×
527
        debug!(
×
528
            target: LOG_TARGET,
×
529
            "minimize_connections: ({}) Filtered peers: {}, Handles: {}",
530
            task_id,
531
            connections.len(),
×
532
            num_connections,
533
        );
534

535
        // Disconnect all remaining peers above the threshold
536
        let len = connections.len();
×
537
        for conn in connections.iter_mut().skip(threshold) {
×
538
            debug!(
×
539
                target: LOG_TARGET,
×
540
                "minimize_connections: ({}) Disconnecting '{}' because the node exceeds the {} connection threshold",
541
                task_id,
542
                conn.peer_node_id(),
×
543
                threshold
544
            );
545
            match disconnect_if_unused_with_timeout(
×
546
                conn,
×
547
                Minimized::Yes,
×
548
                Some(task_id),
×
549
                "ConnectivityManagerActor maintain connections",
×
550
            )
551
            .await
×
552
            {
553
                Ok(_) => {
×
554
                    self.pool.remove(conn.peer_node_id());
×
555
                },
×
556
                Err(err) => {
×
557
                    debug!(
×
558
                        target: LOG_TARGET,
×
559
                        "Peer '{}' already disconnected ({:?}). Error: {:?}",
560
                        conn.peer_node_id().short_str(),
×
561
                        task_id,
562
                        err
563
                    );
564
                },
565
            }
566
        }
567
        if len > 0 {
×
568
            debug!(
×
569
                "minimize_connections: ({}) Minimized {} connections in {:.2?}",
570
                task_id,
571
                len,
572
                start.elapsed()
×
573
            );
574
        }
×
575
    }
×
576

577
    async fn reap_inactive_connections(&mut self, task_id: u64) {
47✔
578
        let start = Instant::now();
47✔
579
        let excess_connections = self
47✔
580
            .pool
47✔
581
            .count_connected()
47✔
582
            .saturating_sub(self.config.reaper_min_connection_threshold);
47✔
583
        if excess_connections == 0 {
47✔
584
            return;
47✔
585
        }
×
586

587
        let mut connections = self
×
588
            .pool
×
589
            .get_inactive_outbound_connections_mut(self.config.reaper_min_inactive_age);
×
590
        connections.truncate(excess_connections);
×
591
        let mut nodes_to_remove = Vec::new();
×
592
        for conn in &mut connections {
×
593
            if !conn.is_connected() {
×
594
                continue;
×
595
            }
×
596

597
            debug!(
×
598
                target: LOG_TARGET,
×
599
                "({}) Disconnecting '{}' because connection was inactive ({} handles)",
600
                task_id,
601
                conn.peer_node_id().short_str(),
×
602
                conn.handle_count()
×
603
            );
604
            match disconnect_with_timeout(
×
605
                conn,
×
606
                Minimized::Yes,
×
607
                Some(task_id),
×
608
                "ConnectivityManagerActor reap inactive",
×
609
            )
610
            .await
×
611
            {
612
                Ok(_) => {
×
613
                    nodes_to_remove.push(conn.peer_node_id().clone());
×
614
                },
×
615
                Err(err) => {
×
616
                    debug!(
×
617
                        target: LOG_TARGET,
×
618
                        "Peer '{}' already disconnected ({:?}). Error: {:?}",
619
                        conn.peer_node_id().short_str(),
×
620
                        task_id,
621
                        err
622
                    );
623
                },
624
            }
625
        }
626
        let len = nodes_to_remove.len();
×
627
        if len > 0 {
×
628
            for node_id in nodes_to_remove {
×
629
                self.pool.remove(&node_id);
×
630
            }
×
631
            debug!(
×
632
                "({}) Reaped {} inactive connections in {:.2?}",
633
                task_id,
634
                len,
635
                start.elapsed()
×
636
            );
637
        }
×
638
    }
47✔
639

640
    async fn refresh_seeds_list(&mut self) {
94✔
641
        match self.peer_manager.get_seed_peers().await {
94✔
642
            Ok(seeds) => {
94✔
643
                self.seeds = seeds.into_iter().map(|p| p.node_id).collect();
94✔
644
            },
645
            Err(err) => {
×
646
                error!(target: LOG_TARGET, "Failed to fetch seed peers: {}", err);
×
647
            },
648
        }
649
    }
94✔
650

651
    async fn disconnect_seed_peers(&mut self, task_id: u64) {
47✔
652
        self.refresh_seeds_list().await;
47✔
653

654
        if self.seeds.is_empty() {
47✔
655
            return;
10✔
656
        }
37✔
657

658
        // Identify seeds that are too old
659
        let mut seeds_to_disconnect = Vec::new();
37✔
660
        for seed_node_id in &self.seeds {
37✔
661
            if let Some(conn) = self.pool.get_connection(seed_node_id) &&
37✔
662
                conn.is_connected() &&
32✔
663
                conn.age() > self.config.max_seed_peer_age
32✔
664
            {
1✔
665
                seeds_to_disconnect.push(conn.clone());
1✔
666
            }
36✔
667
        }
668

669
        if seeds_to_disconnect.is_empty() {
37✔
670
            return;
36✔
671
        }
1✔
672

673
        debug!(
1✔
674
            target: LOG_TARGET,
×
675
            "({}) Found {} seed peer(s) eligible for cleanup", task_id, seeds_to_disconnect.len()
×
676
        );
677

678
        for mut conn in seeds_to_disconnect {
1✔
679
            if self.pool.count_connected_nodes() <= self.config.min_connectivity {
1✔
680
                debug!(
×
681
                    target: LOG_TARGET,
×
682
                    "({}) SKIPPING seed disconnect for '{}'. Connected Nodes ({}) <= Min ({})",
683
                    task_id,
684
                    conn.peer_node_id().short_str(),
×
685
                    self.pool.count_connected_nodes(),
×
686
                    self.config.min_connectivity
687
                );
688
                break;
×
689
            }
1✔
690

691
            debug!(
1✔
692
                target: LOG_TARGET,
×
693
                "({}) Disconnecting seed peer '{}' ...",
694
                task_id,
695
                conn.peer_node_id().short_str()
×
696
            );
697

698
            match disconnect_with_timeout(
1✔
699
                &mut conn,
1✔
700
                Minimized::Yes,
1✔
701
                Some(task_id),
1✔
702
                "ConnectivityManagerActor disconnect seed",
1✔
703
            )
704
            .await
1✔
705
            {
706
                Ok(_) => {
1✔
707
                    self.pool.remove(conn.peer_node_id());
1✔
708
                },
1✔
709
                Err(err) => {
×
710
                    debug!(
×
711
                        target: LOG_TARGET,
×
712
                        "Seed peer '{}' already disconnected ({:?}). Error: {:?}",
713
                        conn.peer_node_id().short_str(),
×
714
                        task_id,
715
                        err
716
                    );
717
                },
718
            }
719
        }
720
    }
47✔
721

722
    fn clean_connection_pool(&mut self) {
47✔
723
        let cleared_states = self.pool.filter_drain(|state| {
77✔
724
            matches!(
77✔
725
                state.status(),
77✔
726
                ConnectionStatus::Failed | ConnectionStatus::Disconnected(_)
727
            )
728
        });
77✔
729

730
        if !cleared_states.is_empty() {
47✔
731
            debug!(
×
732
                target: LOG_TARGET,
×
733
                "Cleared connection states: {}",
734
                cleared_states
×
735
                    .iter()
×
736
                    .map(ToString::to_string)
×
737
                    .collect::<Vec<_>>()
×
738
                    .join(",")
×
739
            )
740
        }
47✔
741
    }
47✔
742

743
    fn select_connections(&self, selection: ConnectivitySelection) -> Result<Vec<PeerConnection>, ConnectivityError> {
14,453✔
744
        trace!(target: LOG_TARGET, "Selection query: {selection:?}");
14,453✔
745
        trace!(
14,453✔
746
            target: LOG_TARGET,
×
747
            "Selecting from {} connected node peers",
748
            self.pool.count_connected_nodes()
×
749
        );
750

751
        let conns = selection.select(&self.pool);
14,453✔
752
        debug!(target: LOG_TARGET, "Selected {} connections(s)", conns.len());
14,453✔
753

754
        Ok(conns.into_iter().cloned().collect())
14,453✔
755
    }
14,453✔
756

757
    fn select_connections_with_health(
×
758
        &self,
×
759
        selection: ConnectivitySelection,
×
760
    ) -> Result<Vec<PeerConnection>, ConnectivityError> {
×
761
        trace!(target: LOG_TARGET, "Health-aware selection query: {selection:?}");
×
762
        trace!(
×
763
            target: LOG_TARGET,
×
764
            "Selecting from {} connected node peers with health metrics",
765
            self.pool.count_connected_nodes()
×
766
        );
767

768
        let conns = selection.select_with_health(
×
769
            &self.pool,
×
770
            &self.connection_stats,
×
771
            self.config.success_rate_tracking_window,
×
772
        );
773
        debug!(target: LOG_TARGET, "Selected {} healthy connections(s)", conns.len());
×
774

775
        Ok(conns.into_iter().cloned().collect())
×
776
    }
×
777

778
    fn get_connection_stat_mut(&mut self, node_id: NodeId) -> &mut PeerConnectionStats {
203✔
779
        match self.connection_stats.entry(node_id) {
203✔
780
            Entry::Occupied(entry) => entry.into_mut(),
15✔
781
            Entry::Vacant(entry) => entry.insert(PeerConnectionStats::new()),
188✔
782
        }
783
    }
203✔
784

785
    /// Records a successful connection to `node_id` in its connection stats, creating the stats entry if needed.
    fn mark_connection_success(&mut self, node_id: NodeId) {
        self.get_connection_stat_mut(node_id).set_connection_success();
    }
199✔
791

792
    /// Records a failed connection attempt for `node_id`, applying the configured circuit-breaker failure
    /// threshold, and returns the peer's resulting failed-attempt count.
    fn mark_peer_failed(&mut self, node_id: NodeId) -> usize {
        // Read the threshold before taking the mutable borrow of the stats map.
        let failure_threshold = self.config.circuit_breaker_failure_threshold;
        let stats = self.get_connection_stat_mut(node_id);
        stats.set_connection_failed_with_threshold(failure_threshold);
        stats.failed_attempts()
    }
4✔
799

800
    async fn on_peer_connection_failure(&mut self, node_id: &NodeId) -> Result<(), ConnectivityError> {
13✔
801
        if self.status.is_offline() {
13✔
802
            info!(
9✔
803
                target: LOG_TARGET,
×
804
                "Node is offline. Ignoring connection failure event for peer '{node_id}'."
805
            );
806
            self.publish_event(ConnectivityEvent::ConnectivityStateOffline);
9✔
807
            return Ok(());
9✔
808
        }
4✔
809

810
        let num_failed = self.mark_peer_failed(node_id.clone());
4✔
811

812
        if num_failed >= self.config.max_failures_mark_offline {
4✔
813
            debug!(
4✔
814
                target: LOG_TARGET,
×
815
                "Marking peer '{}' as offline because this node failed to connect to them {} times",
816
                node_id.short_str(),
×
817
                num_failed
818
            );
819

820
            if let Some(peer) = self.peer_manager.find_by_node_id(node_id).await? &&
4✔
821
                !peer.is_banned() &&
4✔
822
                peer
4✔
823
                    .last_seen_since()
4✔
824
                    // Haven't seen them in expire_peer_last_seen_duration
825
                    .map(|t| t > self.config.expire_peer_last_seen_duration)
4✔
826
                    // Or don't delete if never seen
827
                    .unwrap_or(false)
4✔
828
            {
829
                debug!(
×
830
                    target: LOG_TARGET,
×
831
                    "Peer `{}` was marked as offline after {} attempts (last seen: {}). Removing peer from peer \
832
                     list",
833
                    node_id,
834
                    num_failed,
835
                    peer.last_seen_since()
×
836
                        .map(|d| format!("{}s ago", d.as_secs()))
×
837
                        .unwrap_or_else(|| "Never".to_string()),
×
838
                );
839
                self.peer_manager.soft_delete_peer(node_id).await?;
×
840
            }
4✔
841
        }
×
842

843
        Ok(())
4✔
844
    }
13✔
845

846
    async fn handle_connection_manager_event(
302✔
847
        &mut self,
302✔
848
        event: &ConnectionManagerEvent,
302✔
849
    ) -> Result<(), ConnectivityError> {
302✔
850
        self.update_state_on_connectivity_event(event).await?;
302✔
851
        self.update_connectivity_status();
302✔
852
        self.update_connectivity_metrics();
302✔
853
        Ok(())
302✔
854
    }
302✔
855

856
    #[allow(clippy::too_many_lines)]
857
    async fn update_state_on_connectivity_event(
302✔
858
        &mut self,
302✔
859
        event: &ConnectionManagerEvent,
302✔
860
    ) -> Result<(), ConnectivityError> {
302✔
861
        use ConnectionManagerEvent::*;
862
        match event {
302✔
863
            PeerConnected(new_conn) => {
205✔
864
                match self.on_new_connection(new_conn).await {
205✔
865
                    TieBreak::KeepExisting => {
866
                        debug!(
6✔
867
                            target: LOG_TARGET,
×
868
                            "Discarding new connection to peer '{}' because we already have an existing connection",
869
                            new_conn.peer_node_id().short_str()
×
870
                        );
871
                        // Ignore event, we discarded the new connection and keeping the current one
872
                        return Ok(());
6✔
873
                    },
874
                    TieBreak::UseNew | TieBreak::None => {},
199✔
875
                }
876
            },
877
            PeerDisconnected(id, node_id, _minimized) => {
77✔
878
                if let Some(conn) = self.pool.get_connection(node_id) &&
77✔
879
                    conn.id() != *id
77✔
880
                {
881
                    debug!(
14✔
882
                        target: LOG_TARGET,
×
883
                        "Ignoring peer disconnected event for stale peer connection (id: {id}) for peer '{node_id}'"
884

885
                    );
886
                    return Ok(());
14✔
887
                }
63✔
888
            },
889
            PeerViolation { peer_node_id, details } => {
×
890
                self.ban_peer(
×
891
                    peer_node_id,
×
892
                    Duration::from_secs(2 * 60 * 60),
×
893
                    format!("Peer violation: {details}"),
×
894
                )
×
895
                .await?;
×
896
                return Ok(());
×
897
            },
898
            _ => {},
20✔
899
        }
900

901
        let (node_id, mut new_status, connection) = match event {
275✔
902
            PeerDisconnected(_, node_id, minimized) => (node_id, ConnectionStatus::Disconnected(*minimized), None),
63✔
903
            PeerConnected(conn) => (conn.peer_node_id(), ConnectionStatus::Connected, Some(conn.clone())),
199✔
904
            PeerConnectFailed(node_id, ConnectionManagerError::AllPeerAddressesAreExcluded(msg)) => {
×
905
                debug!(
×
906
                    target: LOG_TARGET,
×
907
                    "Peer '{node_id}' contains only excluded addresses ({msg})"
908

909
                );
910
                (node_id, ConnectionStatus::Failed, None)
×
911
            },
912
            PeerConnectFailed(node_id, ConnectionManagerError::NoiseHandshakeError(msg)) => {
×
913
                if let Some(conn) = self.pool.get_connection(node_id) {
×
914
                    debug!(
×
915
                        target: LOG_TARGET,
×
916
                        "Handshake error to peer '{node_id}', disconnecting for a fresh retry ({msg})"
917
                    );
918
                    let mut conn = conn.clone();
×
919
                    disconnect_with_timeout(
×
920
                        &mut conn,
×
921
                        Minimized::No,
×
922
                        None,
×
923
                        "ConnectivityManagerActor peer connect failed",
×
924
                    )
×
925
                    .await?;
×
926
                }
×
927
                (node_id, ConnectionStatus::Failed, None)
×
928
            },
929
            PeerConnectFailed(node_id, ConnectionManagerError::DialCancelled) => {
1✔
930
                if let Some(conn) = self.pool.get_connection(node_id) &&
1✔
931
                    conn.is_connected() &&
1✔
932
                    conn.direction().is_inbound()
1✔
933
                {
934
                    debug!(
1✔
935
                        target: LOG_TARGET,
×
936
                        "Ignoring DialCancelled({node_id}) event because an inbound connection already exists"
937
                    );
938

939
                    return Ok(());
1✔
940
                }
×
941
                debug!(
×
942
                    target: LOG_TARGET,
×
943
                    "Dial was cancelled before connection completed to peer '{node_id}'"
944
                );
945
                (node_id, ConnectionStatus::Failed, None)
×
946
            },
947
            PeerConnectFailed(node_id, err) => {
13✔
948
                debug!(
13✔
949
                    target: LOG_TARGET,
×
950
                    "Connection to peer '{node_id}' failed because '{err:?}'"
951
                );
952
                self.on_peer_connection_failure(node_id).await?;
13✔
953
                (node_id, ConnectionStatus::Failed, None)
13✔
954
            },
955
            _ => return Ok(()),
6✔
956
        };
957

958
        let old_status = self.pool.set_status(node_id, new_status);
275✔
959
        if let Some(conn) = connection {
275✔
960
            new_status = self.pool.insert_connection(*conn);
199✔
961
        }
199✔
962
        if old_status != new_status {
275✔
963
            debug!(
273✔
964
                target: LOG_TARGET,
×
965
                "Peer connection for node '{node_id}' transitioned from {old_status} to {new_status}"
966
            );
967
        }
2✔
968

969
        let node_id = node_id.clone();
275✔
970

971
        use ConnectionStatus::{Connected, Disconnected, Failed};
972
        match (old_status, new_status) {
275✔
973
            (_, Connected) => match self.pool.get_connection_mut(&node_id).cloned() {
199✔
974
                Some(conn) => {
199✔
975
                    self.mark_connection_success(conn.peer_node_id().clone());
199✔
976
                    self.publish_event(ConnectivityEvent::PeerConnected(conn.into()));
199✔
977
                },
199✔
978
                None => unreachable!(
×
979
                    "Connection transitioning to CONNECTED state must always have a connection set i.e. \
980
                     ConnectionPool::get_connection is Some"
981
                ),
982
            },
983
            (Connected, Disconnected(..)) => {
984
                self.publish_event(ConnectivityEvent::PeerDisconnected(node_id, match new_status {
52✔
985
                    ConnectionStatus::Disconnected(reason) => reason,
52✔
986
                    _ => Minimized::No,
×
987
                }));
988
            },
989
            // Was not connected so don't broadcast event
990
            (_, Disconnected(..)) => {},
11✔
991
            (_, Failed) => {
13✔
992
                self.publish_event(ConnectivityEvent::PeerConnectFailed(node_id));
13✔
993
            },
13✔
994
            _ => {
995
                error!(
×
996
                    target: LOG_TARGET,
×
997
                    "Unexpected connection status transition ({old_status} to {new_status}) for peer '{node_id}'"
998
                );
999
            },
1000
        }
1001

1002
        Ok(())
275✔
1003
    }
302✔
1004

1005
    async fn on_new_connection(&mut self, new_conn: &PeerConnection) -> TieBreak {
205✔
1006
        match self.pool.get(new_conn.peer_node_id()).cloned() {
205✔
1007
            Some(existing_state) if !existing_state.is_connected() => {
18✔
1008
                debug!(
4✔
1009
                    target: LOG_TARGET,
×
1010
                    "Tie break: Existing connection (id: {}, peer: {}, direction: {}) was not connected, resolving \
1011
                     tie break by using the new connection. (New: id: {}, peer: {}, direction: {})",
1012
                    existing_state.connection().map(|c| c.id()).unwrap_or_default(),
×
1013
                    existing_state.node_id(),
×
1014
                    existing_state.connection().map(|c| c.direction().as_str()).unwrap_or("--"),
×
1015
                    new_conn.id(),
×
1016
                    new_conn.peer_node_id(),
×
1017
                    new_conn.direction(),
×
1018
                );
1019
                self.pool.remove(existing_state.node_id());
4✔
1020
                TieBreak::UseNew
4✔
1021
            },
1022
            Some(mut existing_state) => {
14✔
1023
                let Some(existing_conn) = existing_state.connection_mut() else {
14✔
1024
                    error!(
×
1025
                        target: LOG_TARGET,
×
1026
                        "INVARIANT ERROR in Tie break: PeerConnection is None but state is CONNECTED: Existing \
1027
                        connection (id: {}, peer: {}, direction: {}), new connection. (id: {}, peer: {}, direction: {})",
1028
                        existing_state.connection().map(|c| c.id()).unwrap_or_default(),
×
1029
                        existing_state.node_id(),
×
1030
                        existing_state.connection().map(|c| c.direction().as_str()).unwrap_or("--"),
×
1031
                        new_conn.id(),
×
1032
                        new_conn.peer_node_id(),
×
1033
                        new_conn.direction(),
×
1034
                    );
1035
                    return TieBreak::UseNew;
×
1036
                };
1037
                if self.tie_break_existing_connection(existing_conn, new_conn) {
14✔
1038
                    info!(
8✔
1039
                        target: LOG_TARGET,
×
1040
                        "Tie break: Keep new connection (id: {}, peer: {}, direction: {}). Disconnect existing \
1041
                         connection (id: {}, peer: {}, direction: {})",
1042
                        new_conn.id(),
×
1043
                        new_conn.peer_node_id(),
×
1044
                        new_conn.direction(),
×
1045
                        existing_conn.id(),
×
1046
                        existing_conn.peer_node_id(),
×
1047
                        existing_conn.direction(),
×
1048
                    );
1049

1050
                    let _result = disconnect_silent_with_timeout(
8✔
1051
                        existing_conn,
8✔
1052
                        Minimized::Yes,
8✔
1053
                        None,
8✔
1054
                        "ConnectivityManagerActor tie break",
8✔
1055
                    )
8✔
1056
                    .await;
8✔
1057
                    self.pool.remove(existing_conn.peer_node_id());
8✔
1058
                    TieBreak::UseNew
8✔
1059
                } else {
1060
                    debug!(
6✔
1061
                        target: LOG_TARGET,
×
1062
                        "Tie break: Keeping existing connection (id: {}, peer: {}, direction: {}). Disconnecting new \
1063
                         connection (id: {}, peer: {}, direction: {})",
1064
                        new_conn.id(),
×
1065
                        new_conn.peer_node_id(),
×
1066
                        new_conn.direction(),
×
1067
                        existing_conn.id(),
×
1068
                        existing_conn.peer_node_id(),
×
1069
                        existing_conn.direction(),
×
1070
                    );
1071

1072
                    let _result = disconnect_silent_with_timeout(
6✔
1073
                        &mut new_conn.clone(),
6✔
1074
                        Minimized::Yes,
6✔
1075
                        None,
6✔
1076
                        "ConnectivityManagerActor tie break",
6✔
1077
                    )
6✔
1078
                    .await;
6✔
1079
                    TieBreak::KeepExisting
6✔
1080
                }
1081
            },
1082

1083
            None => TieBreak::None,
187✔
1084
        }
1085
    }
205✔
1086

1087
    /// Two connections to the same peer have been created. This function deterministically determines which peer
1088
    /// connection to close. It does this by comparing our NodeId to that of the peer. This rule enables both sides to
1089
    /// agree which connection to disconnect
1090
    ///
1091
    /// Returns true if the existing connection should close, otherwise false if the new connection should be closed.
1092
    fn tie_break_existing_connection(&self, existing_conn: &PeerConnection, new_conn: &PeerConnection) -> bool {
14✔
1093
        debug_assert_eq!(existing_conn.peer_node_id(), new_conn.peer_node_id());
14✔
1094
        let peer_node_id = existing_conn.peer_node_id();
14✔
1095
        let our_node_id = self.node_identity.node_id();
14✔
1096

1097
        debug!(
14✔
1098
            target: LOG_TARGET,
×
1099
            "Tie-break: (Existing = {}, New = {})",
1100
            existing_conn.direction(),
×
1101
            new_conn.direction()
×
1102
        );
1103
        use ConnectionDirection::{Inbound, Outbound};
1104
        match (existing_conn.direction(), new_conn.direction()) {
14✔
1105
            // They connected to us twice for some reason. Drop the older connection
1106
            (Inbound, Inbound) => true,
3✔
1107
            // They connected to us at the same time we connected to them
1108
            (Inbound, Outbound) => peer_node_id > our_node_id,
4✔
1109
            // We connected to them at the same time as they connected to us
1110
            (Outbound, Inbound) => our_node_id > peer_node_id,
4✔
1111
            // We connected to them twice for some reason. Drop the older connection.
1112
            (Outbound, Outbound) => true,
3✔
1113
        }
1114
    }
14✔
1115

1116
    fn update_connectivity_status(&mut self) {
349✔
1117
        // The contract we are making with online/degraded status transitions is as follows:
1118
        // - If min_connectivity peers are connected we MUST transition to ONLINE
1119
        // - Clients SHOULD tolerate entering a DEGRADED/OFFLINE status
1120
        // - If a number of peers disconnect or the local system's network goes down, the status MAY transition to
1121
        //   DEGRADED
1122
        let min_peers = self.config.min_connectivity;
349✔
1123
        let num_connected_nodes = self.pool.count_connected_nodes();
349✔
1124
        let num_connected_clients = self.pool.count_connected_clients();
349✔
1125
        debug!(
349✔
1126
            target: LOG_TARGET,
×
1127
            "#min_peers = {min_peers}, #nodes = {num_connected_nodes}, #clients = {num_connected_clients}"
1128
        );
1129

1130
        match num_connected_nodes {
349✔
1131
            n if n >= min_peers => {
349✔
1132
                self.transition(ConnectivityStatus::Online(n), min_peers);
305✔
1133
            },
305✔
1134
            n if n > 0 && n < min_peers => {
44✔
1135
                self.transition(ConnectivityStatus::Degraded(n), min_peers);
3✔
1136
            },
3✔
1137
            n if n == 0 => {
41✔
1138
                if num_connected_clients == 0 {
41✔
1139
                    self.transition(ConnectivityStatus::Offline, min_peers);
37✔
1140
                } else {
37✔
1141
                    self.transition(ConnectivityStatus::Degraded(n), min_peers);
4✔
1142
                }
4✔
1143
            },
1144
            _ => unreachable!("num_connected is unsigned and only negative pattern covered on this branch"),
×
1145
        }
1146
    }
349✔
1147

1148
    /// No-op: connectivity metrics are only collected when the `metrics` feature is enabled.
    #[cfg(not(feature = "metrics"))]
    fn update_connectivity_metrics(&mut self) {}
1150

1151
    /// Updates the inbound/outbound connection gauges and the uptime gauge.
    ///
    /// Outbound is derived as all connected connections minus the connected inbound ones. Uptime is the whole
    /// number of seconds elapsed since `self.uptime` was set, saturating at `i64::MAX`, or 0 when it is unset.
    // Pool counts are cast to i64 for the gauges; they are assumed to stay far below i64::MAX.
    #[allow(clippy::cast_possible_wrap)]
    #[cfg(feature = "metrics")]
    fn update_connectivity_metrics(&mut self) {
        use std::convert::TryFrom;

        use super::metrics;

        let total = self.pool.count_connected() as i64;
        let num_inbound = self.pool.count_filtered(|state| {
            state
                .connection()
                .is_some_and(|conn| conn.is_connected() && conn.direction().is_inbound())
        }) as i64;

        metrics::connections(ConnectionDirection::Inbound).set(num_inbound);
        metrics::connections(ConnectionDirection::Outbound).set(total - num_inbound);

        let uptime = match self.uptime {
            Some(started_at) => i64::try_from(started_at.elapsed().as_secs()).unwrap_or(i64::MAX),
            None => 0,
        };
        metrics::uptime().set(uptime);
    }
349✔
1173

1174
    fn transition(&mut self, next_status: ConnectivityStatus, required_num_peers: usize) {
349✔
1175
        use ConnectivityStatus::{Degraded, Offline, Online};
1176
        if self.status != next_status {
349✔
1177
            debug!(
240✔
1178
                target: LOG_TARGET,
×
1179
                "Connectivity status transitioning from {} to {}", self.status, next_status
1180
            );
1181
        }
109✔
1182

1183
        match (self.status, next_status) {
349✔
1184
            (Online(_), Online(_)) => {},
193✔
1185
            (_, Online(n)) => {
112✔
1186
                info!(
112✔
1187
                    target: LOG_TARGET,
×
1188
                    "Connectivity is ONLINE ({n}/{required_num_peers} connections)"
1189
                );
1190

1191
                #[cfg(feature = "metrics")]
1192
                if self.uptime.is_none() {
112✔
1193
                    self.uptime = Some(Instant::now());
2✔
1194
                }
110✔
1195
                self.publish_event(ConnectivityEvent::ConnectivityStateOnline(n));
112✔
1196
            },
1197
            (Degraded(m), Degraded(n)) => {
2✔
1198
                info!(
2✔
1199
                    target: LOG_TARGET,
×
1200
                    "Connectivity is DEGRADED ({n}/{required_num_peers} connections)"
1201
                );
1202
                if m != n {
2✔
1203
                    self.publish_event(ConnectivityEvent::ConnectivityStateDegraded(n));
1✔
1204
                }
1✔
1205
            },
1206
            (_, Degraded(n)) => {
5✔
1207
                info!(
5✔
1208
                    target: LOG_TARGET,
×
1209
                    "Connectivity is DEGRADED ({n}/{required_num_peers} connections)"
1210
                );
1211
                self.publish_event(ConnectivityEvent::ConnectivityStateDegraded(n));
5✔
1212
            },
1213
            (Offline, Offline) => {},
16✔
1214
            (_, Offline) => {
1215
                warn!(
21✔
1216
                    target: LOG_TARGET,
×
1217
                    "Connectivity is OFFLINE (0/{required_num_peers} connections)"
1218
                );
1219
                #[cfg(feature = "metrics")]
1220
                {
21✔
1221
                    self.uptime = None;
21✔
1222
                }
21✔
1223
                self.publish_event(ConnectivityEvent::ConnectivityStateOffline);
21✔
1224
            },
1225
            (status, next_status) => unreachable!("Unexpected status transition ({status} to {next_status})"),
×
1226
        }
1227
        self.status = next_status;
349✔
1228
    }
349✔
1229

1230
    /// Broadcasts `event` to every connectivity event subscriber.
    fn publish_event(&mut self, event: ConnectivityEvent) {
        // Sending can only fail when nobody is subscribed, which is not an error for the actor, so the outcome
        // is deliberately discarded.
        let _ignored = self.event_tx.send(event);
    }
607✔
1234

1235
    async fn ban_peer(
11✔
1236
        &mut self,
11✔
1237
        node_id: &NodeId,
11✔
1238
        duration: Duration,
11✔
1239
        reason: String,
11✔
1240
    ) -> Result<(), ConnectivityError> {
11✔
1241
        info!(
11✔
1242
            target: LOG_TARGET,
×
1243
            "Banning peer {} for {} because: {}",
1244
            node_id,
1245
            format_duration(duration),
×
1246
            reason
1247
        );
1248
        let ban_result = self.peer_manager.ban_peer_by_node_id(node_id, duration, reason).await;
11✔
1249

1250
        #[cfg(feature = "metrics")]
1251
        super::metrics::banned_peers_counter().inc();
11✔
1252

1253
        self.publish_event(ConnectivityEvent::PeerBanned(node_id.clone()));
11✔
1254

1255
        if let Some(conn) = self.pool.get_connection_mut(node_id) {
11✔
1256
            disconnect_with_timeout(conn, Minimized::Yes, None, "ConnectivityManagerActor ban peer").await?;
11✔
1257
            let status = self.pool.get_connection_status(node_id);
11✔
1258
            debug!(
11✔
1259
                target: LOG_TARGET,
×
1260
                "Disconnected banned peer {node_id}. The peer connection status is {status}"
1261
            );
1262
        }
×
1263
        ban_result?;
11✔
1264
        Ok(())
11✔
1265
    }
11✔
1266

1267
    async fn execute_proactive_dialing(&mut self, task_id: u64) -> Result<(), ConnectivityError> {
47✔
1268
        debug!(
47✔
1269
            target: LOG_TARGET,
×
1270
            "({}) Starting proactive dialing execution - current connections: {}, target: {}",
1271
            task_id,
1272
            self.pool.count_connected_nodes(),
×
1273
            self.config.target_connection_count
1274
        );
1275

1276
        // First, clean up old health data to keep metrics accurate
1277
        for stats in self.connection_stats.values_mut() {
77✔
1278
            stats.cleanup_old_health_data(self.config.success_rate_tracking_window);
77✔
1279
        }
77✔
1280

1281
        // Update circuit breaker metrics
1282
        self.update_circuit_breaker_metrics();
47✔
1283

1284
        self.refresh_seeds_list().await;
47✔
1285

1286
        // Determine if we should exclude seeds.
1287
        let excluded_peers = if self.pool.count_connected_nodes() < self.config.min_connectivity {
47✔
1288
            debug!(target: LOG_TARGET, "({}) Critical connectivity level ({} < {}). Allowing proactive dialer to retry Seed Nodes.",
×
1289
                task_id,
1290
                self.pool.count_connected_nodes(),
×
1291
                self.config.min_connectivity
1292
            );
1293
            vec![]
×
1294
        } else {
1295
            self.seeds.clone()
47✔
1296
        };
1297

1298
        // Execute proactive dialing logic
1299
        match self
47✔
1300
            .proactive_dialer
47✔
1301
            .execute_proactive_dialing(&self.pool, &self.connection_stats, &excluded_peers, task_id)
47✔
1302
            .await
47✔
1303
        {
1304
            Ok(dialed_count) => {
47✔
1305
                if dialed_count > 0 {
47✔
1306
                    debug!(
6✔
1307
                        target: LOG_TARGET,
×
1308
                        "({task_id}) Proactive dialing initiated {dialed_count} peer connections"
1309
                    );
1310
                }
41✔
1311
                Ok(())
47✔
1312
            },
1313
            Err(err) => {
×
1314
                error!(
×
1315
                    target: LOG_TARGET,
×
1316
                    "({task_id}) Proactive dialing failed: {err:?}"
1317

1318
                );
1319
                Err(err)
×
1320
            },
1321
        }
1322
    }
47✔
1323

1324
    fn update_circuit_breaker_metrics(&self) {
47✔
1325
        let _circuit_breaker_open_count = self
47✔
1326
            .connection_stats
47✔
1327
            .values()
47✔
1328
            .filter(|stats| stats.health_metrics().circuit_breaker_state().is_open())
77✔
1329
            .count();
47✔
1330

1331
        // Calculate average peer health score
1332
        if !self.connection_stats.is_empty() {
47✔
1333
            let total_health: f32 = self
47✔
1334
                .connection_stats
47✔
1335
                .values()
47✔
1336
                .map(|stats| stats.health_score(self.config.success_rate_tracking_window))
77✔
1337
                .sum();
47✔
1338
            let _avg_health = total_health / self.connection_stats.len() as f32;
47✔
1339
        }
×
1340
    }
47✔
1341

1342
    fn cleanup_connection_stats(&mut self) {
47✔
1343
        let mut to_remove = Vec::new();
47✔
1344
        for node_id in self.connection_stats.keys() {
78✔
1345
            let status = self.pool.get_connection_status(node_id);
78✔
1346
            if matches!(
77✔
1347
                status,
78✔
1348
                ConnectionStatus::NotConnected | ConnectionStatus::Failed | ConnectionStatus::Disconnected(_)
1349
            ) {
1✔
1350
                to_remove.push(node_id.clone());
1✔
1351
            }
77✔
1352
        }
1353
        for node_id in to_remove {
47✔
1354
            self.connection_stats.remove(&node_id);
1✔
1355
        }
1✔
1356
    }
47✔
1357
}
1358

1359
/// Outcome of resolving a newly established connection against any existing connection to the same peer.
enum TieBreak {
    /// No connection state existed for the peer.
    None,
    /// The new connection should be used (replacing any existing one).
    UseNew,
    /// The existing connection is kept; the new connection has been disconnected.
    KeepExisting,
}
1364

1365
async fn disconnect_with_timeout(
12✔
1366
    connection: &mut PeerConnection,
12✔
1367
    minimized: Minimized,
12✔
1368
    task_id: Option<u64>,
12✔
1369
    requester: &str,
12✔
1370
) -> Result<(), PeerConnectionError> {
12✔
1371
    match tokio::time::timeout(PEER_DISCONNECT_TIMEOUT, connection.disconnect(minimized, requester)).await {
12✔
1372
        Ok(res) => res,
12✔
1373
        Err(_) => {
1374
            warn!(
×
1375
                target: LOG_TARGET,
×
1376
                "Timeout disconnecting peer ({:?}) '{}'",
1377
                task_id,
1378
                connection.peer_node_id().short_str(),
×
1379
            );
1380
            Err(PeerConnectionError::DisconnectTimeout)
×
1381
        },
1382
    }
1383
}
12✔
1384

1385
async fn disconnect_if_unused_with_timeout(
×
1386
    connection: &mut PeerConnection,
×
1387
    minimized: Minimized,
×
1388
    task_id: Option<u64>,
×
1389
    requester: &str,
×
1390
) -> Result<(), PeerConnectionError> {
×
1391
    match tokio::time::timeout(
×
1392
        PEER_DISCONNECT_TIMEOUT,
1393
        connection.disconnect_if_unused(minimized, 0, 0, requester),
×
1394
    )
1395
    .await
×
1396
    {
1397
        Ok(res) => res,
×
1398
        Err(_) => {
1399
            warn!(
×
1400
                target: LOG_TARGET,
×
1401
                "Timeout disconnecting peer ({:?}) '{}'",
1402
                task_id,
1403
                connection.peer_node_id().short_str(),
×
1404
            );
1405
            Err(PeerConnectionError::DisconnectTimeout)
×
1406
        },
1407
    }
1408
}
×
1409

1410
async fn disconnect_silent_with_timeout(
93✔
1411
    connection: &mut PeerConnection,
93✔
1412
    minimized: Minimized,
93✔
1413
    task_id: Option<u64>,
93✔
1414
    requester: &str,
93✔
1415
) -> Result<(), PeerConnectionError> {
93✔
1416
    match tokio::time::timeout(
93✔
1417
        PEER_DISCONNECT_TIMEOUT,
1418
        connection.disconnect_silent(minimized, requester),
93✔
1419
    )
1420
    .await
93✔
1421
    {
1422
        Ok(res) => res,
79✔
1423
        Err(_) => {
1424
            warn!(
×
1425
                target: LOG_TARGET,
×
1426
                "Timeout disconnecting peer ({:?}) '{}'",
1427
                task_id,
1428
                connection.peer_node_id().short_str(),
×
1429
            );
1430
            Err(PeerConnectionError::DisconnectTimeout)
×
1431
        },
1432
    }
1433
}
79✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc