• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 18402812670

10 Oct 2025 09:44AM UTC coverage: 59.552% (+1.0%) from 58.589%
18402812670

push

github

web-flow
chore(ci): remove bitnami docker images for debian slim images and minor updates (#7543)

Description
remove bitnami docker images for debian slim images
minor updates and tor version bump

Motivation and Context
Move away from Bitnami docker images, as they are moving to a pay-for-use model

How Has This Been Tested?
Builds in a local fork, runs in a local container


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- New Features
  - None
- Bug Fixes
  - None
- Chores
- Switched build and runtime base images to Debian slim variants for
improved consistency.
- Updated Rust toolchain to 1.90.0 and moved OS base to Debian “trixie.”
  - Bumped Tor package to 0.4.8.19-r0.
- Documentation
- Refreshed inline comments to reflect new base image sources and Tor
branch reference.
- Tests
  - None

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

67582 of 113484 relevant lines covered (59.55%)

304718.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.41
/comms/core/src/connectivity/manager.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22
use std::{
23
    collections::HashMap,
24
    fmt,
25
    sync::Arc,
26
    time::{Duration, Instant},
27
};
28

29
use log::*;
30
use nom::lib::std::collections::hash_map::Entry;
31
use tari_shutdown::ShutdownSignal;
32
use tokio::{
33
    sync::{mpsc, oneshot},
34
    task::JoinHandle,
35
    time,
36
    time::MissedTickBehavior,
37
};
38
use tracing::{span, Instrument, Level};
39

40
use super::{
41
    config::ConnectivityConfig,
42
    connection_pool::{ConnectionPool, ConnectionStatus},
43
    connection_stats::PeerConnectionStats,
44
    error::ConnectivityError,
45
    proactive_dialer::ProactiveDialer,
46
    requester::{ConnectivityEvent, ConnectivityRequest},
47
    selection::ConnectivitySelection,
48
    ConnectivityEventTx,
49
};
50
use crate::{
51
    connection_manager::{
52
        ConnectionDirection,
53
        ConnectionManagerError,
54
        ConnectionManagerEvent,
55
        ConnectionManagerRequester,
56
    },
57
    peer_manager::NodeId,
58
    utils::datetime::format_duration,
59
    Minimized,
60
    NodeIdentity,
61
    PeerConnection,
62
    PeerConnectionError,
63
    PeerManager,
64
};
65

66
// Log target used by all log/trace output in this module.
const LOG_TARGET: &str = "comms::connectivity::manager";

// Maximum time allowed for refreshing the connection pool before the refresh task is abandoned
// (enforced via `tokio::time::timeout` in the main loop).
const POOL_REFRESH_TIMEOUT: Duration = Duration::from_millis(2500);
// Maximum time allowed to disconnect a single peer.
// NOTE(review): not referenced in the code visible here — presumably used by the
// `disconnect_*_with_timeout` helpers defined elsewhere in this file; confirm.
const PEER_DISCONNECT_TIMEOUT: Duration = Duration::from_millis(250);
// Warning threshold for request processing time; exceeding it logs a warning in `run`
const ACCEPTABLE_CONNECTIVITY_REQUEST_PROCESSING_TIME: Duration = Duration::from_millis(500);
// Warning threshold for event processing time; exceeding it logs a warning in `run`
const ACCEPTABLE_EVENT_PROCESSING_TIME: Duration = Duration::from_millis(500);
76

77
/// # Connectivity Manager
///
/// The ConnectivityManager actor is responsible for tracking the state of all peer
/// connections in the system and maintaining a _pool_ of peer connections.
///
/// It emits [ConnectivityEvent](crate::connectivity::ConnectivityEvent)s that can keep client components
/// in the loop with the state of the node's connectivity.
pub struct ConnectivityManager {
    // Behavioural tunables (refresh interval, reaper thresholds, dialing, ...)
    pub config: ConnectivityConfig,
    // Receives ConnectivityRequest messages from requester handles
    pub request_rx: mpsc::Receiver<ConnectivityRequest>,
    // Broadcast side for publishing ConnectivityEvents to subscribers
    pub event_tx: ConnectivityEventTx,
    // Handle used to dial peers and subscribe to connection manager events
    pub connection_manager: ConnectionManagerRequester,
    // Shared peer database (ban checks, last-seen lookups, seed peers)
    pub peer_manager: Arc<PeerManager>,
    // This node's own identity (e.g. used for closest-peer calculations)
    pub node_identity: Arc<NodeIdentity>,
    // Terminates the actor's main loop when triggered
    pub shutdown_signal: ShutdownSignal,
}
93

94
impl ConnectivityManager {
95
    pub fn spawn(self) -> JoinHandle<()> {
71✔
96
        let proactive_dialer = ProactiveDialer::new(
71✔
97
            self.config,
71✔
98
            self.connection_manager.clone(),
71✔
99
            self.peer_manager.clone(),
71✔
100
            self.node_identity.clone(),
71✔
101
        );
102

103
        ConnectivityManagerActor {
71✔
104
            config: self.config,
71✔
105
            status: ConnectivityStatus::Initializing,
71✔
106
            request_rx: self.request_rx,
71✔
107
            connection_manager: self.connection_manager,
71✔
108
            peer_manager: self.peer_manager.clone(),
71✔
109
            event_tx: self.event_tx,
71✔
110
            connection_stats: HashMap::new(),
71✔
111
            node_identity: self.node_identity,
71✔
112
            pool: ConnectionPool::new(),
71✔
113
            shutdown_signal: self.shutdown_signal,
71✔
114
            #[cfg(feature = "metrics")]
71✔
115
            uptime: Some(Instant::now()),
71✔
116
            allow_list: vec![],
71✔
117
            proactive_dialer,
71✔
118
            seeds: vec![],
71✔
119
        }
71✔
120
        .spawn()
71✔
121
    }
71✔
122
}
123

124
/// Node connectivity status.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectivityStatus {
    /// Initial connectivity status before the Connectivity actor has initialized.
    #[default]
    Initializing,
    /// Connectivity is online. The inner value is the number of connected nodes.
    Online(usize),
    /// Connectivity is less than the required minimum, but some connections are still active.
    /// The inner value is the number of connected nodes.
    Degraded(usize),
    /// There are no active connections.
    Offline,
}
137

138
impl ConnectivityStatus {
139
    is_fn!(is_initializing, ConnectivityStatus::Initializing);
140

141
    is_fn!(is_online, ConnectivityStatus::Online(_));
142

143
    is_fn!(is_offline, ConnectivityStatus::Offline);
144

145
    is_fn!(is_degraded, ConnectivityStatus::Degraded(_));
146

147
    pub fn num_connected_nodes(&self) -> usize {
5✔
148
        use ConnectivityStatus::{Degraded, Initializing, Offline, Online};
149
        match self {
5✔
150
            Initializing | Offline => 0,
5✔
151
            Online(n) | Degraded(n) => *n,
×
152
        }
153
    }
5✔
154
}
155

156
impl fmt::Display for ConnectivityStatus {
157
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
158
        write!(f, "{self:?}")
×
159
    }
×
160
}
161

162
// Internal actor that owns the connection pool and services connectivity requests.
struct ConnectivityManagerActor {
    // Behavioural tunables (refresh interval, reaper thresholds, dialing, ...)
    config: ConnectivityConfig,
    // Most recently computed connectivity status
    status: ConnectivityStatus,
    // Incoming requests from requester handles
    request_rx: mpsc::Receiver<ConnectivityRequest>,
    // Used to dial peers and to subscribe to connection manager events
    connection_manager: ConnectionManagerRequester,
    // This node's own identity
    node_identity: Arc<NodeIdentity>,
    // Shared peer database
    peer_manager: Arc<PeerManager>,
    // Broadcast channel for connectivity events
    event_tx: ConnectivityEventTx,
    // Per-peer connection attempt statistics (success/failure tracking)
    connection_stats: HashMap<NodeId, PeerConnectionStats>,
    // Current known peer connections and their statuses
    pool: ConnectionPool,
    // Terminates the main loop when triggered
    shutdown_signal: ShutdownSignal,
    // Actor start time — presumably feeds uptime metrics; not read in the code shown here
    #[cfg(feature = "metrics")]
    uptime: Option<Instant>,
    // Peers exempt from banning and from closest-peer minimization
    allow_list: Vec<NodeId>,
    // Dials additional peers to maintain the configured target connection count
    proactive_dialer: ProactiveDialer,
    // NOTE(review): initialised empty and not read in the visible code — confirm usage
    seeds: Vec<NodeId>,
}
179

180
impl ConnectivityManagerActor {
181
    pub fn spawn(self) -> JoinHandle<()> {
71✔
182
        tokio::spawn(async { Self::run(self).await })
71✔
183
    }
71✔
184

185
    /// Main actor loop: multiplexes connectivity requests, connection manager events,
    /// the periodic pool-refresh timer and the shutdown signal until shutdown is signalled.
    pub async fn run(mut self) {
        debug!(target: LOG_TARGET, "ConnectivityManager started");

        let mut connection_manager_events = self.connection_manager.get_event_subscription();

        // Schedule the first pool refresh one full interval from now.
        let interval = self.config.connection_pool_refresh_interval;
        let mut connection_pool_timer = time::interval_at(
            Instant::now()
                .checked_add(interval)
                .expect("connection_pool_refresh_interval cause overflow")
                .into(),
            interval,
        );
        // If a tick is missed (e.g. long refresh), delay rather than bursting to catch up.
        connection_pool_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);

        self.publish_event(ConnectivityEvent::ConnectivityStateInitialized);

        loop {
            tokio::select! {
                // A request from a ConnectivityRequester handle. `task_id` is a random
                // correlation id so log lines from one request can be matched up.
                Some(req) = self.request_rx.recv() => {
                    let timer = Instant::now();
                    let task_id = rand::random::<u64>();
                    trace!(target: LOG_TARGET, "Request ({task_id}): {req:?}");
                    self.handle_request(req).await;
                    // Warn when processing exceeded the acceptable threshold.
                    if timer.elapsed() > ACCEPTABLE_CONNECTIVITY_REQUEST_PROCESSING_TIME {
                        warn!(
                            target: LOG_TARGET,
                            "Request ({}) took too long to process: {:.2?}",
                            task_id,
                            format_duration(timer.elapsed())
                        );
                    }
                    trace!(target: LOG_TARGET, "Request ({task_id}) done");
                },

                // An event published by the connection manager (connect/disconnect etc.).
                Ok(event) = connection_manager_events.recv() => {
                    let timer = Instant::now();
                    let task_id = rand::random::<u64>();
                    trace!(target: LOG_TARGET, "Event ({task_id}): {event:?}");
                    if let Err(err) = self.handle_connection_manager_event(&event).await {
                        error!(target:LOG_TARGET, "Error handling connection manager event ({task_id}): {err:?}");
                    }
                    if timer.elapsed() > ACCEPTABLE_EVENT_PROCESSING_TIME {
                        warn!(
                            target: LOG_TARGET,
                            "Event ({}) took too long to process: {:.2?}",
                            task_id,
                            format_duration(timer.elapsed())
                        );
                    }
                    trace!(target: LOG_TARGET, "Event ({task_id}) done");
                },

                // Periodic maintenance: clean stats, then refresh the pool with a hard
                // timeout so a stuck refresh cannot stall the actor indefinitely.
                _ = connection_pool_timer.tick() => {
                    let task_id = rand::random::<u64>();
                    trace!(target: LOG_TARGET, "Pool refresh peers task ({task_id})");
                    self.cleanup_connection_stats();
                    match tokio::time::timeout(POOL_REFRESH_TIMEOUT, self.refresh_connection_pool(task_id)).await {
                        Ok(res) => {
                            if let Err(err) = res {
                                error!(target: LOG_TARGET, "Error refreshing connection pools ({task_id}): {err:?}");
                            }
                        },
                        Err(_) => {
                            warn!(
                                target: LOG_TARGET,
                                "Pool refresh task ({task_id}) timeout",
                            );
                        },
                    }
                    trace!(target: LOG_TARGET, "Pool refresh task ({task_id}) done" );
                },

                // Graceful shutdown: disconnect every pooled peer, then exit the loop.
                _ = self.shutdown_signal.wait() => {
                    info!(
                        target: LOG_TARGET,
                        "ConnectivityManager is shutting down because it received the shutdown signal"
                    );
                    self.disconnect_all().await;
                    break;
                }
            }
        }
    }
45✔
269

270
    /// Services a single `ConnectivityRequest`, sending the result back on the
    /// request's reply channel where one is supplied. Send errors on reply channels
    /// are deliberately ignored (the requester may have gone away).
    async fn handle_request(&mut self, req: ConnectivityRequest) {
        #[allow(clippy::enum_glob_use)]
        use ConnectivityRequest::*;
        match req {
            // Simple readiness probe: reply as soon as the actor is servicing requests.
            WaitStarted(reply) => {
                let _ = reply.send(());
            },
            GetConnectivityStatus(reply) => {
                let _ = reply.send(self.status);
            },
            // Dial inside a fresh trace span linked to the caller's current span.
            DialPeer { node_id, reply_tx } => {
                let tracing_id = tracing::Span::current().id();
                let span = span!(Level::TRACE, "handle_dial_peer");
                span.follows_from(tracing_id);
                self.handle_dial_peer(node_id.clone(), reply_tx).instrument(span).await;
            },
            SelectConnections(selection, reply) => {
                let _result = reply.send(self.select_connections(selection));
            },
            // Only return a connection that is currently connected at both the
            // pool-status and the live-connection level.
            GetConnection(node_id, reply) => {
                let _result = reply.send(
                    self.pool
                        .get(&node_id)
                        .filter(|c| c.status() == ConnectionStatus::Connected)
                        .and_then(|c| c.connection())
                        .filter(|conn| conn.is_connected())
                        .cloned(),
                );
            },
            // Peer-manager lookup errors are logged and reported to the caller as None.
            GetPeerStats(node_id, reply) => {
                let peer = match self.peer_manager.find_by_node_id(&node_id).await {
                    Ok(v) => v,
                    Err(e) => {
                        error!(target: LOG_TARGET, "Error when retrieving peer: {e:?}");
                        None
                    },
                };
                let _result = reply.send(peer);
            },
            GetAllConnectionStates(reply) => {
                let states = self.pool.all().into_iter().cloned().collect();
                let _result = reply.send(states);
            },
            GetMinimizeConnectionsThreshold(reply) => {
                let minimize_connections_threshold = self.config.maintain_n_closest_connections_only;
                let _result = reply.send(minimize_connections_threshold);
            },
            // Allow-listed peers are never banned.
            BanPeer(node_id, duration, reason) => {
                if self.allow_list.contains(&node_id) {
                    info!(
                        target: LOG_TARGET,
                        "Peer is excluded from being banned as it was found in the AllowList, NodeId: {node_id:?}"
                    );
                } else if let Err(err) = self.ban_peer(&node_id, duration, reason).await {
                    error!(target: LOG_TARGET, "Error when banning peer: {err:?}");
                } else {
                    // we banned the peer
                }
            },
            // Idempotent insert: a peer appears in the allow list at most once.
            AddPeerToAllowList(node_id) => {
                if !self.allow_list.contains(&node_id) {
                    self.allow_list.push(node_id.clone());
                }
            },
            RemovePeerFromAllowList(node_id) => {
                if let Some(index) = self.allow_list.iter().position(|x| *x == node_id) {
                    self.allow_list.remove(index);
                }
            },
            GetAllowList(reply) => {
                let allow_list = self.allow_list.clone();
                let _result = reply.send(allow_list);
            },
            // Seed peers come from the peer manager; failure degrades to an empty list.
            GetSeeds(reply) => {
                let seeds = self.peer_manager.get_seed_peers().await.unwrap_or_else(|e| {
                    error!(target: LOG_TARGET, "Error when retrieving seed peers: {e:?}");
                    vec![]
                });
                let _result = reply.send(seeds);
            },
            GetActiveConnections(reply) => {
                let _result = reply.send(
                    self.pool
                        .filter_connection_states(|s| s.is_connected())
                        .into_iter()
                        .cloned()
                        .collect(),
                );
            },
            GetNodeIdentity(reply) => {
                let identity = self.node_identity.as_ref();
                let _result = reply.send(identity.clone());
            },
        }
    }
777✔
365

366
    /// Handles a dial request for `node_id`.
    ///
    /// Banned peers (or a failed ban lookup) are reported straight back on `reply_tx`
    /// without dialing. If an already-connected pooled connection exists it is returned;
    /// otherwise the dial is forwarded to the connection manager, which takes ownership
    /// of `reply_tx` and replies when the dial resolves.
    async fn handle_dial_peer(
        &mut self,
        node_id: NodeId,
        reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
    ) {
        // Ban check first: never dial a banned peer.
        match self.peer_manager.is_peer_banned(&node_id).await {
            Ok(true) => {
                if let Some(reply) = reply_tx {
                    let _result = reply.send(Err(ConnectionManagerError::PeerBanned));
                }
                return;
            },
            Ok(false) => {},
            Err(err) => {
                // Lookup failed — propagate the error to the caller rather than dialing blind.
                if let Some(reply) = reply_tx {
                    let _result = reply.send(Err(err.into()));
                }
                return;
            },
        }
        match self.pool.get(&node_id) {
            // The connection pool may temporarily contain a connection that is not connected so we need to check this.
            Some(state) if state.is_connected() => {
                if let Some(reply_tx) = reply_tx {
                    let _result = reply_tx.send(Ok(state.connection().cloned().expect("Already checked")));
                }
            },
            // Not connected (or never attempted): log why, then hand off to the connection manager.
            maybe_state => {
                match maybe_state {
                    Some(state) => {
                        info!(
                            target: LOG_TARGET,
                            "Connection was previously attempted for peer {}. Current status is '{}'. Dialing again...",
                            node_id.short_str(),
                            state.status()
                        );
                    },
                    None => {
                        info!(
                            target: LOG_TARGET,
                            "No connection for peer {}. Dialing...",
                            node_id.short_str(),
                        );
                    },
                }

                // The connection manager replies on `reply_tx` once the dial completes.
                if let Err(err) = self.connection_manager.send_dial_peer(node_id, reply_tx).await {
                    error!(
                        target: LOG_TARGET,
                        "Failed to send dial request to connection manager: {err:?}"
                    );
                }
            },
        }
    }
421

422
    async fn disconnect_all(&mut self) {
50✔
423
        let mut node_ids = Vec::with_capacity(self.pool.count_connected());
50✔
424
        for mut state in self.pool.filter_drain(|_| true) {
75✔
425
            if let Some(conn) = state.connection_mut() {
75✔
426
                if !conn.is_connected() {
75✔
427
                    continue;
27✔
428
                }
48✔
429
                match disconnect_silent_with_timeout(
48✔
430
                    conn,
48✔
431
                    Minimized::No,
48✔
432
                    None,
48✔
433
                    "ConnectivityManagerActor disconnect all",
48✔
434
                )
435
                .await
48✔
436
                {
437
                    Ok(_) => {
42✔
438
                        node_ids.push(conn.peer_node_id().clone());
42✔
439
                    },
42✔
440
                    Err(err) => {
1✔
441
                        debug!(
1✔
442
                            target: LOG_TARGET,
×
443
                            "In disconnect_all: Error when disconnecting peer '{}' because '{:?}'",
×
444
                            conn.peer_node_id().short_str(),
×
445
                            err
446
                        );
447
                    },
448
                }
449
            }
×
450
        }
451

452
        for node_id in node_ids {
86✔
453
            self.publish_event(ConnectivityEvent::PeerDisconnected(node_id, Minimized::No));
41✔
454
        }
41✔
455
    }
45✔
456

457
    async fn refresh_connection_pool(&mut self, task_id: u64) -> Result<(), ConnectivityError> {
12✔
458
        info!(
12✔
459
            target: LOG_TARGET,
×
460
            "CONNECTIVITY_REFRESH: Performing connection pool cleanup/refresh ({}). (#Peers = {}, #Connected={}, #Failed={}, #Disconnected={}, \
×
461
             #Clients={})",
×
462
            task_id,
463
            self.pool.count_entries(),
×
464
            self.pool.count_connected_nodes(),
×
465
            self.pool.count_failed(),
×
466
            self.pool.count_disconnected(),
×
467
            self.pool.count_connected_clients()
×
468
        );
469

470
        self.clean_connection_pool();
12✔
471
        if self.config.is_connection_reaping_enabled {
12✔
472
            self.reap_inactive_connections(task_id).await;
12✔
473
        }
×
474
        if let Some(threshold) = self.config.maintain_n_closest_connections_only {
12✔
475
            self.maintain_n_closest_peer_connections_only(threshold, task_id).await;
×
476
        }
12✔
477

478
        // Execute proactive dialing logic (if enabled)
479
        debug!(
12✔
480
            target: LOG_TARGET,
×
481
            "({}) Proactive dialing config check: enabled={}, target_connections={}",
×
482
            task_id,
483
            self.config.proactive_dialing_enabled,
484
            self.config.target_connection_count
485
        );
486

487
        if self.config.proactive_dialing_enabled {
12✔
488
            debug!(
12✔
489
                target: LOG_TARGET,
×
490
                "({task_id}) Executing proactive dialing logic"
×
491
            );
492
            if let Err(err) = self.execute_proactive_dialing(task_id).await {
12✔
493
                warn!(
×
494
                    target: LOG_TARGET,
×
495
                    "({task_id}) Proactive dialing failed: {err:?}"
×
496
                );
497
            }
12✔
498
        } else {
499
            debug!(
×
500
                target: LOG_TARGET,
×
501
                "({task_id}) Proactive dialing disabled in configuration"
×
502

503
            );
504
        }
505

506
        self.update_connectivity_status();
12✔
507
        self.update_connectivity_metrics();
12✔
508
        Ok(())
12✔
509
    }
12✔
510

511
    /// Keeps only the `threshold` closest peer connections (by distance to this node),
    /// disconnecting the rest, except peers on the allow list which are never trimmed.
    /// `task_id` is a log-correlation id.
    async fn maintain_n_closest_peer_connections_only(&mut self, threshold: usize, task_id: u64) {
        let start = Instant::now();
        // Select all active peer connections (that are communication nodes) with health-aware selection
        let selection = ConnectivitySelection::healthy_closest_to(
            self.node_identity.node_id().clone(),
            self.pool.count_connected_nodes(),
            vec![],
        );
        let mut connections = match self.select_connections_with_health(selection) {
            Ok(peers) => peers,
            Err(e) => {
                warn!(
                    target: LOG_TARGET,
                    "Connectivity error trying to maintain {threshold} closest peers ({task_id}) ({e:?})",
                );
                return;
            },
        };
        let num_connections = connections.len();

        // Remove peers that are on the allow list
        connections.retain(|conn| !self.allow_list.contains(conn.peer_node_id()));
        // NOTE(review): the labels here look swapped — "Filtered peers" prints the
        // post-retain count while "Handles" prints the pre-retain total; confirm intent.
        debug!(
            target: LOG_TARGET,
            "minimize_connections: ({}) Filtered peers: {}, Handles: {}",
            task_id,
            connections.len(),
            num_connections,
        );

        // Disconnect all remaining peers above the threshold
        let len = connections.len();
        for conn in connections.iter_mut().skip(threshold) {
            debug!(
                target: LOG_TARGET,
                "minimize_connections: ({}) Disconnecting '{}' because the node is not among the {} closest peers",
                task_id,
                conn.peer_node_id(),
                threshold
            );
            match disconnect_if_unused_with_timeout(
                conn,
                Minimized::Yes,
                Some(task_id),
                "ConnectivityManagerActor maintain closest",
            )
            .await
            {
                Ok(_) => {
                    // Successful disconnect: drop the entry from the pool as well.
                    self.pool.remove(conn.peer_node_id());
                },
                Err(err) => {
                    debug!(
                        target: LOG_TARGET,
                        "Peer '{}' already disconnected ({:?}). Error: {:?}",
                        conn.peer_node_id().short_str(),
                        task_id,
                        err
                    );
                },
            }
        }
        if len > 0 {
            // NOTE(review): `len` counts all trim candidates, not just the ones above
            // `threshold` that were actually disconnected, so "Minimized {}" can
            // over-report; also this debug! omits `target: LOG_TARGET` — confirm.
            debug!(
                "minimize_connections: ({}) Minimized {} connections in {:.2?}",
                task_id,
                len,
                start.elapsed()
            );
        }
    }
×
582

583
    /// Disconnects and removes up to `excess` inactive outbound connections, where
    /// `excess` is the connected count above `reaper_min_connection_threshold`.
    /// Does nothing while at or below the threshold. `task_id` is a log-correlation id.
    async fn reap_inactive_connections(&mut self, task_id: u64) {
        let start = Instant::now();
        // Only reap what exceeds the configured minimum number of connections.
        let excess_connections = self
            .pool
            .count_connected()
            .saturating_sub(self.config.reaper_min_connection_threshold);
        if excess_connections == 0 {
            return;
        }

        // Candidates: outbound connections idle for at least `reaper_min_inactive_age`.
        let mut connections = self
            .pool
            .get_inactive_outbound_connections_mut(self.config.reaper_min_inactive_age);
        connections.truncate(excess_connections);
        let mut nodes_to_remove = Vec::new();
        for conn in &mut connections {
            if !conn.is_connected() {
                continue;
            }

            debug!(
                target: LOG_TARGET,
                "({}) Disconnecting '{}' because connection was inactive ({} handles)",
                task_id,
                conn.peer_node_id().short_str(),
                conn.handle_count()
            );
            match disconnect_with_timeout(
                conn,
                Minimized::Yes,
                Some(task_id),
                "ConnectivityManagerActor reap inactive",
            )
            .await
            {
                Ok(_) => {
                    nodes_to_remove.push(conn.peer_node_id().clone());
                },
                Err(err) => {
                    debug!(
                        target: LOG_TARGET,
                        "Peer '{}' already disconnected ({:?}). Error: {:?}",
                        conn.peer_node_id().short_str(),
                        task_id,
                        err
                    );
                },
            }
        }
        // Remove successfully disconnected peers from the pool after iteration
        // (removal during the loop would invalidate the borrowed connection list).
        let len = nodes_to_remove.len();
        if len > 0 {
            for node_id in nodes_to_remove {
                self.pool.remove(&node_id);
            }
            // NOTE(review): this debug! omits `target: LOG_TARGET`, unlike the others — confirm.
            debug!(
                "({}) Reaped {} inactive connections in {:.2?}",
                task_id,
                len,
                start.elapsed()
            );
        }
    }
645

646
    fn clean_connection_pool(&mut self) {
12✔
647
        let cleared_states = self.pool.filter_drain(|state| {
12✔
648
            matches!(
12✔
649
                state.status(),
12✔
650
                ConnectionStatus::Failed | ConnectionStatus::Disconnected(_)
651
            )
652
        });
12✔
653

654
        if !cleared_states.is_empty() {
12✔
655
            debug!(
×
656
                target: LOG_TARGET,
×
657
                "Cleared connection states: {}",
×
658
                cleared_states
×
659
                    .iter()
×
660
                    .map(ToString::to_string)
×
661
                    .collect::<Vec<_>>()
×
662
                    .join(",")
×
663
            )
664
        }
12✔
665
    }
12✔
666

667
    fn select_connections(&self, selection: ConnectivitySelection) -> Result<Vec<PeerConnection>, ConnectivityError> {
54✔
668
        trace!(target: LOG_TARGET, "Selection query: {selection:?}");
54✔
669
        trace!(
54✔
670
            target: LOG_TARGET,
×
671
            "Selecting from {} connected node peers",
×
672
            self.pool.count_connected_nodes()
×
673
        );
674

675
        let conns = selection.select(&self.pool);
54✔
676
        debug!(target: LOG_TARGET, "Selected {} connections(s)", conns.len());
54✔
677

678
        Ok(conns.into_iter().cloned().collect())
54✔
679
    }
54✔
680

681
    fn select_connections_with_health(
×
682
        &self,
×
683
        selection: ConnectivitySelection,
×
684
    ) -> Result<Vec<PeerConnection>, ConnectivityError> {
×
685
        trace!(target: LOG_TARGET, "Health-aware selection query: {selection:?}");
×
686
        trace!(
×
687
            target: LOG_TARGET,
×
688
            "Selecting from {} connected node peers with health metrics",
×
689
            self.pool.count_connected_nodes()
×
690
        );
691

692
        let conns = selection.select_with_health(
×
693
            &self.pool,
×
694
            &self.connection_stats,
×
695
            self.config.success_rate_tracking_window,
×
696
        );
697
        debug!(target: LOG_TARGET, "Selected {} healthy connections(s)", conns.len());
×
698

699
        Ok(conns.into_iter().cloned().collect())
×
700
    }
×
701

702
    fn get_connection_stat_mut(&mut self, node_id: NodeId) -> &mut PeerConnectionStats {
140✔
703
        match self.connection_stats.entry(node_id) {
140✔
704
            Entry::Occupied(entry) => entry.into_mut(),
10✔
705
            Entry::Vacant(entry) => entry.insert(PeerConnectionStats::new()),
130✔
706
        }
707
    }
140✔
708

709
    fn mark_connection_success(&mut self, node_id: NodeId) {
137✔
710
        let entry = self.get_connection_stat_mut(node_id);
137✔
711
        entry.set_connection_success();
137✔
712

713
        // Update proactive dialing success metrics
714
    }
137✔
715

716
    fn mark_peer_failed(&mut self, node_id: NodeId) -> usize {
3✔
717
        let threshold = self.config.circuit_breaker_failure_threshold;
3✔
718
        let entry = self.get_connection_stat_mut(node_id);
3✔
719
        entry.set_connection_failed_with_threshold(threshold);
3✔
720

721
        entry.failed_attempts()
3✔
722
    }
3✔
723

724
    /// Handles a failed connection attempt to `node_id`.
    ///
    /// If this node itself is offline the failure is ignored (the offline event
    /// is re-published instead). Otherwise the failure is recorded against the
    /// peer; once the failure count reaches `max_failures_mark_offline`, a peer
    /// that is not banned and was last seen longer ago than
    /// `expire_peer_last_seen_duration` is soft-deleted from the peer list.
    async fn on_peer_connection_failure(&mut self, node_id: &NodeId) -> Result<(), ConnectivityError> {
        if self.status.is_offline() {
            info!(
                target: LOG_TARGET,
                "Node is offline. Ignoring connection failure event for peer '{node_id}'."
            );
            self.publish_event(ConnectivityEvent::ConnectivityStateOffline);
            return Ok(());
        }

        // Record the failure and get the running failure count for this peer.
        let num_failed = self.mark_peer_failed(node_id.clone());

        if num_failed >= self.config.max_failures_mark_offline {
            debug!(
                target: LOG_TARGET,
                "Marking peer '{}' as offline because this node failed to connect to them {} times",
                node_id.short_str(),
                num_failed
            );

            if let Some(peer) = self.peer_manager.find_by_node_id(node_id).await? {
                if !peer.is_banned() &&
                    peer.last_seen_since()
                        // Haven't seen them in expire_peer_last_seen_duration
                        .map(|t| t > self.config.expire_peer_last_seen_duration)
                        // Or don't delete if never seen
                        .unwrap_or(false)
                {
                    debug!(
                        target: LOG_TARGET,
                        "Peer `{}` was marked as offline after {} attempts (last seen: {}). Removing peer from peer \
                         list",
                        node_id,
                        num_failed,
                        peer.last_seen_since()
                            .map(|d| format!("{}s ago", d.as_secs()))
                            .unwrap_or_else(|| "Never".to_string()),
                    );
                    // Soft delete keeps the record but removes the peer from
                    // the active peer list.
                    self.peer_manager.soft_delete_peer(node_id).await?;
                }
            }
        }

        Ok(())
    }
769

770
    /// Entry point for events emitted by the connection manager.
    ///
    /// First folds the event into the connection pool state, then recomputes
    /// the overall connectivity status and refreshes the connectivity metrics
    /// (a no-op when the `metrics` feature is disabled). The order matters:
    /// status/metrics must observe the post-event pool state.
    async fn handle_connection_manager_event(
        &mut self,
        event: &ConnectionManagerEvent,
    ) -> Result<(), ConnectivityError> {
        self.update_state_on_connectivity_event(event).await?;
        self.update_connectivity_status();
        self.update_connectivity_metrics();
        Ok(())
    }
779

780
    /// Folds a single connection-manager event into the connection pool and
    /// publishes the corresponding connectivity event.
    ///
    /// Flow: a first `match` handles early exits (tie-broken duplicate
    /// connections, stale disconnect events, peer violations which trigger a
    /// ban); a second `match` maps the event to a `(node_id, status,
    /// connection)` triple which is applied to the pool; finally the
    /// old/new status pair determines which `ConnectivityEvent` is broadcast.
    #[allow(clippy::too_many_lines)]
    async fn update_state_on_connectivity_event(
        &mut self,
        event: &ConnectionManagerEvent,
    ) -> Result<(), ConnectivityError> {
        use ConnectionManagerEvent::*;
        match event {
            PeerConnected(new_conn) => {
                match self.on_new_connection(new_conn).await {
                    TieBreak::KeepExisting => {
                        debug!(
                            target: LOG_TARGET,
                            "Discarding new connection to peer '{}' because we already have an existing connection",
                            new_conn.peer_node_id().short_str()
                        );
                        // Ignore event, we discarded the new connection and keeping the current one
                        return Ok(());
                    },
                    TieBreak::UseNew | TieBreak::None => {},
                }
            },
            PeerDisconnected(id, node_id, _minimized) => {
                // Only honour the disconnect if it refers to the connection we
                // currently hold; otherwise it is an event for a connection
                // that has already been replaced.
                if let Some(conn) = self.pool.get_connection(node_id) {
                    if conn.id() != *id {
                        debug!(
                            target: LOG_TARGET,
                            "Ignoring peer disconnected event for stale peer connection (id: {id}) for peer '{node_id}'"

                        );
                        return Ok(());
                    }
                }
            },
            PeerViolation { peer_node_id, details } => {
                // Protocol violation: ban for 2 hours and stop processing.
                self.ban_peer(
                    peer_node_id,
                    Duration::from_secs(2 * 60 * 60),
                    format!("Peer violation: {details}"),
                )
                .await?;
                return Ok(());
            },
            _ => {},
        }

        // Map the event to the pool mutation it implies.
        let (node_id, mut new_status, connection) = match event {
            PeerDisconnected(_, node_id, minimized) => (node_id, ConnectionStatus::Disconnected(*minimized), None),
            PeerConnected(conn) => (conn.peer_node_id(), ConnectionStatus::Connected, Some(conn.clone())),
            PeerConnectFailed(node_id, ConnectionManagerError::AllPeerAddressesAreExcluded(msg)) => {
                debug!(
                    target: LOG_TARGET,
                    "Peer '{node_id}' contains only excluded addresses ({msg})"

                );
                (node_id, ConnectionStatus::Failed, None)
            },
            PeerConnectFailed(node_id, ConnectionManagerError::NoiseHandshakeError(msg)) => {
                // Drop any half-broken connection so the next dial starts clean.
                if let Some(conn) = self.pool.get_connection(node_id) {
                    debug!(
                        target: LOG_TARGET,
                        "Handshake error to peer '{node_id}', disconnecting for a fresh retry ({msg})"
                    );
                    let mut conn = conn.clone();
                    disconnect_with_timeout(
                        &mut conn,
                        Minimized::No,
                        None,
                        "ConnectivityManagerActor peer connect failed",
                    )
                    .await?;
                }
                (node_id, ConnectionStatus::Failed, None)
            },
            PeerConnectFailed(node_id, ConnectionManagerError::DialCancelled) => {
                // A cancelled outbound dial is not a failure when the peer
                // already reached us inbound in the meantime.
                if let Some(conn) = self.pool.get_connection(node_id) {
                    if conn.is_connected() && conn.direction().is_inbound() {
                        debug!(
                            target: LOG_TARGET,
                            "Ignoring DialCancelled({node_id}) event because an inbound connection already exists"
                        );

                        return Ok(());
                    }
                }
                debug!(
                    target: LOG_TARGET,
                    "Dial was cancelled before connection completed to peer '{node_id}'"
                );
                (node_id, ConnectionStatus::Failed, None)
            },
            PeerConnectFailed(node_id, err) => {
                debug!(
                    target: LOG_TARGET,
                    "Connection to peer '{node_id}' failed because '{err:?}'"
                );
                self.on_peer_connection_failure(node_id).await?;
                (node_id, ConnectionStatus::Failed, None)
            },
            _ => return Ok(()),
        };

        let old_status = self.pool.set_status(node_id, new_status);
        if let Some(conn) = connection {
            // Inserting the connection may refine the status further.
            new_status = self.pool.insert_connection(*conn);
        }
        if old_status != new_status {
            debug!(
                target: LOG_TARGET,
                "Peer connection for node '{node_id}' transitioned from {old_status} to {new_status}"
            );
        }

        let node_id = node_id.clone();

        // Broadcast the connectivity event implied by the status transition.
        use ConnectionStatus::{Connected, Disconnected, Failed};
        match (old_status, new_status) {
            (_, Connected) => match self.pool.get_connection_mut(&node_id).cloned() {
                Some(conn) => {
                    self.mark_connection_success(conn.peer_node_id().clone());
                    self.publish_event(ConnectivityEvent::PeerConnected(conn.into()));
                },
                None => unreachable!(
                    "Connection transitioning to CONNECTED state must always have a connection set i.e. \
                     ConnectionPool::get_connection is Some"
                ),
            },
            (Connected, Disconnected(..)) => {
                self.publish_event(ConnectivityEvent::PeerDisconnected(node_id, match new_status {
                    ConnectionStatus::Disconnected(reason) => reason,
                    _ => Minimized::No,
                }));
            },
            // Was not connected so don't broadcast event
            (_, Disconnected(..)) => {},
            (_, Failed) => {
                self.publish_event(ConnectivityEvent::PeerConnectFailed(node_id));
            },
            _ => {
                error!(
                    target: LOG_TARGET,
                    "Unexpected connection status transition ({old_status} to {new_status}) for peer '{node_id}'"
                );
            },
        }

        Ok(())
    }
927

928
    /// Resolves a newly established connection against any existing pooled
    /// connection to the same peer, returning which side of the tie break won.
    ///
    /// - No existing entry in the pool: `TieBreak::None`.
    /// - Existing entry is not connected: it is removed, `TieBreak::UseNew`.
    /// - Both are live: `tie_break_existing_connection` decides
    ///   deterministically; the losing connection is silently disconnected
    ///   (`Minimized::Yes`) so no disconnect event is broadcast for it.
    async fn on_new_connection(&mut self, new_conn: &PeerConnection) -> TieBreak {
        match self.pool.get(new_conn.peer_node_id()).cloned() {
            Some(existing_state) if !existing_state.is_connected() => {
                debug!(
                    target: LOG_TARGET,
                    "Tie break: Existing connection (id: {}, peer: {}, direction: {}) was not connected, resolving \
                     tie break by using the new connection. (New: id: {}, peer: {}, direction: {})",
                    existing_state.connection().map(|c| c.id()).unwrap_or_default(),
                    existing_state.node_id(),
                    existing_state.connection().map(|c| c.direction().as_str()).unwrap_or("--"),
                    new_conn.id(),
                    new_conn.peer_node_id(),
                    new_conn.direction(),
                );
                self.pool.remove(existing_state.node_id());
                TieBreak::UseNew
            },
            Some(mut existing_state) => {
                // Invariant recovery: a CONNECTED pool entry should always
                // carry a connection; if not, log loudly and take the new one.
                let Some(existing_conn) = existing_state.connection_mut() else {
                    error!(
                        target: LOG_TARGET,
                        "INVARIANT ERROR in Tie break: PeerConnection is None but state is CONNECTED: Existing \
                        connection (id: {}, peer: {}, direction: {}), new connection. (id: {}, peer: {}, direction: {})",
                        existing_state.connection().map(|c| c.id()).unwrap_or_default(),
                        existing_state.node_id(),
                        existing_state.connection().map(|c| c.direction().as_str()).unwrap_or("--"),
                        new_conn.id(),
                        new_conn.peer_node_id(),
                        new_conn.direction(),
                    );
                    return TieBreak::UseNew;
                };
                if self.tie_break_existing_connection(existing_conn, new_conn) {
                    info!(
                        target: LOG_TARGET,
                        "Tie break: Keep new connection (id: {}, peer: {}, direction: {}). Disconnect existing \
                         connection (id: {}, peer: {}, direction: {})",
                        new_conn.id(),
                        new_conn.peer_node_id(),
                        new_conn.direction(),
                        existing_conn.id(),
                        existing_conn.peer_node_id(),
                        existing_conn.direction(),
                    );

                    // Best effort: the tie-break outcome stands even if the
                    // disconnect times out.
                    let _result = disconnect_silent_with_timeout(
                        existing_conn,
                        Minimized::Yes,
                        None,
                        "ConnectivityManagerActor tie break",
                    )
                    .await;
                    self.pool.remove(existing_conn.peer_node_id());
                    TieBreak::UseNew
                } else {
                    debug!(
                        target: LOG_TARGET,
                        "Tie break: Keeping existing connection (id: {}, peer: {}, direction: {}). Disconnecting new \
                         connection (id: {}, peer: {}, direction: {})",
                        new_conn.id(),
                        new_conn.peer_node_id(),
                        new_conn.direction(),
                        existing_conn.id(),
                        existing_conn.peer_node_id(),
                        existing_conn.direction(),
                    );

                    let _result = disconnect_silent_with_timeout(
                        &mut new_conn.clone(),
                        Minimized::Yes,
                        None,
                        "ConnectivityManagerActor tie break",
                    )
                    .await;
                    TieBreak::KeepExisting
                }
            },

            None => TieBreak::None,
        }
    }
1009

1010
    /// Two connections to the same peer have been created. This function deterministically determines which peer
1011
    /// connection to close. It does this by comparing our NodeId to that of the peer. This rule enables both sides to
1012
    /// agree which connection to disconnect
1013
    ///
1014
    /// Returns true if the existing connection should close, otherwise false if the new connection should be closed.
1015
    fn tie_break_existing_connection(&self, existing_conn: &PeerConnection, new_conn: &PeerConnection) -> bool {
10✔
1016
        debug_assert_eq!(existing_conn.peer_node_id(), new_conn.peer_node_id());
10✔
1017
        let peer_node_id = existing_conn.peer_node_id();
10✔
1018
        let our_node_id = self.node_identity.node_id();
10✔
1019

1020
        debug!(
10✔
1021
            target: LOG_TARGET,
×
1022
            "Tie-break: (Existing = {}, New = {})",
×
1023
            existing_conn.direction(),
×
1024
            new_conn.direction()
×
1025
        );
1026
        use ConnectionDirection::{Inbound, Outbound};
1027
        match (existing_conn.direction(), new_conn.direction()) {
10✔
1028
            // They connected to us twice for some reason. Drop the older connection
1029
            (Inbound, Inbound) => true,
2✔
1030
            // They connected to us at the same time we connected to them
1031
            (Inbound, Outbound) => peer_node_id > our_node_id,
3✔
1032
            // We connected to them at the same time as they connected to us
1033
            (Outbound, Inbound) => our_node_id > peer_node_id,
3✔
1034
            // We connected to them twice for some reason. Drop the older connection.
1035
            (Outbound, Outbound) => true,
2✔
1036
        }
1037
    }
10✔
1038

1039
    fn update_connectivity_status(&mut self) {
214✔
1040
        // The contract we are making with online/degraded status transitions is as follows:
1041
        // - If min_connectivity peers are connected we MUST transition to ONLINE
1042
        // - Clients SHOULD tolerate entering a DEGRADED/OFFLINE status
1043
        // - If a number of peers disconnect or the local system's network goes down, the status MAY transition to
1044
        //   DEGRADED
1045
        let min_peers = self.config.min_connectivity;
214✔
1046
        let num_connected_nodes = self.pool.count_connected_nodes();
214✔
1047
        let num_connected_clients = self.pool.count_connected_clients();
214✔
1048
        debug!(
214✔
1049
            target: LOG_TARGET,
×
1050
            "#min_peers = {min_peers}, #nodes = {num_connected_nodes}, #clients = {num_connected_clients}"
×
1051
        );
1052

1053
        match num_connected_nodes {
214✔
1054
            n if n >= min_peers => {
214✔
1055
                self.transition(ConnectivityStatus::Online(n), min_peers);
192✔
1056
            },
192✔
1057
            n if n > 0 && n < min_peers => {
22✔
1058
                self.transition(ConnectivityStatus::Degraded(n), min_peers);
3✔
1059
            },
3✔
1060
            n if n == 0 => {
19✔
1061
                if num_connected_clients == 0 {
19✔
1062
                    self.transition(ConnectivityStatus::Offline, min_peers);
15✔
1063
                } else {
15✔
1064
                    self.transition(ConnectivityStatus::Degraded(n), min_peers);
4✔
1065
                }
4✔
1066
            },
1067
            _ => unreachable!("num_connected is unsigned and only negative pattern covered on this branch"),
×
1068
        }
1069
    }
214✔
1070

1071
    /// No-op stub used when the `metrics` feature is disabled.
    #[cfg(not(feature = "metrics"))]
    fn update_connectivity_metrics(&mut self) {}

    /// Publishes connection-count and uptime gauges from the current pool state.
    #[allow(clippy::cast_possible_wrap)]
    #[cfg(feature = "metrics")]
    fn update_connectivity_metrics(&mut self) {
        use std::convert::TryFrom;

        use super::metrics;

        let total = self.pool.count_connected() as i64;
        // Count live inbound connections; outbound is derived as the remainder.
        let num_inbound = self.pool.count_filtered(|state| match state.connection() {
            Some(conn) => conn.is_connected() && conn.direction().is_inbound(),
            None => false,
        }) as i64;

        metrics::connections(ConnectionDirection::Inbound).set(num_inbound);
        metrics::connections(ConnectionDirection::Outbound).set(total - num_inbound);

        // Uptime in whole seconds since the node last came online; 0 while
        // offline (no uptime instant recorded). Saturates at i64::MAX.
        let uptime = self
            .uptime
            .map(|ts| i64::try_from(ts.elapsed().as_secs()).unwrap_or(i64::MAX))
            .unwrap_or(0);
        metrics::uptime().set(uptime);
    }
1096

1097
    /// Applies `next_status` as the new connectivity status, logging the
    /// change and publishing the matching connectivity event.
    /// `required_num_peers` is only used for log output.
    ///
    /// Same-variant transitions are mostly silent: Online→Online and
    /// Offline→Offline publish nothing; Degraded→Degraded publishes only when
    /// the connection count actually changed.
    fn transition(&mut self, next_status: ConnectivityStatus, required_num_peers: usize) {
        use ConnectivityStatus::{Degraded, Offline, Online};
        if self.status != next_status {
            debug!(
                target: LOG_TARGET,
                "Connectivity status transitioning from {} to {}", self.status, next_status
            );
        }

        match (self.status, next_status) {
            (Online(_), Online(_)) => {},
            (_, Online(n)) => {
                info!(
                    target: LOG_TARGET,
                    "Connectivity is ONLINE ({n}/{required_num_peers} connections)"
                );

                // Start the uptime clock the first time we come online.
                #[cfg(feature = "metrics")]
                if self.uptime.is_none() {
                    self.uptime = Some(Instant::now());
                }
                self.publish_event(ConnectivityEvent::ConnectivityStateOnline(n));
            },
            (Degraded(m), Degraded(n)) => {
                info!(
                    target: LOG_TARGET,
                    "Connectivity is DEGRADED ({n}/{required_num_peers} connections)"
                );
                // Only re-publish when the degraded connection count changed.
                if m != n {
                    self.publish_event(ConnectivityEvent::ConnectivityStateDegraded(n));
                }
            },
            (_, Degraded(n)) => {
                info!(
                    target: LOG_TARGET,
                    "Connectivity is DEGRADED ({n}/{required_num_peers} connections)"
                );
                self.publish_event(ConnectivityEvent::ConnectivityStateDegraded(n));
            },
            (Offline, Offline) => {},
            (_, Offline) => {
                warn!(
                    target: LOG_TARGET,
                    "Connectivity is OFFLINE (0/{required_num_peers} connections)"
                );
                // Reset the uptime clock while offline.
                #[cfg(feature = "metrics")]
                {
                    self.uptime = None;
                }
                self.publish_event(ConnectivityEvent::ConnectivityStateOffline);
            },
            (status, next_status) => unreachable!("Unexpected status transition ({status} to {next_status})"),
        }
        self.status = next_status;
    }
1152

1153
    fn publish_event(&mut self, event: ConnectivityEvent) {
385✔
1154
        // A send operation can only fail if there are no subscribers, so it is safe to ignore the error
1155
        let _result = self.event_tx.send(event);
385✔
1156
    }
385✔
1157

1158
    /// Bans `node_id` for `duration`, publishes a `PeerBanned` event, and
    /// tears down any live connection to the peer.
    ///
    /// The result of recording the ban is deliberately held (`ban_result`) and
    /// only propagated at the end, so the peer is disconnected and the event
    /// published even if the peer-manager write failed.
    async fn ban_peer(
        &mut self,
        node_id: &NodeId,
        duration: Duration,
        reason: String,
    ) -> Result<(), ConnectivityError> {
        info!(
            target: LOG_TARGET,
            "Banning peer {} for {} because: {}",
            node_id,
            format_duration(duration),
            reason
        );
        let ban_result = self.peer_manager.ban_peer_by_node_id(node_id, duration, reason).await;

        #[cfg(feature = "metrics")]
        super::metrics::banned_peers_counter().inc();

        self.publish_event(ConnectivityEvent::PeerBanned(node_id.clone()));

        // Drop the live connection (if any) to the newly banned peer.
        if let Some(conn) = self.pool.get_connection_mut(node_id) {
            disconnect_with_timeout(conn, Minimized::Yes, None, "ConnectivityManagerActor ban peer").await?;
            let status = self.pool.get_connection_status(node_id);
            debug!(
                target: LOG_TARGET,
                "Disconnected banned peer {node_id}. The peer connection status is {status}"
            );
        }
        ban_result?;
        Ok(())
    }
1189

1190
    async fn execute_proactive_dialing(&mut self, task_id: u64) -> Result<(), ConnectivityError> {
12✔
1191
        debug!(
12✔
1192
            target: LOG_TARGET,
×
1193
            "({}) Starting proactive dialing execution - current connections: {}, target: {}",
×
1194
            task_id,
1195
            self.pool.count_connected_nodes(),
×
1196
            self.config.target_connection_count
1197
        );
1198

1199
        // First, clean up old health data to keep metrics accurate
1200
        for stats in self.connection_stats.values_mut() {
12✔
1201
            stats.cleanup_old_health_data(self.config.success_rate_tracking_window);
12✔
1202
        }
12✔
1203

1204
        // Update circuit breaker metrics
1205
        self.update_circuit_breaker_metrics();
12✔
1206

1207
        // Execute proactive dialing logic
1208
        if self.seeds.is_empty() {
12✔
1209
            self.seeds = self
12✔
1210
                .peer_manager
12✔
1211
                .get_seed_peers()
12✔
1212
                .await
12✔
1213
                .inspect_err(|err| {
12✔
1214
                    warn!(
×
1215
                        target: LOG_TARGET,
×
1216
                        "Failed to get seed peers from PeerManager, using empty list for proactive dialing, seed peers \
×
1217
                        will not be excluded as a first pass. ({})", err
×
1218
                    );
1219
                })
×
1220
                .unwrap_or(vec![])
12✔
1221
                .iter()
12✔
1222
                .map(|s| s.node_id.clone())
12✔
1223
                .collect();
12✔
1224
        }
×
1225
        match self
12✔
1226
            .proactive_dialer
12✔
1227
            .execute_proactive_dialing(&self.pool, &self.connection_stats, &self.seeds, task_id)
12✔
1228
            .await
12✔
1229
        {
1230
            Ok(dialed_count) => {
12✔
1231
                if dialed_count > 0 {
12✔
1232
                    debug!(
×
1233
                        target: LOG_TARGET,
×
1234
                        "({task_id}) Proactive dialing initiated {dialed_count} peer connections"
×
1235
                    );
1236
                }
12✔
1237
                Ok(())
12✔
1238
            },
1239
            Err(err) => {
×
1240
                error!(
×
1241
                    target: LOG_TARGET,
×
1242
                    "({task_id}) Proactive dialing failed: {err:?}"
×
1243

1244
                );
1245
                Err(err)
×
1246
            },
1247
        }
1248
    }
12✔
1249

1250
    /// Recomputes circuit-breaker and peer-health aggregates from the
    /// per-peer connection stats.
    ///
    /// NOTE(review): both aggregates are computed and immediately discarded
    /// (`_circuit_breaker_open_count`, `_avg_health`) — presumably
    /// placeholders for metrics to be exported later; confirm intent before
    /// removing this method.
    fn update_circuit_breaker_metrics(&self) {
        // Number of peers whose circuit breaker is currently open.
        let _circuit_breaker_open_count = self
            .connection_stats
            .values()
            .filter(|stats| stats.health_metrics().circuit_breaker_state().is_open())
            .count();

        // Calculate average peer health score
        if !self.connection_stats.is_empty() {
            let total_health: f32 = self
                .connection_stats
                .values()
                .map(|stats| stats.health_score(self.config.success_rate_tracking_window))
                .sum();
            let _avg_health = total_health / self.connection_stats.len() as f32;
        }
    }
1267

1268
    fn cleanup_connection_stats(&mut self) {
12✔
1269
        let mut to_remove = Vec::new();
12✔
1270
        for node_id in self.connection_stats.keys() {
12✔
1271
            let status = self.pool.get_connection_status(node_id);
12✔
1272
            if matches!(
12✔
1273
                status,
12✔
1274
                ConnectionStatus::NotConnected | ConnectionStatus::Failed | ConnectionStatus::Disconnected(_)
1275
            ) {
×
1276
                to_remove.push(node_id.clone());
×
1277
            }
12✔
1278
        }
1279
        for node_id in to_remove {
12✔
1280
            self.connection_stats.remove(&node_id);
×
1281
        }
×
1282
    }
12✔
1283
}
1284

1285
/// Outcome of resolving a new connection against an existing connection to
/// the same peer (see `on_new_connection`).
enum TieBreak {
    /// There was no existing connection to contend with.
    None,
    /// The new connection wins; the existing one is discarded.
    UseNew,
    /// The existing connection wins; the new one is discarded.
    KeepExisting,
}
1290

1291
async fn disconnect_with_timeout(
5✔
1292
    connection: &mut PeerConnection,
5✔
1293
    minimized: Minimized,
5✔
1294
    task_id: Option<u64>,
5✔
1295
    requester: &str,
5✔
1296
) -> Result<(), PeerConnectionError> {
5✔
1297
    match tokio::time::timeout(PEER_DISCONNECT_TIMEOUT, connection.disconnect(minimized, requester)).await {
5✔
1298
        Ok(res) => res,
5✔
1299
        Err(_) => {
1300
            warn!(
×
1301
                target: LOG_TARGET,
×
1302
                "Timeout disconnecting peer ({:?}) '{}'",
×
1303
                task_id,
1304
                connection.peer_node_id().short_str(),
×
1305
            );
1306
            Err(PeerConnectionError::DisconnectTimeout)
×
1307
        },
1308
    }
1309
}
5✔
1310

1311
async fn disconnect_if_unused_with_timeout(
×
1312
    connection: &mut PeerConnection,
×
1313
    minimized: Minimized,
×
1314
    task_id: Option<u64>,
×
1315
    requester: &str,
×
1316
) -> Result<(), PeerConnectionError> {
×
1317
    match tokio::time::timeout(
×
1318
        PEER_DISCONNECT_TIMEOUT,
1319
        connection.disconnect_if_unused(minimized, 0, 0, requester),
×
1320
    )
1321
    .await
×
1322
    {
1323
        Ok(res) => res,
×
1324
        Err(_) => {
1325
            warn!(
×
1326
                target: LOG_TARGET,
×
1327
                "Timeout disconnecting peer ({:?}) '{}'",
×
1328
                task_id,
1329
                connection.peer_node_id().short_str(),
×
1330
            );
1331
            Err(PeerConnectionError::DisconnectTimeout)
×
1332
        },
1333
    }
1334
}
×
1335

1336
async fn disconnect_silent_with_timeout(
58✔
1337
    connection: &mut PeerConnection,
58✔
1338
    minimized: Minimized,
58✔
1339
    task_id: Option<u64>,
58✔
1340
    requester: &str,
58✔
1341
) -> Result<(), PeerConnectionError> {
58✔
1342
    match tokio::time::timeout(
58✔
1343
        PEER_DISCONNECT_TIMEOUT,
1344
        connection.disconnect_silent(minimized, requester),
58✔
1345
    )
1346
    .await
58✔
1347
    {
1348
        Ok(res) => res,
53✔
1349
        Err(_) => {
1350
            warn!(
×
1351
                target: LOG_TARGET,
×
1352
                "Timeout disconnecting peer ({:?}) '{}'",
×
1353
                task_id,
1354
                connection.peer_node_id().short_str(),
×
1355
            );
1356
            Err(PeerConnectionError::DisconnectTimeout)
×
1357
        },
1358
    }
1359
}
53✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc