• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 24654956413

20 Apr 2026 07:52AM UTC coverage: 61.03% (+0.09%) from 60.945%
24654956413

push

github

web-flow
fix(sidechain)!: include epoch_hash in sidechain block header (#7767)

Description
---
fix(sidechain)!: include epoch_hash in sidechain block header

Motivation and Context
---
We need to read the epoch_hash from the sidechain header on Ootle
(https://github.com/tari-project/tari-ootle/pull/2017#issuecomment-4259006786)

This increases the proof size (over the wire) by 33 bytes

How Has This Been Tested?
---
Existing sidechain tests (fixtures updated)

Breaking Changes
---

- [ ] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [x] Other - Please specify

BREAKING CHANGE: this has no breaking change on current testnets or
mainnet. However, it is a breaking change for Ootle when it upgrades to
use these changes.

1 of 3 new or added lines in 2 files covered. (33.33%)

112 existing lines in 9 files now uncovered.

70783 of 115981 relevant lines covered (61.03%)

224161.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.27
/comms/core/src/connection_manager/manager.rs
1
// Copyright 2019, The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{fmt, sync::Arc};
24

25
use log::*;
26
use multiaddr::Multiaddr;
27
use tari_shutdown::{Shutdown, ShutdownSignal};
28
use time::Duration;
29
use tokio::{
30
    io::{AsyncRead, AsyncWrite},
31
    sync::{broadcast, mpsc, oneshot},
32
    task,
33
    time,
34
};
35
use tracing::{Instrument, Level, span};
36

37
use super::{
38
    dialer::{Dialer, DialerRequest},
39
    error::ConnectionManagerError,
40
    listener::PeerListener,
41
    peer_connection::PeerConnection,
42
    requester::ConnectionManagerRequest,
43
};
44
#[cfg(feature = "metrics")]
45
use crate::connection_manager::ConnectionDirection;
46
#[cfg(feature = "metrics")]
47
use crate::connection_manager::metrics;
48
use crate::{
49
    Minimized,
50
    PeerManager,
51
    backoff::Backoff,
52
    connection_manager::ConnectionId,
53
    multiplexing::Substream,
54
    net_address::MultiaddrRange,
55
    noise::NoiseConfig,
56
    peer_manager::{NodeId, NodeIdentity, PeerManagerError},
57
    peer_validator::PeerValidatorConfig,
58
    protocol::{NodeNetworkInfo, ProtocolEvent, ProtocolId, Protocols},
59
    transports::{TcpTransport, Transport},
60
};
61

62
const LOG_TARGET: &str = "comms::connection_manager::manager";
63

64
const EVENT_CHANNEL_SIZE: usize = 32;
65
const DIALER_REQUEST_CHANNEL_SIZE: usize = 32;
66

67
/// Connection events
68
#[derive(Debug)]
69
pub enum ConnectionManagerEvent {
70
    // Peer connection
71
    PeerConnected(Box<PeerConnection>),
72
    PeerDisconnected(ConnectionId, NodeId, Minimized),
73
    PeerConnectFailed(NodeId, ConnectionManagerError),
74
    PeerInboundConnectFailed(ConnectionManagerError),
75

76
    // Substreams
77
    NewInboundSubstream(NodeId, ProtocolId, Substream),
78

79
    // Other
80
    PeerViolation { peer_node_id: NodeId, details: String },
81
}
82

83
impl fmt::Display for ConnectionManagerEvent {
84
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1✔
85
        #[allow(clippy::enum_glob_use)]
86
        use ConnectionManagerEvent::*;
87
        match self {
1✔
88
            PeerConnected(conn) => write!(f, "PeerConnected({conn})"),
1✔
89
            PeerDisconnected(id, node_id, minimized) => {
×
90
                write!(f, "PeerDisconnected({}, {}, {:?})", id, node_id.short_str(), minimized)
×
91
            },
92
            PeerConnectFailed(node_id, err) => write!(f, "PeerConnectFailed({}, {:?})", node_id.short_str(), err),
×
93
            PeerInboundConnectFailed(err) => write!(f, "PeerInboundConnectFailed({err:?})",),
×
94
            NewInboundSubstream(node_id, protocol, _) => write!(
×
95
                f,
×
96
                "NewInboundSubstream({}, {}, Stream)",
97
                node_id.short_str(),
×
98
                String::from_utf8_lossy(protocol)
×
99
            ),
100
            PeerViolation { peer_node_id, details } => {
×
101
                write!(f, "PeerViolation({}, {})", peer_node_id.short_str(), details)
×
102
            },
103
        }
104
    }
1✔
105
}
106

107
/// Configuration for ConnectionManager
108
#[derive(Debug, Clone)]
109
pub struct ConnectionManagerConfig {
110
    /// The address to listen on for incoming connections. This address must be supported by the transport.
111
    /// Default: DEFAULT_LISTENER_ADDRESS constant
112
    pub listener_address: Multiaddr,
113
    /// The number of dial attempts to make before giving up. Default: 3
114
    pub max_dial_attempts: usize,
115
    /// The maximum number of connection tasks that will be spawned at the same time. Once this limit is reached, peers
116
    /// attempting to connect will have to wait for another connection attempt to complete. Default: 100
117
    pub max_simultaneous_inbound_connects: usize,
118
    /// Version information for this node
119
    pub network_info: NodeNetworkInfo,
120
    /// The maximum time to wait for the first byte before closing the connection. Default: 3s
121
    pub time_to_first_byte: Duration,
122
    /// The maximum time to wait for a noise protocol handshake message before timing out. For 1.5 RTT XX handshake,
123
    /// the responder will wait 2 x this value (1 per receive) before timing out.
124
    /// Default: 3s
125
    pub noise_handshake_recv_timeout: Duration,
126
    /// The maximum time to wait for a peer to respond on a noise protocol dial.
127
    /// Default: 60s
128
    pub noise_dial_timeout: Duration,
129
    /// The number of liveness check sessions to allow. Default: 0
130
    pub liveness_max_sessions: usize,
131
    /// CIDR blocks that allowlist liveness checks. Default: Localhost only (127.0.0.1/32)
132
    pub liveness_cidr_allowlist: Vec<cidr::AnyIpCidr>,
133
    /// Interval to perform self-liveness ping-pong tests. Default: None/disabled
134
    pub self_liveness_self_check_interval: Option<Duration>,
135
    /// If set, an additional TCP-only p2p listener will be started. This is useful for local wallet connections.
136
    /// Default: None (disabled)
137
    pub auxiliary_tcp_listener_address: Option<Multiaddr>,
138
    /// Peer validation configuration. See [PeerValidatorConfig]
139
    pub peer_validation_config: PeerValidatorConfig,
140
    /// Addresses that should never be dialed
141
    pub excluded_dial_addresses: Vec<MultiaddrRange>,
142
}
143

144
impl Default for ConnectionManagerConfig {
145
    fn default() -> Self {
130✔
146
        Self {
130✔
147
            #[cfg(not(test))]
130✔
148
            listener_address: "/ip4/0.0.0.0/tcp/7898"
130✔
149
                .parse()
130✔
150
                .expect("DEFAULT_LISTENER_ADDRESS is malformed"),
130✔
151
            #[cfg(test)]
130✔
152
            listener_address: "/memory/0".parse().unwrap(),
130✔
153
            max_dial_attempts: 1,
130✔
154
            max_simultaneous_inbound_connects: 100,
130✔
155
            network_info: Default::default(),
130✔
156
            liveness_max_sessions: 1,
130✔
157
            time_to_first_byte: Duration::from_secs(6),
130✔
158
            liveness_cidr_allowlist: vec![cidr::AnyIpCidr::V4("127.0.0.1/32".parse().unwrap())],
130✔
159
            self_liveness_self_check_interval: None,
130✔
160
            auxiliary_tcp_listener_address: None,
130✔
161
            peer_validation_config: PeerValidatorConfig::default(),
130✔
162
            noise_handshake_recv_timeout: Duration::from_secs(6),
130✔
163
            noise_dial_timeout: Duration::from_secs(60),
130✔
164
            excluded_dial_addresses: vec![],
130✔
165
        }
130✔
166
    }
130✔
167
}
168

169
/// Container struct for the listener addresses
170
#[derive(Debug, Clone)]
171
pub struct ListenerInfo {
172
    bind_address: Multiaddr,
173
    aux_bind_address: Option<Multiaddr>,
174
}
175

176
impl ListenerInfo {
177
    /// The address that was bound on. In the case of TCP, if the OS has decided which port to bind on (0.0.0.0:0), this
178
    /// address contains the actual port that was used.
179
    pub fn bind_address(&self) -> &Multiaddr {
84✔
180
        &self.bind_address
84✔
181
    }
84✔
182

183
    /// The auxiliary TCP address that was bound on if enabled.
184
    pub fn auxiliary_bind_address(&self) -> Option<&Multiaddr> {
1✔
185
        self.aux_bind_address.as_ref()
1✔
186
    }
1✔
187
}
188

189
/// The actor responsible for connection management.
190
pub(crate) struct ConnectionManager<TTransport, TBackoff> {
191
    request_rx: mpsc::Receiver<ConnectionManagerRequest>,
192
    internal_event_rx: mpsc::Receiver<ConnectionManagerEvent>,
193
    dialer_tx: mpsc::Sender<DialerRequest>,
194
    dialer: Option<Dialer<TTransport, TBackoff>>,
195
    listener: Option<PeerListener<TTransport>>,
196
    aux_listener: Option<PeerListener<TcpTransport>>,
197
    peer_manager: Arc<PeerManager>,
198
    shutdown_signal: Option<ShutdownSignal>,
199
    protocols: Protocols<Substream>,
200
    listener_info: Option<ListenerInfo>,
201
    listening_notifiers: Vec<oneshot::Sender<ListenerInfo>>,
202
    connection_manager_events_tx: broadcast::Sender<Arc<ConnectionManagerEvent>>,
203
    complete_trigger: Shutdown,
204
}
205

206
impl<TTransport, TBackoff> ConnectionManager<TTransport, TBackoff>
207
where
208
    TTransport: Transport + Unpin + Send + Sync + Clone + 'static,
209
    TTransport::Output: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
210
    TBackoff: Backoff + Send + Sync + 'static,
211
{
212
    pub(crate) fn new(
121✔
213
        mut config: ConnectionManagerConfig,
121✔
214
        transport: TTransport,
121✔
215
        backoff: TBackoff,
121✔
216
        request_rx: mpsc::Receiver<ConnectionManagerRequest>,
121✔
217
        node_identity: Arc<NodeIdentity>,
121✔
218
        peer_manager: Arc<PeerManager>,
121✔
219
        connection_manager_events_tx: broadcast::Sender<Arc<ConnectionManagerEvent>>,
121✔
220
        shutdown_signal: ShutdownSignal,
121✔
221
    ) -> Self {
121✔
222
        let (internal_event_tx, internal_event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
121✔
223
        let (dialer_tx, dialer_rx) = mpsc::channel(DIALER_REQUEST_CHANNEL_SIZE);
121✔
224

225
        let noise_config = NoiseConfig::new(node_identity.clone())
121✔
226
            .with_recv_timeout(config.noise_handshake_recv_timeout)
121✔
227
            .with_dial_timeout(config.noise_dial_timeout);
121✔
228

229
        let listener = PeerListener::new(
121✔
230
            config.clone(),
121✔
231
            config.listener_address.clone(),
121✔
232
            transport.clone(),
121✔
233
            noise_config.clone(),
121✔
234
            internal_event_tx.clone(),
121✔
235
            peer_manager.clone(),
121✔
236
            node_identity.clone(),
121✔
237
            shutdown_signal.clone(),
121✔
238
        );
239

240
        let aux_listener = config.auxiliary_tcp_listener_address.take().map(|addr| {
121✔
241
            info!(target: LOG_TARGET, "Starting auxiliary listener on {addr}");
1✔
242
            let aux_config = ConnectionManagerConfig {
1✔
243
                // Disable liveness checks on the auxiliary listener
1✔
244
                self_liveness_self_check_interval: None,
1✔
245
                ..config.clone()
1✔
246
            };
1✔
247
            PeerListener::new(
1✔
248
                aux_config,
1✔
249
                addr,
1✔
250
                TcpTransport::new(),
1✔
251
                noise_config.clone(),
1✔
252
                internal_event_tx.clone(),
1✔
253
                peer_manager.clone(),
1✔
254
                node_identity.clone(),
1✔
255
                shutdown_signal.clone(),
1✔
256
            )
257
        });
1✔
258

259
        let dialer = Dialer::new(
121✔
260
            config,
121✔
261
            node_identity,
121✔
262
            peer_manager.clone(),
121✔
263
            transport,
121✔
264
            noise_config,
121✔
265
            backoff,
121✔
266
            dialer_rx,
121✔
267
            internal_event_tx,
121✔
268
            shutdown_signal.clone(),
121✔
269
        );
270

271
        Self {
121✔
272
            shutdown_signal: Some(shutdown_signal),
121✔
273
            request_rx,
121✔
274
            peer_manager,
121✔
275
            protocols: Protocols::new(),
121✔
276
            internal_event_rx,
121✔
277
            dialer_tx,
121✔
278
            dialer: Some(dialer),
121✔
279
            listener: Some(listener),
121✔
280
            listener_info: None,
121✔
281
            aux_listener,
121✔
282
            listening_notifiers: Vec::new(),
121✔
283
            connection_manager_events_tx,
121✔
284
            complete_trigger: Shutdown::new(),
121✔
285
        }
121✔
286
    }
121✔
287

288
    pub fn add_protocols(&mut self, protocols: Protocols<Substream>) -> &mut Self {
233✔
289
        self.protocols.extend(protocols);
233✔
290
        self
233✔
291
    }
233✔
292

293
    pub fn complete_signal(&self) -> ShutdownSignal {
113✔
294
        self.complete_trigger.to_signal()
113✔
295
    }
113✔
296

297
    pub fn spawn(self) -> task::JoinHandle<()> {
120✔
298
        task::spawn(self.run())
120✔
299
    }
120✔
300

301
    pub async fn run(mut self) {
121✔
302
        let span = span!(Level::DEBUG, "comms::connection_manager::run");
121✔
303
        let _enter = span.enter();
121✔
304
        let mut shutdown = self
121✔
305
            .shutdown_signal
121✔
306
            .take()
121✔
307
            .expect("ConnectionManager initialized without a shutdown");
121✔
308

309
        // Runs the listeners. Sockets are bound and ready once this resolves
310
        match self.run_listeners().await {
121✔
311
            Ok(info) => {
121✔
312
                self.listener_info = Some(info);
121✔
313
            },
121✔
314
            Err(err) => {
×
315
                error!(
×
316
                    target: LOG_TARGET,
×
317
                    "Failed to start listener(s). {err}. Connection manager is quitting."
318
                );
319
                return;
×
320
            },
321
        };
322
        self.run_dialer();
121✔
323
        // Notify any awaiting tasks that the listener(s) are ready to receive connections
324
        self.notify_all_ready();
121✔
325

326
        debug!(
121✔
327
            target: LOG_TARGET,
×
328
            "Connection manager started. Protocols supported by this node: {}",
329
            self.protocols
×
330
                .iter()
×
331
                .map(|p| String::from_utf8_lossy(p))
×
332
                .collect::<Vec<_>>()
×
333
                .join(", ")
×
334
        );
335
        loop {
336
            tokio::select! {
713✔
337
                Some(event) = self.internal_event_rx.recv() => {
713✔
338
                    self.handle_event(event).await;
401✔
339
                },
340

341
                Some(request) = self.request_rx.recv() => {
713✔
342
                    self.handle_request(request).await;
191✔
343
                },
344

345
                _ = &mut shutdown => {
713✔
346
                    info!(target: LOG_TARGET, "ConnectionManager is shutting down because it received the shutdown signal");
84✔
347
                    break;
84✔
348
                }
349
            }
350
        }
351
    }
84✔
352

353
    async fn run_listeners(&mut self) -> Result<ListenerInfo, ConnectionManagerError> {
121✔
354
        let mut listener = self
121✔
355
            .listener
121✔
356
            .take()
121✔
357
            .expect("ConnectionManager initialized without a listener");
121✔
358

359
        listener.set_supported_protocols(self.protocols.get_supported_protocols());
121✔
360

361
        let mut listener_info = match listener.listen().await {
121✔
362
            Ok(bind_address) => ListenerInfo {
121✔
363
                bind_address,
121✔
364
                aux_bind_address: None,
121✔
365
            },
121✔
366
            Err(err) => return Err(err),
×
367
        };
368

369
        if let Some(mut listener) = self.aux_listener.take() {
121✔
370
            listener.set_supported_protocols(self.protocols.get_supported_protocols());
1✔
371
            let addr = listener.listen().await?;
1✔
372
            debug!(target: LOG_TARGET, "Aux TCP listener bound to address {addr}");
1✔
373
            listener_info.aux_bind_address = Some(addr);
1✔
374
        }
120✔
375

376
        Ok(listener_info)
121✔
377
    }
121✔
378

379
    fn run_dialer(&mut self) {
121✔
380
        let mut dialer = self
121✔
381
            .dialer
121✔
382
            .take()
121✔
383
            .expect("ConnectionManager initialized without a dialer");
121✔
384

385
        dialer.set_supported_protocols(self.protocols.get_supported_protocols());
121✔
386
        dialer.spawn();
121✔
387
    }
121✔
388

389
    async fn handle_request(&mut self, request: ConnectionManagerRequest) {
191✔
390
        use ConnectionManagerRequest::{CancelDial, DialPeer, NotifyListening};
391
        trace!(target: LOG_TARGET, "Connection manager got request: {request:?}" );
191✔
392
        match request {
191✔
393
            DialPeer { node_id, reply_tx } => {
102✔
394
                let tracing_id = tracing::Span::current().id();
102✔
395
                let span = span!(Level::TRACE, "connection_manager::handle_request");
102✔
396
                span.follows_from(tracing_id);
102✔
397
                self.dial_peer(node_id, reply_tx).instrument(span).await
102✔
398
            },
399
            CancelDial(node_id) => {
1✔
400
                if let Err(err) = self.dialer_tx.send(DialerRequest::CancelPendingDial(node_id)).await {
1✔
401
                    error!(
×
402
                        target: LOG_TARGET,
×
403
                        "Failed to send cancel dial request to dialer: {err}"
404
                    );
405
                }
1✔
406
            },
407
            NotifyListening(reply) => match self.listener_info.as_ref() {
88✔
408
                Some(info) => {
88✔
409
                    let _result = reply.send(info.clone());
88✔
410
                },
88✔
411
                None => {
×
412
                    self.listening_notifiers.push(reply);
×
413
                },
×
414
            },
415
        }
416
    }
191✔
417

418
    fn notify_all_ready(&mut self) {
121✔
419
        let info = self
121✔
420
            .listener_info
121✔
421
            .as_ref()
121✔
422
            .expect("notify_all_ready called before listeners were successfully bound");
121✔
423
        for notifier in self.listening_notifiers.drain(..) {
121✔
424
            let _result = notifier.send(info.clone());
×
425
        }
×
426
    }
121✔
427

428
    async fn handle_event(&mut self, event: ConnectionManagerEvent) {
401✔
429
        use ConnectionManagerEvent::*;
430

431
        match event {
401✔
432
            NewInboundSubstream(node_id, protocol, stream) => {
177✔
433
                let proto_str = String::from_utf8_lossy(&protocol);
177✔
434
                debug!(
177✔
435
                    target: LOG_TARGET,
×
436
                    "New inbound substream for peer '{}' speaking protocol '{}'",
437
                    node_id.short_str(),
×
438
                    proto_str
439
                );
440
                #[cfg(feature = "metrics")]
441
                metrics::inbound_substream_counter(&protocol).inc();
177✔
442
                let notify_fut = self
177✔
443
                    .protocols
177✔
444
                    .notify(&protocol, ProtocolEvent::NewInboundSubstream(node_id, stream));
177✔
445
                match time::timeout(Duration::from_secs(10), notify_fut).await {
177✔
446
                    Ok(Ok(_)) => {
447
                        debug!(target: LOG_TARGET, "Protocol notification for '{proto_str}' sent" );
177✔
448
                    },
UNCOV
449
                    Ok(Err(err)) => {
×
UNCOV
450
                        error!(
×
451
                            target: LOG_TARGET,
×
452
                            "Error sending NewSubstream notification for protocol '{proto_str}' because '{err:?}'"
453
                        );
454
                    },
455
                    Err(err) => {
×
456
                        error!(
×
457
                            target: LOG_TARGET,
×
458
                            "Error sending NewSubstream notification for protocol '{proto_str}' because {err}"
459
                        );
460
                    },
461
                }
462
            },
463

464
            PeerConnected(conn) => {
158✔
465
                if conn.direction().is_inbound() {
158✔
466
                    // Notify the dialer that we have an inbound connection, so that is can resolve any pending dials.
467
                    let _result = self
79✔
468
                        .dialer_tx
79✔
469
                        .send(DialerRequest::NotifyNewInboundConnection(conn.clone()))
79✔
470
                        .await;
79✔
471
                }
79✔
472
                #[cfg(feature = "metrics")]
473
                metrics::successful_connections(conn.direction()).inc();
158✔
474
                self.publish_event(PeerConnected(conn));
158✔
475
            },
476
            PeerConnectFailed(peer, err) => {
4✔
477
                #[cfg(feature = "metrics")]
4✔
478
                metrics::failed_connections(ConnectionDirection::Outbound).inc();
4✔
479
                self.publish_event(PeerConnectFailed(peer, err));
4✔
480
            },
4✔
481
            PeerInboundConnectFailed(err) => {
×
482
                #[cfg(feature = "metrics")]
×
483
                metrics::failed_connections(ConnectionDirection::Inbound).inc();
×
484
                self.publish_event(PeerInboundConnectFailed(err));
×
485
            },
×
486
            event => {
62✔
487
                self.publish_event(event);
62✔
488
            },
62✔
489
        }
490
    }
401✔
491

492
    #[inline]
493
    async fn send_dialer_request(&mut self, req: DialerRequest) {
101✔
494
        if let Err(err) = self.dialer_tx.send(req).await {
101✔
495
            error!(target: LOG_TARGET, "Failed to send request to dialer because '{err}'");
×
496
        }
101✔
497
    }
101✔
498

499
    fn publish_event(&self, event: ConnectionManagerEvent) {
224✔
500
        // Error on no subscribers can be ignored
501
        let _result = self.connection_manager_events_tx.send(Arc::new(event));
224✔
502
    }
224✔
503

504
    async fn dial_peer(
102✔
505
        &mut self,
102✔
506
        node_id: NodeId,
102✔
507
        reply: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
102✔
508
    ) {
102✔
509
        match self.peer_manager.find_by_node_id(&node_id).await {
102✔
510
            Ok(Some(peer)) => {
101✔
511
                self.send_dialer_request(DialerRequest::Dial(Box::new(peer), reply))
101✔
512
                    .await;
101✔
513
            },
514
            Ok(None) => {
515
                warn!(target: LOG_TARGET, "Peer not found for dial");
1✔
516
                if let Some(reply) = reply {
1✔
517
                    let _result = reply.send(Err(ConnectionManagerError::PeerManagerError(
1✔
518
                        PeerManagerError::peer_not_found(&node_id),
1✔
519
                    )));
1✔
520
                }
1✔
521
            },
522
            Err(err) => {
×
523
                warn!(target: LOG_TARGET, "Failed to fetch peer to dial because '{err}'");
×
524
                if let Some(reply) = reply {
×
525
                    let _result = reply.send(Err(ConnectionManagerError::PeerManagerError(err)));
×
526
                }
×
527
            },
528
        }
529
    }
102✔
530
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc