• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 16467678859

23 Jul 2025 10:05AM UTC coverage: 54.21% (+0.004%) from 54.206%
16467678859

push

github

web-flow
docs: update ffi interface spec (#7367)

Description
---
Updated the FFI interface specification for `fn wallet_start_recovery`

Motivation and Context
---
The function interface was changed recently.

How Has This Been Tested?
---
The specification text was compared against the code in `pub async fn
recovery_event_monitoring(..)`.

What process can a PR reviewer use to test or verify this change?
---
Code review

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description of what the user should do, e.g. delete a
database, resync the chain -->


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **Documentation**
* Updated the documentation for the wallet recovery progress callback to
clarify the event types and their arguments, simplifying descriptions
and removing outdated event information. No changes were made to
functionality or public interfaces.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

75384 of 139060 relevant lines covered (54.21%)

195873.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.33
/comms/core/src/connectivity/manager.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22
use std::{
23
    collections::HashMap,
24
    fmt,
25
    sync::Arc,
26
    time::{Duration, Instant},
27
};
28

29
use log::*;
30
use nom::lib::std::collections::hash_map::Entry;
31
use tari_shutdown::ShutdownSignal;
32
use tokio::{
33
    sync::{mpsc, oneshot},
34
    task::JoinHandle,
35
    time,
36
    time::MissedTickBehavior,
37
};
38
use tracing::{span, Instrument, Level};
39

40
use super::{
41
    config::ConnectivityConfig,
42
    connection_pool::{ConnectionPool, ConnectionStatus},
43
    connection_stats::PeerConnectionStats,
44
    error::ConnectivityError,
45
    proactive_dialer::ProactiveDialer,
46
    requester::{ConnectivityEvent, ConnectivityRequest},
47
    selection::ConnectivitySelection,
48
    ConnectivityEventTx,
49
};
50
use crate::{
51
    connection_manager::{
52
        ConnectionDirection,
53
        ConnectionManagerError,
54
        ConnectionManagerEvent,
55
        ConnectionManagerRequester,
56
    },
57
    peer_manager::NodeId,
58
    utils::datetime::format_duration,
59
    Minimized,
60
    NodeIdentity,
61
    PeerConnection,
62
    PeerConnectionError,
63
    PeerManager,
64
};
65

66
/// Log target shared by every log statement emitted from the connectivity manager.
const LOG_TARGET: &str = "comms::connectivity::manager";

/// Upper bound on how long a single connection pool refresh may run before it is abandoned.
const POOL_REFRESH_TIMEOUT: Duration = Duration::from_millis(2_500);
/// Upper bound on the time allowed to disconnect a single peer.
const PEER_DISCONNECT_TIMEOUT: Duration = Duration::from_millis(250);
/// Handling a connectivity request for longer than this triggers a warning log.
const ACCEPTABLE_CONNECTIVITY_REQUEST_PROCESSING_TIME: Duration = Duration::from_millis(500);
/// Handling a connection manager event for longer than this triggers a warning log.
const ACCEPTABLE_EVENT_PROCESSING_TIME: Duration = Duration::from_millis(500);
76

77
/// # Connectivity Manager
78
///
79
/// The ConnectivityManager actor is responsible for tracking the state of all peer
80
/// connections in the system and maintaining a _pool_ of peer connections.
81
///
82
/// It emits [ConnectivityEvent](crate::connectivity::ConnectivityEvent)s that can keep client components
83
/// in the loop with the state of the node's connectivity.
84
pub struct ConnectivityManager {
85
    pub config: ConnectivityConfig,
86
    pub request_rx: mpsc::Receiver<ConnectivityRequest>,
87
    pub event_tx: ConnectivityEventTx,
88
    pub connection_manager: ConnectionManagerRequester,
89
    pub peer_manager: Arc<PeerManager>,
90
    pub node_identity: Arc<NodeIdentity>,
91
    pub shutdown_signal: ShutdownSignal,
92
}
93

94
impl ConnectivityManager {
95
    pub fn spawn(self) -> JoinHandle<()> {
71✔
96
        let proactive_dialer = ProactiveDialer::new(
71✔
97
            self.config,
71✔
98
            self.connection_manager.clone(),
71✔
99
            self.peer_manager.clone(),
71✔
100
            self.node_identity.clone(),
71✔
101
        );
71✔
102

71✔
103
        ConnectivityManagerActor {
71✔
104
            config: self.config,
71✔
105
            status: ConnectivityStatus::Initializing,
71✔
106
            request_rx: self.request_rx,
71✔
107
            connection_manager: self.connection_manager,
71✔
108
            peer_manager: self.peer_manager.clone(),
71✔
109
            event_tx: self.event_tx,
71✔
110
            connection_stats: HashMap::new(),
71✔
111
            node_identity: self.node_identity,
71✔
112
            pool: ConnectionPool::new(),
71✔
113
            shutdown_signal: self.shutdown_signal,
71✔
114
            #[cfg(feature = "metrics")]
71✔
115
            uptime: Some(Instant::now()),
71✔
116
            allow_list: vec![],
71✔
117
            proactive_dialer,
71✔
118
        }
71✔
119
        .spawn()
71✔
120
    }
71✔
121
}
122

123
/// Coarse summary of the node's connectivity.
///
/// The `usize` carried by [`Online`](ConnectivityStatus::Online) and
/// [`Degraded`](ConnectivityStatus::Degraded) is the number of connected nodes.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum ConnectivityStatus {
    /// Status before the connectivity actor has initialized; this is the default.
    #[default]
    Initializing,
    /// Connectivity is online with the given number of connections.
    Online(usize),
    /// Fewer connections than the required minimum, but some are still active.
    Degraded(usize),
    /// No connections are active.
    Offline,
}
136

137
impl ConnectivityStatus {
138
    is_fn!(is_initializing, ConnectivityStatus::Initializing);
139

140
    is_fn!(is_online, ConnectivityStatus::Online(_));
141

142
    is_fn!(is_offline, ConnectivityStatus::Offline);
143

144
    is_fn!(is_degraded, ConnectivityStatus::Degraded(_));
145

146
    pub fn num_connected_nodes(&self) -> usize {
5✔
147
        use ConnectivityStatus::{Degraded, Initializing, Offline, Online};
148
        match self {
5✔
149
            Initializing | Offline => 0,
4✔
150
            Online(n) | Degraded(n) => *n,
1✔
151
        }
152
    }
5✔
153
}
154

155
impl fmt::Display for ConnectivityStatus {
156
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
157
        write!(f, "{:?}", self)
×
158
    }
×
159
}
160

161
struct ConnectivityManagerActor {
162
    config: ConnectivityConfig,
163
    status: ConnectivityStatus,
164
    request_rx: mpsc::Receiver<ConnectivityRequest>,
165
    connection_manager: ConnectionManagerRequester,
166
    node_identity: Arc<NodeIdentity>,
167
    peer_manager: Arc<PeerManager>,
168
    event_tx: ConnectivityEventTx,
169
    connection_stats: HashMap<NodeId, PeerConnectionStats>,
170
    pool: ConnectionPool,
171
    shutdown_signal: ShutdownSignal,
172
    #[cfg(feature = "metrics")]
173
    uptime: Option<Instant>,
174
    allow_list: Vec<NodeId>,
175
    proactive_dialer: ProactiveDialer,
176
}
177

178
impl ConnectivityManagerActor {
179
    pub fn spawn(self) -> JoinHandle<()> {
71✔
180
        tokio::spawn(async { Self::run(self).await })
71✔
181
    }
71✔
182

183
    pub async fn run(mut self) {
71✔
184
        debug!(target: LOG_TARGET, "ConnectivityManager started");
71✔
185

186
        let mut connection_manager_events = self.connection_manager.get_event_subscription();
71✔
187

71✔
188
        let interval = self.config.connection_pool_refresh_interval;
71✔
189
        let mut connection_pool_timer = time::interval_at(
71✔
190
            Instant::now()
71✔
191
                .checked_add(interval)
71✔
192
                .expect("connection_pool_refresh_interval cause overflow")
71✔
193
                .into(),
71✔
194
            interval,
71✔
195
        );
71✔
196
        connection_pool_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
71✔
197

71✔
198
        self.publish_event(ConnectivityEvent::ConnectivityStateInitialized);
71✔
199

200
        loop {
201
            tokio::select! {
1,043✔
202
                Some(req) = self.request_rx.recv() => {
1,043✔
203
                    let timer = Instant::now();
769✔
204
                    let task_id = rand::random::<u64>();
769✔
205
                    trace!(target: LOG_TARGET, "Request ({}): {:?}", task_id, req);
769✔
206
                    self.handle_request(req).await;
769✔
207
                    if timer.elapsed() > ACCEPTABLE_CONNECTIVITY_REQUEST_PROCESSING_TIME {
769✔
208
                        warn!(
×
209
                            target: LOG_TARGET,
×
210
                            "Request ({}) took too long to process: {:.2?}",
×
211
                            task_id,
×
212
                            format_duration(timer.elapsed())
×
213
                        );
214
                    }
769✔
215
                    trace!(target: LOG_TARGET, "Request ({}) done", task_id);
769✔
216
                },
217

218
                Ok(event) = connection_manager_events.recv() => {
1,043✔
219
                    let timer = Instant::now();
189✔
220
                    let task_id = rand::random::<u64>();
189✔
221
                    trace!(target: LOG_TARGET, "Event ({}): {:?}", task_id, event);
189✔
222
                    if let Err(err) = self.handle_connection_manager_event(&event).await {
189✔
223
                        error!(target:LOG_TARGET, "Error handling connection manager event ({}): {:?}", task_id, err);
×
224
                    }
189✔
225
                    if timer.elapsed() > ACCEPTABLE_EVENT_PROCESSING_TIME {
189✔
226
                        warn!(
×
227
                            target: LOG_TARGET,
×
228
                            "Event ({}) took too long to process: {:.2?}",
×
229
                            task_id,
×
230
                            format_duration(timer.elapsed())
×
231
                        );
232
                    }
189✔
233
                    trace!(target: LOG_TARGET, "Event ({}) done", task_id);
189✔
234
                },
235

236
                _ = connection_pool_timer.tick() => {
1,043✔
237
                    let task_id = rand::random::<u64>();
14✔
238
                    trace!(target: LOG_TARGET, "Pool refresh peers task ({})", task_id);
14✔
239
                    self.cleanup_connection_stats();
14✔
240
                    match tokio::time::timeout(POOL_REFRESH_TIMEOUT, self.refresh_connection_pool(task_id)).await {
14✔
241
                        Ok(res) => {
14✔
242
                            if let Err(err) = res {
14✔
243
                                error!(target: LOG_TARGET, "Error refreshing connection pools ({}): {:?}", task_id, err);
×
244
                            }
14✔
245
                        },
246
                        Err(_) => {
247
                            warn!(
×
248
                                target: LOG_TARGET,
×
249
                                "Pool refresh task ({}) timeout",
×
250
                                task_id,
251
                            );
252
                        },
253
                    }
254
                    trace!(target: LOG_TARGET, "Pool refresh task ({}) done", task_id);
14✔
255
                },
256

257
                _ = self.shutdown_signal.wait() => {
1,043✔
258
                    info!(
53✔
259
                        target: LOG_TARGET,
×
260
                        "ConnectivityManager is shutting down because it received the shutdown signal"
×
261
                    );
262
                    self.disconnect_all().await;
53✔
263
                    break;
46✔
264
                }
46✔
265
            }
46✔
266
        }
46✔
267
    }
46✔
268

269
    async fn handle_request(&mut self, req: ConnectivityRequest) {
769✔
270
        #[allow(clippy::enum_glob_use)]
271
        use ConnectivityRequest::*;
272
        match req {
769✔
273
            WaitStarted(reply) => {
32✔
274
                let _ = reply.send(());
32✔
275
            },
32✔
276
            GetConnectivityStatus(reply) => {
17✔
277
                let _ = reply.send(self.status);
17✔
278
            },
17✔
279
            DialPeer { node_id, reply_tx } => {
142✔
280
                let tracing_id = tracing::Span::current().id();
142✔
281
                let span = span!(Level::TRACE, "handle_dial_peer");
142✔
282
                span.follows_from(tracing_id);
142✔
283
                self.handle_dial_peer(node_id.clone(), reply_tx).instrument(span).await;
142✔
284
            },
285
            SelectConnections(selection, reply) => {
54✔
286
                let _result = reply.send(self.select_connections(selection));
54✔
287
            },
54✔
288
            GetConnection(node_id, reply) => {
48✔
289
                let _result = reply.send(
48✔
290
                    self.pool
48✔
291
                        .get(&node_id)
48✔
292
                        .filter(|c| c.status() == ConnectionStatus::Connected)
48✔
293
                        .and_then(|c| c.connection())
48✔
294
                        .filter(|conn| conn.is_connected())
48✔
295
                        .cloned(),
48✔
296
                );
48✔
297
            },
48✔
298
            GetPeerStats(node_id, reply) => {
×
299
                let peer = match self.peer_manager.find_by_node_id(&node_id).await {
×
300
                    Ok(v) => v,
×
301
                    Err(e) => {
×
302
                        error!(target: LOG_TARGET, "Error when retrieving peer: {:?}", e);
×
303
                        None
×
304
                    },
305
                };
306
                let _result = reply.send(peer);
×
307
            },
308
            GetAllConnectionStates(reply) => {
1✔
309
                let states = self.pool.all().into_iter().cloned().collect();
1✔
310
                let _result = reply.send(states);
1✔
311
            },
1✔
312
            GetMinimizeConnectionsThreshold(reply) => {
×
313
                let minimize_connections_threshold = self.config.maintain_n_closest_connections_only;
×
314
                let _result = reply.send(minimize_connections_threshold);
×
315
            },
×
316
            BanPeer(node_id, duration, reason) => {
5✔
317
                if self.allow_list.contains(&node_id) {
5✔
318
                    info!(
×
319
                        target: LOG_TARGET,
×
320
                        "Peer is excluded from being banned as it was found in the AllowList, NodeId: {:?}", node_id
×
321
                    );
322
                } else if let Err(err) = self.ban_peer(&node_id, duration, reason).await {
5✔
323
                    error!(target: LOG_TARGET, "Error when banning peer: {:?}", err);
×
324
                } else {
5✔
325
                    // we banned the peer
5✔
326
                }
5✔
327
            },
328
            AddPeerToAllowList(node_id) => {
×
329
                if !self.allow_list.contains(&node_id) {
×
330
                    self.allow_list.push(node_id.clone());
×
331
                }
×
332
            },
333
            RemovePeerFromAllowList(node_id) => {
×
334
                if let Some(index) = self.allow_list.iter().position(|x| *x == node_id) {
×
335
                    self.allow_list.remove(index);
×
336
                }
×
337
            },
338
            GetAllowList(reply) => {
468✔
339
                let allow_list = self.allow_list.clone();
468✔
340
                let _result = reply.send(allow_list);
468✔
341
            },
468✔
342
            GetSeeds(reply) => {
×
343
                let seeds = self.peer_manager.get_seed_peers().await.unwrap_or_else(|e| {
×
344
                    error!(target: LOG_TARGET, "Error when retrieving seed peers: {:?}", e);
×
345
                    vec![]
×
346
                });
×
347
                let _result = reply.send(seeds);
×
348
            },
×
349
            GetActiveConnections(reply) => {
2✔
350
                let _result = reply.send(
2✔
351
                    self.pool
2✔
352
                        .filter_connection_states(|s| s.is_connected())
20✔
353
                        .into_iter()
2✔
354
                        .cloned()
2✔
355
                        .collect(),
2✔
356
                );
2✔
357
            },
2✔
358
            GetNodeIdentity(reply) => {
×
359
                let identity = self.node_identity.as_ref();
×
360
                let _result = reply.send(identity.clone());
×
361
            },
×
362
        }
363
    }
769✔
364

365
    async fn handle_dial_peer(
142✔
366
        &mut self,
142✔
367
        node_id: NodeId,
142✔
368
        reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
142✔
369
    ) {
142✔
370
        match self.peer_manager.is_peer_banned(&node_id).await {
142✔
371
            Ok(true) => {
372
                if let Some(reply) = reply_tx {
×
373
                    let _result = reply.send(Err(ConnectionManagerError::PeerBanned));
×
374
                }
×
375
                return;
×
376
            },
377
            Ok(false) => {},
142✔
378
            Err(err) => {
×
379
                if let Some(reply) = reply_tx {
×
380
                    let _result = reply.send(Err(err.into()));
×
381
                }
×
382
                return;
×
383
            },
384
        }
385
        match self.pool.get(&node_id) {
142✔
386
            // The connection pool may temporarily contain a connection that is not connected so we need to check this.
387
            Some(state) if state.is_connected() => {
65✔
388
                if let Some(reply_tx) = reply_tx {
53✔
389
                    let _result = reply_tx.send(Ok(state.connection().cloned().expect("Already checked")));
36✔
390
                }
36✔
391
            },
392
            maybe_state => {
89✔
393
                match maybe_state {
89✔
394
                    Some(state) => {
12✔
395
                        info!(
12✔
396
                            target: LOG_TARGET,
×
397
                            "Connection was previously attempted for peer {}. Current status is '{}'. Dialing again...",
×
398
                            node_id.short_str(),
×
399
                            state.status()
×
400
                        );
401
                    },
402
                    None => {
403
                        info!(
77✔
404
                            target: LOG_TARGET,
×
405
                            "No connection for peer {}. Dialing...",
×
406
                            node_id.short_str(),
×
407
                        );
408
                    },
409
                }
410

411
                if let Err(err) = self.connection_manager.send_dial_peer(node_id, reply_tx).await {
89✔
412
                    error!(
×
413
                        target: LOG_TARGET,
×
414
                        "Failed to send dial request to connection manager: {:?}", err
×
415
                    );
416
                }
89✔
417
            },
418
        }
419
    }
142✔
420

421
    async fn disconnect_all(&mut self) {
53✔
422
        let mut node_ids = Vec::with_capacity(self.pool.count_connected());
53✔
423
        for mut state in self.pool.filter_drain(|_| true) {
79✔
424
            if let Some(conn) = state.connection_mut() {
78✔
425
                if !conn.is_connected() {
78✔
426
                    continue;
29✔
427
                }
49✔
428
                match disconnect_silent_with_timeout(
49✔
429
                    conn,
49✔
430
                    Minimized::No,
49✔
431
                    None,
49✔
432
                    "ConnectivityManagerActor disconnect all",
49✔
433
                )
49✔
434
                .await
49✔
435
                {
436
                    Ok(_) => {
41✔
437
                        node_ids.push(conn.peer_node_id().clone());
41✔
438
                    },
41✔
439
                    Err(err) => {
1✔
440
                        debug!(
1✔
441
                            target: LOG_TARGET,
×
442
                            "In disconnect_all: Error when disconnecting peer '{}' because '{:?}'",
×
443
                            conn.peer_node_id().short_str(),
×
444
                            err
445
                        );
446
                    },
447
                }
448
            }
×
449
        }
450

451
        for node_id in node_ids {
87✔
452
            self.publish_event(ConnectivityEvent::PeerDisconnected(node_id, Minimized::No));
41✔
453
        }
41✔
454
    }
46✔
455

456
    async fn refresh_connection_pool(&mut self, task_id: u64) -> Result<(), ConnectivityError> {
14✔
457
        info!(
14✔
458
            target: LOG_TARGET,
×
459
            "CONNECTIVITY_REFRESH: Performing connection pool cleanup/refresh ({}). (#Peers = {}, #Connected={}, #Failed={}, #Disconnected={}, \
×
460
             #Clients={})",
×
461
            task_id,
×
462
            self.pool.count_entries(),
×
463
            self.pool.count_connected_nodes(),
×
464
            self.pool.count_failed(),
×
465
            self.pool.count_disconnected(),
×
466
            self.pool.count_connected_clients()
×
467
        );
468

469
        self.clean_connection_pool();
14✔
470
        if self.config.is_connection_reaping_enabled {
14✔
471
            self.reap_inactive_connections(task_id).await;
14✔
472
        }
×
473
        if let Some(threshold) = self.config.maintain_n_closest_connections_only {
14✔
474
            self.maintain_n_closest_peer_connections_only(threshold, task_id).await;
×
475
        }
14✔
476

477
        // Execute proactive dialing logic (if enabled)
478
        debug!(
14✔
479
            target: LOG_TARGET,
×
480
            "({}) Proactive dialing config check: enabled={}, target_connections={}",
×
481
            task_id,
482
            self.config.proactive_dialing_enabled,
483
            self.config.target_connection_count
484
        );
485

486
        if self.config.proactive_dialing_enabled {
14✔
487
            debug!(
14✔
488
                target: LOG_TARGET,
×
489
                "({}) Executing proactive dialing logic",
×
490
                task_id
491
            );
492
            if let Err(err) = self.execute_proactive_dialing(task_id).await {
14✔
493
                warn!(
×
494
                    target: LOG_TARGET,
×
495
                    "({}) Proactive dialing failed: {:?}",
×
496
                    task_id,
497
                    err
498
                );
499
            }
14✔
500
        } else {
501
            debug!(
×
502
                target: LOG_TARGET,
×
503
                "({}) Proactive dialing disabled in configuration",
×
504
                task_id
505
            );
506
        }
507

508
        self.update_connectivity_status();
14✔
509
        self.update_connectivity_metrics();
14✔
510
        Ok(())
14✔
511
    }
14✔
512

513
    async fn maintain_n_closest_peer_connections_only(&mut self, threshold: usize, task_id: u64) {
×
514
        let start = Instant::now();
×
515
        // Select all active peer connections (that are communication nodes) with health-aware selection
×
516
        let selection = ConnectivitySelection::healthy_closest_to(
×
517
            self.node_identity.node_id().clone(),
×
518
            self.pool.count_connected_nodes(),
×
519
            vec![],
×
520
        );
×
521
        let mut connections = match self.select_connections_with_health(selection) {
×
522
            Ok(peers) => peers,
×
523
            Err(e) => {
×
524
                warn!(
×
525
                    target: LOG_TARGET,
×
526
                    "Connectivity error trying to maintain {} closest peers ({}) ({:?})",
×
527
                    threshold,
528
                    task_id,
529
                    e
530
                );
531
                return;
×
532
            },
533
        };
534
        let num_connections = connections.len();
×
535

×
536
        // Remove peers that are on the allow list
×
537
        connections.retain(|conn| !self.allow_list.contains(conn.peer_node_id()));
×
538
        debug!(
×
539
            target: LOG_TARGET,
×
540
            "minimize_connections: ({}) Filtered peers: {}, Handles: {}",
×
541
            task_id,
×
542
            connections.len(),
×
543
            num_connections,
544
        );
545

546
        // Disconnect all remaining peers above the threshold
547
        let len = connections.len();
×
548
        for conn in connections.iter_mut().skip(threshold) {
×
549
            debug!(
×
550
                target: LOG_TARGET,
×
551
                "minimize_connections: ({}) Disconnecting '{}' because the node is not among the {} closest peers",
×
552
                task_id,
×
553
                conn.peer_node_id(),
×
554
                threshold
555
            );
556
            match disconnect_if_unused_with_timeout(
×
557
                conn,
×
558
                Minimized::Yes,
×
559
                Some(task_id),
×
560
                "ConnectivityManagerActor maintain closest",
×
561
            )
×
562
            .await
×
563
            {
564
                Ok(_) => {
×
565
                    self.pool.remove(conn.peer_node_id());
×
566
                },
×
567
                Err(err) => {
×
568
                    debug!(
×
569
                        target: LOG_TARGET,
×
570
                        "Peer '{}' already disconnected ({:?}). Error: {:?}",
×
571
                        conn.peer_node_id().short_str(),
×
572
                        task_id,
573
                        err
574
                    );
575
                },
576
            }
577
        }
578
        if len > 0 {
×
579
            debug!(
×
580
                "minimize_connections: ({}) Minimized {} connections in {:.2?}",
×
581
                task_id,
×
582
                len,
×
583
                start.elapsed()
×
584
            );
585
        }
×
586
    }
×
587

588
    async fn reap_inactive_connections(&mut self, task_id: u64) {
14✔
589
        let start = Instant::now();
14✔
590
        let excess_connections = self
14✔
591
            .pool
14✔
592
            .count_connected()
14✔
593
            .saturating_sub(self.config.reaper_min_connection_threshold);
14✔
594
        if excess_connections == 0 {
14✔
595
            return;
14✔
596
        }
×
597

×
598
        let mut connections = self
×
599
            .pool
×
600
            .get_inactive_outbound_connections_mut(self.config.reaper_min_inactive_age);
×
601
        connections.truncate(excess_connections);
×
602
        let mut nodes_to_remove = Vec::new();
×
603
        for conn in &mut connections {
×
604
            if !conn.is_connected() {
×
605
                continue;
×
606
            }
×
607

×
608
            debug!(
×
609
                target: LOG_TARGET,
×
610
                "({}) Disconnecting '{}' because connection was inactive ({} handles)",
×
611
                task_id,
×
612
                conn.peer_node_id().short_str(),
×
613
                conn.handle_count()
×
614
            );
615
            match disconnect_with_timeout(
×
616
                conn,
×
617
                Minimized::Yes,
×
618
                Some(task_id),
×
619
                "ConnectivityManagerActor reap inactive",
×
620
            )
×
621
            .await
×
622
            {
623
                Ok(_) => {
×
624
                    nodes_to_remove.push(conn.peer_node_id().clone());
×
625
                },
×
626
                Err(err) => {
×
627
                    debug!(
×
628
                        target: LOG_TARGET,
×
629
                        "Peer '{}' already disconnected ({:?}). Error: {:?}",
×
630
                        conn.peer_node_id().short_str(),
×
631
                        task_id,
632
                        err
633
                    );
634
                },
635
            }
636
        }
637
        let len = nodes_to_remove.len();
×
638
        if len > 0 {
×
639
            for node_id in nodes_to_remove {
×
640
                self.pool.remove(&node_id);
×
641
            }
×
642
            debug!(
×
643
                "({}) Reaped {} inactive connections in {:.2?}",
×
644
                task_id,
×
645
                len,
×
646
                start.elapsed()
×
647
            );
648
        }
×
649
    }
14✔
650

651
    fn clean_connection_pool(&mut self) {
14✔
652
        let cleared_states = self.pool.filter_drain(|state| {
14✔
653
            matches!(
14✔
654
                state.status(),
14✔
655
                ConnectionStatus::Failed | ConnectionStatus::Disconnected(_)
656
            )
657
        });
14✔
658

14✔
659
        if !cleared_states.is_empty() {
14✔
660
            debug!(
×
661
                target: LOG_TARGET,
×
662
                "Cleared connection states: {}",
×
663
                cleared_states
×
664
                    .iter()
×
665
                    .map(ToString::to_string)
×
666
                    .collect::<Vec<_>>()
×
667
                    .join(",")
×
668
            )
669
        }
14✔
670
    }
14✔
671

672
    fn select_connections(&self, selection: ConnectivitySelection) -> Result<Vec<PeerConnection>, ConnectivityError> {
54✔
673
        trace!(target: LOG_TARGET, "Selection query: {:?}", selection);
54✔
674
        trace!(
54✔
675
            target: LOG_TARGET,
×
676
            "Selecting from {} connected node peers",
×
677
            self.pool.count_connected_nodes()
×
678
        );
679

680
        let conns = selection.select(&self.pool);
54✔
681
        debug!(target: LOG_TARGET, "Selected {} connections(s)", conns.len());
54✔
682

683
        Ok(conns.into_iter().cloned().collect())
54✔
684
    }
54✔
685

686
    fn select_connections_with_health(
×
687
        &self,
×
688
        selection: ConnectivitySelection,
×
689
    ) -> Result<Vec<PeerConnection>, ConnectivityError> {
×
690
        trace!(target: LOG_TARGET, "Health-aware selection query: {:?}", selection);
×
691
        trace!(
×
692
            target: LOG_TARGET,
×
693
            "Selecting from {} connected node peers with health metrics",
×
694
            self.pool.count_connected_nodes()
×
695
        );
696

697
        let conns = selection.select_with_health(
×
698
            &self.pool,
×
699
            &self.connection_stats,
×
700
            self.config.success_rate_tracking_window,
×
701
        );
×
702
        debug!(target: LOG_TARGET, "Selected {} healthy connections(s)", conns.len());
×
703

704
        Ok(conns.into_iter().cloned().collect())
×
705
    }
×
706

707
    fn get_connection_stat_mut(&mut self, node_id: NodeId) -> &mut PeerConnectionStats {
138✔
708
        match self.connection_stats.entry(node_id) {
138✔
709
            Entry::Occupied(entry) => entry.into_mut(),
8✔
710
            Entry::Vacant(entry) => entry.insert(PeerConnectionStats::new()),
130✔
711
        }
712
    }
138✔
713

714
    /// Records a successful connection to `node_id` in its (possibly newly created) statistics.
    fn mark_connection_success(&mut self, node_id: NodeId) {
        self.get_connection_stat_mut(node_id).set_connection_success();
    }
135✔
720

721
    /// Records a failed connection attempt to `node_id` and returns the peer's failed-attempt count
    /// as reported by its statistics.
    ///
    /// The configured `circuit_breaker_failure_threshold` is handed to the statistics along with
    /// the failure. NOTE(review): how it drives the breaker state lives in `PeerConnectionStats`.
    fn mark_peer_failed(&mut self, node_id: NodeId) -> usize {
        // Read the threshold before taking the mutable borrow of the stats map.
        let breaker_threshold = self.config.circuit_breaker_failure_threshold;
        let stats = self.get_connection_stat_mut(node_id);
        stats.set_connection_failed_with_threshold(breaker_threshold);
        stats.failed_attempts()
    }
3✔
728

729
    async fn on_peer_connection_failure(&mut self, node_id: &NodeId) -> Result<(), ConnectivityError> {
4✔
730
        if self.status.is_offline() {
4✔
731
            info!(
1✔
732
                target: LOG_TARGET,
×
733
                "Node is offline. Ignoring connection failure event for peer '{}'.", node_id
×
734
            );
735
            self.publish_event(ConnectivityEvent::ConnectivityStateOffline);
1✔
736
            return Ok(());
1✔
737
        }
3✔
738

3✔
739
        let num_failed = self.mark_peer_failed(node_id.clone());
3✔
740

3✔
741
        if num_failed >= self.config.max_failures_mark_offline {
3✔
742
            debug!(
3✔
743
                target: LOG_TARGET,
×
744
                "Marking peer '{}' as offline because this node failed to connect to them {} times",
×
745
                node_id.short_str(),
×
746
                num_failed
747
            );
748

749
            if let Some(peer) = self.peer_manager.find_by_node_id(node_id).await? {
3✔
750
                if !peer.is_banned() &&
3✔
751
                    peer.last_seen_since()
3✔
752
                        // Haven't seen them in expire_peer_last_seen_duration
3✔
753
                        .map(|t| t > self.config.expire_peer_last_seen_duration)
3✔
754
                        // Or don't delete if never seen
3✔
755
                        .unwrap_or(false)
3✔
756
                {
757
                    debug!(
×
758
                        target: LOG_TARGET,
×
759
                        "Peer `{}` was marked as offline after {} attempts (last seen: {}). Removing peer from peer \
×
760
                         list",
×
761
                        node_id,
×
762
                        num_failed,
×
763
                        peer.last_seen_since()
×
764
                            .map(|d| format!("{}s ago", d.as_secs()))
×
765
                            .unwrap_or_else(|| "Never".to_string()),
×
766
                    );
767
                    self.peer_manager.soft_delete_peer(node_id).await?;
×
768
                }
3✔
769
            }
×
770
        }
×
771

772
        Ok(())
3✔
773
    }
4✔
774

775
    async fn handle_connection_manager_event(
189✔
776
        &mut self,
189✔
777
        event: &ConnectionManagerEvent,
189✔
778
    ) -> Result<(), ConnectivityError> {
189✔
779
        self.update_state_on_connectivity_event(event).await?;
189✔
780
        self.update_connectivity_status();
189✔
781
        self.update_connectivity_metrics();
189✔
782
        Ok(())
189✔
783
    }
189✔
784

785
    #[allow(clippy::too_many_lines)]
786
    async fn update_state_on_connectivity_event(
189✔
787
        &mut self,
189✔
788
        event: &ConnectionManagerEvent,
189✔
789
    ) -> Result<(), ConnectivityError> {
189✔
790
        use ConnectionManagerEvent::*;
791
        match event {
189✔
792
            PeerConnected(new_conn) => {
137✔
793
                match self.on_new_connection(new_conn).await {
137✔
794
                    TieBreak::KeepExisting => {
795
                        debug!(
2✔
796
                            target: LOG_TARGET,
×
797
                            "Discarding new connection to peer '{}' because we already have an existing connection",
×
798
                            new_conn.peer_node_id().short_str()
×
799
                        );
800
                        // Ignore event, we discarded the new connection and keeping the current one
801
                        return Ok(());
2✔
802
                    },
803
                    TieBreak::UseNew | TieBreak::None => {},
135✔
804
                }
805
            },
806
            PeerDisconnected(id, node_id, _minimized) => {
46✔
807
                if let Some(conn) = self.pool.get_connection(node_id) {
46✔
808
                    if conn.id() != *id {
46✔
809
                        debug!(
4✔
810
                            target: LOG_TARGET,
×
811
                            "Ignoring peer disconnected event for stale peer connection (id: {}) for peer '{}'",
×
812
                            id,
813
                            node_id
814
                        );
815
                        return Ok(());
4✔
816
                    }
42✔
817
                }
×
818
            },
819
            PeerViolation { peer_node_id, details } => {
×
820
                self.ban_peer(
×
821
                    peer_node_id,
×
822
                    Duration::from_secs(2 * 60 * 60),
×
823
                    format!("Peer violation: {details}"),
×
824
                )
×
825
                .await?;
×
826
                return Ok(());
×
827
            },
828
            _ => {},
6✔
829
        }
830

831
        let (node_id, mut new_status, connection) = match event {
181✔
832
            PeerDisconnected(_, node_id, minimized) => (node_id, ConnectionStatus::Disconnected(*minimized), None),
42✔
833
            PeerConnected(conn) => (conn.peer_node_id(), ConnectionStatus::Connected, Some(conn.clone())),
135✔
834
            PeerConnectFailed(node_id, ConnectionManagerError::AllPeerAddressesAreExcluded(msg)) => {
×
835
                debug!(
×
836
                    target: LOG_TARGET,
×
837
                    "Peer '{}' contains only excluded addresses ({})",
×
838
                    node_id,
839
                    msg
840
                );
841
                (node_id, ConnectionStatus::Failed, None)
×
842
            },
843
            PeerConnectFailed(node_id, ConnectionManagerError::NoiseHandshakeError(msg)) => {
×
844
                if let Some(conn) = self.pool.get_connection(node_id) {
×
845
                    debug!(
×
846
                        target: LOG_TARGET,
×
847
                        "Handshake error to peer '{}', disconnecting for a fresh retry ({})",
×
848
                        node_id,
849
                        msg
850
                    );
851
                    let mut conn = conn.clone();
×
852
                    disconnect_with_timeout(
×
853
                        &mut conn,
×
854
                        Minimized::No,
×
855
                        None,
×
856
                        "ConnectivityManagerActor peer connect failed",
×
857
                    )
×
858
                    .await?;
×
859
                }
×
860
                (node_id, ConnectionStatus::Failed, None)
×
861
            },
862
            PeerConnectFailed(node_id, ConnectionManagerError::DialCancelled) => {
×
863
                if let Some(conn) = self.pool.get_connection(node_id) {
×
864
                    if conn.is_connected() && conn.direction().is_inbound() {
×
865
                        debug!(
×
866
                            target: LOG_TARGET,
×
867
                            "Ignoring DialCancelled({}) event because an inbound connection already exists", node_id
×
868
                        );
869

870
                        return Ok(());
×
871
                    }
×
872
                }
×
873
                debug!(
×
874
                    target: LOG_TARGET,
×
875
                    "Dial was cancelled before connection completed to peer '{}'", node_id
×
876
                );
877
                (node_id, ConnectionStatus::Failed, None)
×
878
            },
879
            PeerConnectFailed(node_id, err) => {
4✔
880
                debug!(
4✔
881
                    target: LOG_TARGET,
×
882
                    "Connection to peer '{}' failed because '{:?}'", node_id, err
×
883
                );
884
                self.on_peer_connection_failure(node_id).await?;
4✔
885
                (node_id, ConnectionStatus::Failed, None)
4✔
886
            },
887
            _ => return Ok(()),
2✔
888
        };
889

890
        let old_status = self.pool.set_status(node_id, new_status);
181✔
891
        if let Some(conn) = connection {
181✔
892
            new_status = self.pool.insert_connection(*conn);
135✔
893
        }
135✔
894
        if old_status != new_status {
181✔
895
            debug!(
180✔
896
                target: LOG_TARGET,
×
897
                "Peer connection for node '{}' transitioned from {} to {}", node_id, old_status, new_status
×
898
            );
899
        }
1✔
900

901
        let node_id = node_id.clone();
181✔
902

903
        use ConnectionStatus::{Connected, Disconnected, Failed};
904
        match (old_status, new_status) {
181✔
905
            (_, Connected) => match self.pool.get_connection_mut(&node_id).cloned() {
135✔
906
                Some(conn) => {
135✔
907
                    self.mark_connection_success(conn.peer_node_id().clone());
135✔
908
                    self.publish_event(ConnectivityEvent::PeerConnected(conn.into()));
135✔
909
                },
135✔
910
                None => unreachable!(
×
911
                    "Connection transitioning to CONNECTED state must always have a connection set i.e. \
×
912
                     ConnectionPool::get_connection is Some"
×
913
                ),
×
914
            },
915
            (Connected, Disconnected(..)) => {
916
                self.publish_event(ConnectivityEvent::PeerDisconnected(node_id, match new_status {
37✔
917
                    ConnectionStatus::Disconnected(reason) => reason,
37✔
918
                    _ => Minimized::No,
×
919
                }));
920
            },
921
            // Was not connected so don't broadcast event
922
            (_, Disconnected(..)) => {},
5✔
923
            (_, Failed) => {
4✔
924
                self.publish_event(ConnectivityEvent::PeerConnectFailed(node_id));
4✔
925
            },
4✔
926
            _ => {
927
                error!(
×
928
                    target: LOG_TARGET,
×
929
                    "Unexpected connection status transition ({} to {}) for peer '{}'", old_status, new_status, node_id
×
930
                );
931
            },
932
        }
933

934
        Ok(())
181✔
935
    }
189✔
936

937
    async fn on_new_connection(&mut self, new_conn: &PeerConnection) -> TieBreak {
137✔
938
        match self.pool.get(new_conn.peer_node_id()).cloned() {
137✔
939
            Some(existing_state) if !existing_state.is_connected() => {
8✔
940
                debug!(
4✔
941
                    target: LOG_TARGET,
×
942
                    "Tie break: Existing connection (id: {}, peer: {}, direction: {}) was not connected, resolving \
×
943
                     tie break by using the new connection. (New: id: {}, peer: {}, direction: {})",
×
944
                    existing_state.connection().map(|c| c.id()).unwrap_or_default(),
×
945
                    existing_state.node_id(),
×
946
                    existing_state.connection().map(|c| c.direction().as_str()).unwrap_or("--"),
×
947
                    new_conn.id(),
×
948
                    new_conn.peer_node_id(),
×
949
                    new_conn.direction(),
×
950
                );
951
                self.pool.remove(existing_state.node_id());
4✔
952
                TieBreak::UseNew
4✔
953
            },
954
            Some(mut existing_state) => {
4✔
955
                let Some(existing_conn) = existing_state.connection_mut() else {
4✔
956
                    error!(
×
957
                        target: LOG_TARGET,
×
958
                        "INVARIANT ERROR in Tie break: PeerConnection is None but state is CONNECTED: Existing \
×
959
                        connection (id: {}, peer: {}, direction: {}), new connection. (id: {}, peer: {}, direction: {})",
×
960
                        existing_state.connection().map(|c| c.id()).unwrap_or_default(),
×
961
                        existing_state.node_id(),
×
962
                        existing_state.connection().map(|c| c.direction().as_str()).unwrap_or("--"),
×
963
                        new_conn.id(),
×
964
                        new_conn.peer_node_id(),
×
965
                        new_conn.direction(),
×
966
                    );
967
                    return TieBreak::UseNew;
×
968
                };
969
                if self.tie_break_existing_connection(existing_conn, new_conn) {
4✔
970
                    info!(
2✔
971
                        target: LOG_TARGET,
×
972
                        "Tie break: Keep new connection (id: {}, peer: {}, direction: {}). Disconnect existing \
×
973
                         connection (id: {}, peer: {}, direction: {})",
×
974
                        new_conn.id(),
×
975
                        new_conn.peer_node_id(),
×
976
                        new_conn.direction(),
×
977
                        existing_conn.id(),
×
978
                        existing_conn.peer_node_id(),
×
979
                        existing_conn.direction(),
×
980
                    );
981

982
                    let _result = disconnect_silent_with_timeout(
2✔
983
                        existing_conn,
2✔
984
                        Minimized::Yes,
2✔
985
                        None,
2✔
986
                        "ConnectivityManagerActor tie break",
2✔
987
                    )
2✔
988
                    .await;
2✔
989
                    self.pool.remove(existing_conn.peer_node_id());
2✔
990
                    TieBreak::UseNew
2✔
991
                } else {
992
                    debug!(
2✔
993
                        target: LOG_TARGET,
×
994
                        "Tie break: Keeping existing connection (id: {}, peer: {}, direction: {}). Disconnecting new \
×
995
                         connection (id: {}, peer: {}, direction: {})",
×
996
                        new_conn.id(),
×
997
                        new_conn.peer_node_id(),
×
998
                        new_conn.direction(),
×
999
                        existing_conn.id(),
×
1000
                        existing_conn.peer_node_id(),
×
1001
                        existing_conn.direction(),
×
1002
                    );
1003

1004
                    let _result = disconnect_silent_with_timeout(
2✔
1005
                        &mut new_conn.clone(),
2✔
1006
                        Minimized::Yes,
2✔
1007
                        None,
2✔
1008
                        "ConnectivityManagerActor tie break",
2✔
1009
                    )
2✔
1010
                    .await;
2✔
1011
                    TieBreak::KeepExisting
2✔
1012
                }
1013
            },
1014

1015
            None => TieBreak::None,
129✔
1016
        }
1017
    }
137✔
1018

1019
    /// Two connections to the same peer have been created. This function deterministically determines which peer
1020
    /// connection to close. It does this by comparing our NodeId to that of the peer. This rule enables both sides to
1021
    /// agree which connection to disconnect
1022
    ///
1023
    /// Returns true if the existing connection should close, otherwise false if the new connection should be closed.
1024
    fn tie_break_existing_connection(&self, existing_conn: &PeerConnection, new_conn: &PeerConnection) -> bool {
4✔
1025
        debug_assert_eq!(existing_conn.peer_node_id(), new_conn.peer_node_id());
4✔
1026
        let peer_node_id = existing_conn.peer_node_id();
4✔
1027
        let our_node_id = self.node_identity.node_id();
4✔
1028

4✔
1029
        debug!(
4✔
1030
            target: LOG_TARGET,
×
1031
            "Tie-break: (Existing = {}, New = {})",
×
1032
            existing_conn.direction(),
×
1033
            new_conn.direction()
×
1034
        );
1035
        use ConnectionDirection::{Inbound, Outbound};
1036
        match (existing_conn.direction(), new_conn.direction()) {
4✔
1037
            // They connected to us twice for some reason. Drop the older connection
1038
            (Inbound, Inbound) => true,
×
1039
            // They connected to us at the same time we connected to them
1040
            (Inbound, Outbound) => peer_node_id > our_node_id,
2✔
1041
            // We connected to them at the same time as they connected to us
1042
            (Outbound, Inbound) => our_node_id > peer_node_id,
2✔
1043
            // We connected to them twice for some reason. Drop the older connection.
1044
            (Outbound, Outbound) => true,
×
1045
        }
1046
    }
4✔
1047

1048
    fn update_connectivity_status(&mut self) {
203✔
1049
        // The contract we are making with online/degraded status transitions is as follows:
203✔
1050
        // - If min_connectivity peers are connected we MUST transition to ONLINE
203✔
1051
        // - Clients SHOULD tolerate entering a DEGRADED/OFFLINE status
203✔
1052
        // - If a number of peers disconnect or the local system's network goes down, the status MAY transition to
203✔
1053
        //   DEGRADED
203✔
1054
        let min_peers = self.config.min_connectivity;
203✔
1055
        let num_connected_nodes = self.pool.count_connected_nodes();
203✔
1056
        let num_connected_clients = self.pool.count_connected_clients();
203✔
1057
        debug!(
203✔
1058
            target: LOG_TARGET,
×
1059
            "#min_peers = {}, #nodes = {}, #clients = {}", min_peers, num_connected_nodes, num_connected_clients
×
1060
        );
1061

1062
        match num_connected_nodes {
203✔
1063
            n if n >= min_peers => {
203✔
1064
                self.transition(ConnectivityStatus::Online(n), min_peers);
181✔
1065
            },
181✔
1066
            n if n > 0 && n < min_peers => {
22✔
1067
                self.transition(ConnectivityStatus::Degraded(n), min_peers);
3✔
1068
            },
3✔
1069
            n if n == 0 => {
19✔
1070
                if num_connected_clients == 0 {
19✔
1071
                    self.transition(ConnectivityStatus::Offline, min_peers);
15✔
1072
                } else {
15✔
1073
                    self.transition(ConnectivityStatus::Degraded(n), min_peers);
4✔
1074
                }
4✔
1075
            },
1076
            _ => unreachable!("num_connected is unsigned and only negative pattern covered on this branch"),
×
1077
        }
1078
    }
203✔
1079

1080
    #[cfg(not(feature = "metrics"))]
1081
    fn update_connectivity_metrics(&mut self) {}
1082

1083
    /// Publishes the connection-count and uptime gauges (compiled only with the `metrics` feature).
    ///
    /// - Inbound count: pool entries whose connection is connected and inbound.
    /// - Outbound count: total connected count minus the inbound count.
    /// - Uptime: whole seconds since `self.uptime` was set (saturating at `i64::MAX`), or 0 when
    ///   it is unset.
    #[allow(clippy::cast_possible_wrap)]
    #[cfg(feature = "metrics")]
    fn update_connectivity_metrics(&mut self) {
        use std::convert::TryFrom;

        use super::metrics;

        let connected_total = self.pool.count_connected() as i64;
        let connected_inbound = self.pool.count_filtered(|state| {
            matches!(
                state.connection(),
                Some(conn) if conn.is_connected() && conn.direction().is_inbound()
            )
        }) as i64;
        let connected_outbound = connected_total - connected_inbound;

        metrics::connections(ConnectionDirection::Inbound).set(connected_inbound);
        metrics::connections(ConnectionDirection::Outbound).set(connected_outbound);

        let uptime_secs = match self.uptime {
            Some(online_since) => i64::try_from(online_since.elapsed().as_secs()).unwrap_or(i64::MAX),
            None => 0,
        };
        metrics::uptime().set(uptime_secs);
    }
203✔
1105

1106
    fn transition(&mut self, next_status: ConnectivityStatus, required_num_peers: usize) {
203✔
1107
        use ConnectivityStatus::{Degraded, Offline, Online};
1108
        if self.status != next_status {
203✔
1109
            debug!(
166✔
1110
                target: LOG_TARGET,
×
1111
                "Connectivity status transitioning from {} to {}", self.status, next_status
×
1112
            );
1113
        }
37✔
1114

1115
        match (self.status, next_status) {
203✔
1116
            (Online(_), Online(_)) => {},
111✔
1117
            (_, Online(n)) => {
70✔
1118
                info!(
70✔
1119
                    target: LOG_TARGET,
×
1120
                    "Connectivity is ONLINE ({}/{} connections)", n, required_num_peers
×
1121
                );
1122

1123
                #[cfg(feature = "metrics")]
1124
                if self.uptime.is_none() {
70✔
1125
                    self.uptime = Some(Instant::now());
2✔
1126
                }
68✔
1127
                self.publish_event(ConnectivityEvent::ConnectivityStateOnline(n));
70✔
1128
            },
1129
            (Degraded(m), Degraded(n)) => {
2✔
1130
                info!(
2✔
1131
                    target: LOG_TARGET,
×
1132
                    "Connectivity is DEGRADED ({}/{} connections)", n, required_num_peers
×
1133
                );
1134
                if m != n {
2✔
1135
                    self.publish_event(ConnectivityEvent::ConnectivityStateDegraded(n));
1✔
1136
                }
1✔
1137
            },
1138
            (_, Degraded(n)) => {
5✔
1139
                info!(
5✔
1140
                    target: LOG_TARGET,
×
1141
                    "Connectivity is DEGRADED ({}/{} connections)", n, required_num_peers
×
1142
                );
1143
                self.publish_event(ConnectivityEvent::ConnectivityStateDegraded(n));
5✔
1144
            },
1145
            (Offline, Offline) => {},
3✔
1146
            (_, Offline) => {
1147
                warn!(
12✔
1148
                    target: LOG_TARGET,
×
1149
                    "Connectivity is OFFLINE (0/{} connections)", required_num_peers
×
1150
                );
1151
                #[cfg(feature = "metrics")]
1152
                {
12✔
1153
                    self.uptime = None;
12✔
1154
                }
12✔
1155
                self.publish_event(ConnectivityEvent::ConnectivityStateOffline);
12✔
1156
            },
1157
            (status, next_status) => unreachable!("Unexpected status transition ({} to {})", status, next_status),
×
1158
        }
1159
        self.status = next_status;
203✔
1160
    }
203✔
1161

1162
    /// Broadcasts `event` to all connectivity event subscribers.
    ///
    /// The send result is deliberately ignored: a send can only fail when there are no subscribers.
    fn publish_event(&mut self, event: ConnectivityEvent) {
        let _ignored = self.event_tx.send(event);
    }
382✔
1166

1167
    async fn ban_peer(
5✔
1168
        &mut self,
5✔
1169
        node_id: &NodeId,
5✔
1170
        duration: Duration,
5✔
1171
        reason: String,
5✔
1172
    ) -> Result<(), ConnectivityError> {
5✔
1173
        info!(
5✔
1174
            target: LOG_TARGET,
×
1175
            "Banning peer {} for {} because: {}",
×
1176
            node_id,
×
1177
            format_duration(duration),
×
1178
            reason
1179
        );
1180
        let ban_result = self.peer_manager.ban_peer_by_node_id(node_id, duration, reason).await;
5✔
1181

1182
        #[cfg(feature = "metrics")]
1183
        super::metrics::banned_peers_counter().inc();
5✔
1184

5✔
1185
        self.publish_event(ConnectivityEvent::PeerBanned(node_id.clone()));
5✔
1186

1187
        if let Some(conn) = self.pool.get_connection_mut(node_id) {
5✔
1188
            disconnect_with_timeout(conn, Minimized::Yes, None, "ConnectivityManagerActor ban peer").await?;
5✔
1189
            let status = self.pool.get_connection_status(node_id);
5✔
1190
            debug!(
5✔
1191
                target: LOG_TARGET,
×
1192
                "Disconnected banned peer {}. The peer connection status is {}", node_id, status
×
1193
            );
1194
        }
×
1195
        ban_result?;
5✔
1196
        Ok(())
5✔
1197
    }
5✔
1198

1199
    async fn execute_proactive_dialing(&mut self, task_id: u64) -> Result<(), ConnectivityError> {
14✔
1200
        debug!(
14✔
1201
            target: LOG_TARGET,
×
1202
            "({}) Starting proactive dialing execution - current connections: {}, target: {}",
×
1203
            task_id,
×
1204
            self.pool.count_connected_nodes(),
×
1205
            self.config.target_connection_count
1206
        );
1207

1208
        // First, clean up old health data to keep metrics accurate
1209
        for stats in self.connection_stats.values_mut() {
14✔
1210
            stats.cleanup_old_health_data(self.config.success_rate_tracking_window);
14✔
1211
        }
14✔
1212

1213
        // Update circuit breaker metrics
1214
        self.update_circuit_breaker_metrics();
14✔
1215

14✔
1216
        // Execute proactive dialing logic
14✔
1217
        match self
14✔
1218
            .proactive_dialer
14✔
1219
            .execute_proactive_dialing(&self.pool, &self.connection_stats, task_id)
14✔
1220
            .await
14✔
1221
        {
1222
            Ok(dialed_count) => {
14✔
1223
                if dialed_count > 0 {
14✔
1224
                    debug!(
×
1225
                        target: LOG_TARGET,
×
1226
                        "({}) Proactive dialing initiated {} peer connections",
×
1227
                        task_id,
1228
                        dialed_count
1229
                    );
1230
                }
14✔
1231
                Ok(())
14✔
1232
            },
1233
            Err(err) => {
×
1234
                error!(
×
1235
                    target: LOG_TARGET,
×
1236
                    "({}) Proactive dialing failed: {:?}",
×
1237
                    task_id,
1238
                    err
1239
                );
1240
                Err(err)
×
1241
            },
1242
        }
1243
    }
14✔
1244

1245
    fn update_circuit_breaker_metrics(&self) {
14✔
1246
        let _circuit_breaker_open_count = self
14✔
1247
            .connection_stats
14✔
1248
            .values()
14✔
1249
            .filter(|stats| stats.health_metrics().circuit_breaker_state().is_open())
14✔
1250
            .count();
14✔
1251

14✔
1252
        // Calculate average peer health score
14✔
1253
        if !self.connection_stats.is_empty() {
14✔
1254
            let total_health: f32 = self
14✔
1255
                .connection_stats
14✔
1256
                .values()
14✔
1257
                .map(|stats| stats.health_score(self.config.success_rate_tracking_window))
14✔
1258
                .sum();
14✔
1259
            let _avg_health = total_health / self.connection_stats.len() as f32;
14✔
1260
        }
14✔
1261
    }
14✔
1262

1263
    fn cleanup_connection_stats(&mut self) {
14✔
1264
        let mut to_remove = Vec::new();
14✔
1265
        for node_id in self.connection_stats.keys() {
14✔
1266
            let status = self.pool.get_connection_status(node_id);
14✔
1267
            if matches!(
14✔
1268
                status,
14✔
1269
                ConnectionStatus::NotConnected | ConnectionStatus::Failed | ConnectionStatus::Disconnected(_)
1270
            ) {
×
1271
                to_remove.push(node_id.clone());
×
1272
            }
14✔
1273
        }
1274
        for node_id in to_remove {
14✔
1275
            self.connection_stats.remove(&node_id);
×
1276
        }
×
1277
    }
14✔
1278
}
1279

1280
/// Outcome of resolving a newly established connection against any existing pool entry for the
/// same peer.
enum TieBreak {
    /// The pool had no entry for the peer, so there was nothing to resolve.
    None,
    /// The new connection should be used.
    UseNew,
    /// The existing connection is kept; the new connection has been discarded.
    KeepExisting,
}
1285

1286
async fn disconnect_with_timeout(
5✔
1287
    connection: &mut PeerConnection,
5✔
1288
    minimized: Minimized,
5✔
1289
    task_id: Option<u64>,
5✔
1290
    requester: &str,
5✔
1291
) -> Result<(), PeerConnectionError> {
5✔
1292
    match tokio::time::timeout(PEER_DISCONNECT_TIMEOUT, connection.disconnect(minimized, requester)).await {
5✔
1293
        Ok(res) => res,
5✔
1294
        Err(_) => {
1295
            warn!(
×
1296
                target: LOG_TARGET,
×
1297
                "Timeout disconnecting peer ({:?}) '{}'",
×
1298
                task_id,
×
1299
                connection.peer_node_id().short_str(),
×
1300
            );
1301
            Err(PeerConnectionError::DisconnectTimeout)
×
1302
        },
1303
    }
1304
}
5✔
1305

1306
async fn disconnect_if_unused_with_timeout(
×
1307
    connection: &mut PeerConnection,
×
1308
    minimized: Minimized,
×
1309
    task_id: Option<u64>,
×
1310
    requester: &str,
×
1311
) -> Result<(), PeerConnectionError> {
×
1312
    match tokio::time::timeout(
×
1313
        PEER_DISCONNECT_TIMEOUT,
×
1314
        connection.disconnect_if_unused(minimized, 0, 0, requester),
×
1315
    )
×
1316
    .await
×
1317
    {
1318
        Ok(res) => res,
×
1319
        Err(_) => {
1320
            warn!(
×
1321
                target: LOG_TARGET,
×
1322
                "Timeout disconnecting peer ({:?}) '{}'",
×
1323
                task_id,
×
1324
                connection.peer_node_id().short_str(),
×
1325
            );
1326
            Err(PeerConnectionError::DisconnectTimeout)
×
1327
        },
1328
    }
1329
}
×
1330

1331
async fn disconnect_silent_with_timeout(
53✔
1332
    connection: &mut PeerConnection,
53✔
1333
    minimized: Minimized,
53✔
1334
    task_id: Option<u64>,
53✔
1335
    requester: &str,
53✔
1336
) -> Result<(), PeerConnectionError> {
53✔
1337
    match tokio::time::timeout(
53✔
1338
        PEER_DISCONNECT_TIMEOUT,
53✔
1339
        connection.disconnect_silent(minimized, requester),
53✔
1340
    )
53✔
1341
    .await
53✔
1342
    {
1343
        Ok(res) => res,
45✔
1344
        Err(_) => {
1345
            warn!(
1✔
1346
                target: LOG_TARGET,
×
1347
                "Timeout disconnecting peer ({:?}) '{}'",
×
1348
                task_id,
×
1349
                connection.peer_node_id().short_str(),
×
1350
            );
1351
            Err(PeerConnectionError::DisconnectTimeout)
1✔
1352
        },
1353
    }
1354
}
46✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc