tari-project / tari · build 19468834672 (push, via GitHub web-flow)

18 Nov 2025 02:01PM UTC coverage: 51.294% (-0.3%) from 51.544%

feat: add coin selection and spending via bins or buckets (#7584)

Description
---

Add range limit coin-join:
- Added an unspent output coin distribution gRPC method (`CoinHistogramRequest`), whereby the wallet returns the count and total value of unspent coins in a pre-set range of buckets.
- Added a range limit coin-join gRPC method (`RangeLimitCoinJoin`) to the wallet, whereby the user specifies the minimum target amount, maximum number of inputs, dust lower bound (inclusive), dust upper bound (exclusive) and fee. Transaction size is limited to the specified maximum number of inputs, and multiple outputs are created according to the minimum target amount. All inputs in the range are spent, unless the total available amount does not meet the minimum target amount (see the selection sketch below).
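
A minimal sketch of the selection rule described above, under stated assumptions: the `Utxo` type, the `select_range_limit_inputs` function and the µT values are hypothetical, and the real wallet logic additionally splits the selection across multiple transactions and sizes outputs by the minimum target amount.

```rust
// Hypothetical stand-in for a wallet unspent output; only the value matters here.
#[derive(Debug, Clone, Copy)]
struct Utxo {
    value: u64, // in µT
}

/// Select unspent outputs whose value lies in [dust_lower, dust_upper),
/// capped at `max_inputs` per transaction; return None (spend nothing)
/// if the selection cannot meet `min_target`.
fn select_range_limit_inputs(
    utxos: &[Utxo],
    dust_lower: u64, // inclusive
    dust_upper: u64, // exclusive
    max_inputs: usize,
    min_target: u64,
) -> Option<Vec<Utxo>> {
    let selected: Vec<Utxo> = utxos
        .iter()
        .copied()
        .filter(|u| u.value >= dust_lower && u.value < dust_upper)
        .take(max_inputs)
        .collect();
    let total: u64 = selected.iter().map(|u| u.value).sum();
    // Spend everything selected, or nothing if the minimum target is not met.
    (total >= min_target).then_some(selected)
}

fn main() {
    let utxos = [Utxo { value: 500 }, Utxo { value: 1_500 }, Utxo { value: 2_500 }];
    // Picks the two in-range coins (1500 + 2500 >= 2000) and skips the coin below 1000.
    let picked = select_range_limit_inputs(&utxos, 1_000, 10_000, 350, 2_000);
    println!("{picked:?}");
}
```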

Closes #7582.

Motivation and Context
---
See #7582.

How Has This Been Tested?
---
System-level testing

gRPC **CoinHistogram** method response:
```json
{
  "buckets": [
    {
      "count": "0",
      "total_amount": "0",
      "lower_bound": "0",
      "upper_bound": "1000"
    },
    {
      "count": "0",
      "total_amount": "0",
      "lower_bound": "1000",
      "upper_bound": "100000"
    },
    {
      "count": "2",
      "total_amount": "1165548",
      "lower_bound": "100000",
      "upper_bound": "1000000"
    },
    {
      "count": "158",
      "total_amount": "1455989209",
      "lower_bound": "1000000",
      "upper_bound": "1000000000"
    },
    {
      "count": "0",
      "total_amount": "0",
      "lower_bound": "1000000000",
      "upper_bound": "100000000000"
    },
    {
      "count": "0",
      "total_amount": "0",
      "lower_bound": "100000000000",
      "upper_bound": "21000000000000000"
    }
  ]
}
```
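
For illustration, a minimal sketch of how bucket counts and totals like those above can be derived from a list of output values; the `Bucket` type and `histogram` function are hypothetical stand-ins for the wallet's internals, not its actual types.

```rust
// Hypothetical bucket: bounds are [lower_bound, upper_bound), mirroring the response above.
#[derive(Debug, Default, Clone, Copy)]
struct Bucket {
    lower_bound: u64, // inclusive
    upper_bound: u64, // exclusive
    count: u64,
    total_amount: u64,
}

/// Tally each value into the first bucket whose range contains it.
fn histogram(values: &[u64], bounds: &[(u64, u64)]) -> Vec<Bucket> {
    let mut buckets: Vec<Bucket> = bounds
        .iter()
        .map(|&(lower_bound, upper_bound)| Bucket { lower_bound, upper_bound, ..Default::default() })
        .collect();
    for &v in values {
        if let Some(b) = buckets.iter_mut().find(|b| v >= b.lower_bound && v < b.upper_bound) {
            b.count += 1;
            b.total_amount += v;
        }
    }
    buckets
}

fn main() {
    // Same pre-set ranges as the response above; the values are made up.
    let bounds = [
        (0, 1_000),
        (1_000, 100_000),
        (100_000, 1_000_000),
        (1_000_000, 1_000_000_000),
        (1_000_000_000, 100_000_000_000),
        (100_000_000_000, 21_000_000_000_000_000),
    ];
    for b in histogram(&[550_000, 615_548, 2_000_000], &bounds) {
        println!("{b:?}");
    }
}
```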

gRPC **RangeLimitCoinJoin** method

In this example, two transactions were created, bounded by the 350-input size limit. gRPC client view:

(Screenshot: gRPC client view of the two resulting transactions.)

0 of 636 new or added lines in 11 files covered. (0.0%)

17 existing lines in 8 files now uncovered.

59180 of 115373 relevant lines covered (51.29%)

7948.46 hits per line

Source file: /comms/core/src/connection_manager/peer_connection.rs (77.06% covered)

```rust
// Copyright 2019, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
    fmt,
    future::Future,
    sync::{
        atomic::{AtomicBool, AtomicUsize, Ordering},
        Arc,
    },
    time::{Duration, Instant},
};

use futures::{future::BoxFuture, stream::FuturesUnordered};
use log::*;
use multiaddr::Multiaddr;
use tari_shutdown::oneshot_trigger::OneshotTrigger;
use tokio::{
    sync::{mpsc, oneshot},
    time,
};
use tokio_stream::StreamExt;
use tracing::{span, Instrument, Level};

use super::{direction::ConnectionDirection, error::PeerConnectionError, manager::ConnectionManagerEvent};
#[cfg(feature = "rpc")]
use crate::protocol::rpc::{
    pool::RpcClientPool,
    pool::RpcPoolClient,
    NamedProtocolService,
    RpcClient,
    RpcClientBuilder,
    RpcError,
    RPC_MAX_FRAME_SIZE,
};
use crate::{
    framing,
    framing::CanonicalFraming,
    multiplexing::{Control, IncomingSubstreams, Substream, Yamux, YamuxControlError},
    peer_manager::{NodeId, PeerFeatures},
    protocol::{ProtocolId, ProtocolNegotiation},
    utils::atomic_ref_counter::AtomicRefCounter,
    Minimized,
};

const LOG_TARGET: &str = "comms::connection_manager::peer_connection";

const PROTOCOL_NEGOTIATION_TIMEOUT: Duration = Duration::from_secs(10);

static ID_COUNTER: AtomicUsize = AtomicUsize::new(0);

pub fn create(
    connection: Yamux,
    peer_addr: Multiaddr,
    peer_node_id: NodeId,
    peer_features: PeerFeatures,
    direction: ConnectionDirection,
    event_notifier: mpsc::Sender<ConnectionManagerEvent>,
    our_supported_protocols: Arc<Vec<ProtocolId>>,
    their_supported_protocols: Vec<ProtocolId>,
) -> PeerConnection {
    trace!(
        target: LOG_TARGET,
        "(Peer={}) Socket successfully upgraded to multiplexed socket",
        peer_node_id.short_str()
    );
    // All requests are request/response, so a channel size of 1 is all that is needed
    let (peer_tx, peer_rx) = mpsc::channel(1);
    let id = ID_COUNTER.fetch_add(1, Ordering::SeqCst); // Monotonic
    let substream_counter = connection.substream_counter();
    let peer_conn = PeerConnection::new(
        id,
        peer_tx,
        peer_node_id.clone(),
        peer_features,
        peer_addr,
        direction,
        substream_counter,
    );
    let peer_actor = PeerConnectionActor::new(
        id,
        peer_node_id,
        direction,
        connection,
        peer_rx,
        event_notifier,
        our_supported_protocols,
        their_supported_protocols,
    );
    tokio::spawn(peer_actor.run());

    peer_conn
}

/// Request types for the PeerConnection actor.
#[derive(Debug)]
pub enum PeerConnectionRequest {
    /// Open a new substream and negotiate the given protocol
    OpenSubstream {
        protocol_id: ProtocolId,
        reply_tx: oneshot::Sender<Result<NegotiatedSubstream<Substream>, PeerConnectionError>>,
    },
    /// Disconnect all substreams and close the transport connection
    Disconnect(
        bool,
        oneshot::Sender<Result<(), PeerConnectionError>>,
        Minimized,
        String,
    ),
}

/// ID type for peer connections
pub type ConnectionId = usize;

/// Request handle for an active peer connection
#[derive(Debug, Clone)]
pub struct PeerConnection {
    id: ConnectionId,
    peer_node_id: NodeId,
    peer_features: PeerFeatures,
    request_tx: mpsc::Sender<PeerConnectionRequest>,
    address: Arc<Multiaddr>,
    direction: ConnectionDirection,
    started_at: Instant,
    substream_counter: AtomicRefCounter,
    handle_counter: Arc<()>,
    drop_notifier: OneshotTrigger<NodeId>,
    force_disconnect_rpc_clients_when_clone_drops: Arc<AtomicBool>,
    rpc_session_states: Vec<Arc<AtomicBool>>,
}

impl PeerConnection {
    pub(crate) fn new(
        id: ConnectionId,
        request_tx: mpsc::Sender<PeerConnectionRequest>,
        peer_node_id: NodeId,
        peer_features: PeerFeatures,
        address: Multiaddr,
        direction: ConnectionDirection,
        substream_counter: AtomicRefCounter,
    ) -> Self {
        Self {
            id,
            request_tx,
            peer_node_id,
            peer_features,
            address: Arc::new(address),
            direction,
            started_at: Instant::now(),
            substream_counter,
            handle_counter: Arc::new(()),
            drop_notifier: OneshotTrigger::<NodeId>::new(),
            force_disconnect_rpc_clients_when_clone_drops: Arc::new(Default::default()),
            rpc_session_states: Vec::new(),
        }
    }

    pub fn peer_node_id(&self) -> &NodeId {
        &self.peer_node_id
    }

    pub fn peer_features(&self) -> PeerFeatures {
        self.peer_features
    }

    pub fn direction(&self) -> ConnectionDirection {
        self.direction
    }

    pub fn known_address(&self) -> Option<&Multiaddr> {
        if self.direction.is_outbound() {
            Some(self.address())
        } else {
            None
        }
    }

    pub fn address(&self) -> &Multiaddr {
        &self.address
    }

    pub fn id(&self) -> ConnectionId {
        self.id
    }

    pub fn is_connected(&self) -> bool {
        !self.request_tx.is_closed()
    }

    /// Returns an owned future that resolves on disconnection
    pub fn on_disconnect(&self) -> impl Future<Output = ()> + 'static {
        let request_tx = self.request_tx.clone();
        async move { request_tx.closed().await }
    }

    pub fn age(&self) -> Duration {
        self.started_at.elapsed()
    }

    pub fn substream_count(&self) -> usize {
        self.substream_counter.get()
    }

    pub fn handle_count(&self) -> usize {
        Arc::strong_count(&self.handle_counter)
    }

    pub async fn open_substream(
        &mut self,
        protocol_id: &ProtocolId,
    ) -> Result<NegotiatedSubstream<Substream>, PeerConnectionError> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let _unused = self
            .request_tx
            .send(PeerConnectionRequest::OpenSubstream {
                protocol_id: protocol_id.clone(),
                reply_tx,
            })
            .await
            .inspect_err(|e| {
                info!(
                    target: LOG_TARGET,
                    "Failed to send OpenSubstream request for protocol `{}` to peer `{}`: {}",
                    String::from_utf8_lossy(protocol_id),
                    self.peer_node_id,
                    e
                );
            });
        reply_rx
            .await
            .map_err(|_| PeerConnectionError::InternalReplyCancelled)?
    }

    pub async fn open_framed_substream(
        &mut self,
        protocol_id: &ProtocolId,
        max_frame_size: usize,
    ) -> Result<CanonicalFraming<Substream>, PeerConnectionError> {
        let substream = self.open_substream(protocol_id).await?;
        Ok(framing::canonical(substream.stream, max_frame_size))
    }

    #[cfg(feature = "rpc")]
    pub async fn connect_rpc<T>(&mut self) -> Result<T, RpcError>
    where T: From<RpcClient> + NamedProtocolService {
        self.connect_rpc_using_builder(Default::default()).await
    }

    #[cfg(feature = "rpc")]
    pub async fn connect_rpc_using_builder<T>(&mut self, builder: RpcClientBuilder<T>) -> Result<T, RpcError>
    where T: From<RpcClient> + NamedProtocolService {
        let protocol = ProtocolId::from_static(T::PROTOCOL_NAME);
        debug!(
            target: LOG_TARGET,
            "Attempting to establish RPC protocol `{}` to peer `{}`",
            String::from_utf8_lossy(&protocol),
            self.peer_node_id
        );
        let framed = self.open_framed_substream(&protocol, RPC_MAX_FRAME_SIZE).await?;
        let rpc_session_state = Arc::new(AtomicBool::new(true));

        let rpc_client = builder
            .with_protocol_id(protocol)
            .with_node_id(self.peer_node_id.clone())
            .with_terminate_signal(self.drop_notifier.to_signal())
            .with_session_state(rpc_session_state.clone())
            .connect(framed)
            .await?;
        self.rpc_session_states.push(rpc_session_state.clone());

        Ok(rpc_client)
    }

    /// Creates a new RpcClientPool that can be shared between tasks. The client pool will lazily establish up to
    /// `max_sessions` sessions and provides the client session that is least used.
    #[cfg(feature = "rpc")]
    pub fn create_rpc_client_pool<T>(
        &self,
        max_sessions: usize,
        client_config: RpcClientBuilder<T>,
    ) -> RpcClientPool<T>
    where
        T: RpcPoolClient + From<RpcClient> + NamedProtocolService + Clone,
    {
        RpcClientPool::new(self.clone(), max_sessions, client_config)
    }

    fn rpc_session_count(&self) -> usize {
        self.rpc_session_states
            .iter()
            .filter(|s| s.load(Ordering::Relaxed))
            .count()
    }

    /// Immediately disconnects the peer connection. This can only fail if the peer connection worker
    /// is shut down (and the peer is already disconnected)
    pub async fn disconnect(&mut self, minimized: Minimized, requester: &str) -> Result<(), PeerConnectionError> {
        trace!(
            target: LOG_TARGET,
            "Hard disconnect - requester: '{}', peer: `{}`, RPC clients: {}, substreams {}",
            requester,
            self.peer_node_id,
            self.rpc_session_count(),
            self.substream_count()
        );
        let (reply_tx, reply_rx) = oneshot::channel();
        let _unused = self
            .request_tx
            .send(PeerConnectionRequest::Disconnect(
                false,
                reply_tx,
                minimized,
                requester.to_string(),
            ))
            .await
            .inspect_err(|e| {
                info!(
                    target: LOG_TARGET,
                    "Failed to send Disconnect request to peer `{}`: {}",
                    self.peer_node_id,
                    e
                );
            });
        reply_rx
            .await
            .map_err(|_| PeerConnectionError::InternalReplyCancelled)?
    }

    /// Request to disconnect the peer connection if unused by other services.
    pub async fn disconnect_if_unused(
        &mut self,
        minimized: Minimized,
        expected_rpc: usize,
        expected_substreams: usize,
        requester: &str,
    ) -> Result<(), PeerConnectionError> {
        let number_of_rpc_clients = self.rpc_session_count();
        let substream_count = self.substream_count();
        if number_of_rpc_clients > expected_rpc || substream_count > expected_substreams {
            trace!(
                target: LOG_TARGET,
                "Soft disconnect - requester: '{}', peer: `{}`, RPC clients: {}, substreams {}, NOT disconnecting",
                requester,
                self.peer_node_id,
                number_of_rpc_clients,
                self.substream_count()
            );
            Ok(())
        } else {
            self.disconnect(minimized, requester).await
        }
    }

    pub(crate) async fn disconnect_silent(
        &mut self,
        minimized: Minimized,
        requester: &str,
    ) -> Result<(), PeerConnectionError> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let _unused = self
            .request_tx
            .send(PeerConnectionRequest::Disconnect(
                true,
                reply_tx,
                minimized,
                requester.to_string(),
            ))
            .await
            .inspect_err(|e| {
                info!(
                    target: LOG_TARGET,
                    "Failed to send Disconnect request to peer `{}`: {}",
                    self.peer_node_id,
                    e
                );
            });
        reply_rx
            .await
            .map_err(|_| PeerConnectionError::InternalReplyCancelled)?
    }

    /// Forcefully disconnect all RPC clients when any clone is dropped - if not set (the default behaviour) all RPC
    /// clients will be disconnected when the last instance is dropped, i.e. when `self.handle_counter == 1`
    pub fn set_force_disconnect_rpc_clients_when_clone_drops(&mut self) {
        self.force_disconnect_rpc_clients_when_clone_drops
            .store(true, Ordering::Relaxed);
    }
}

impl Drop for PeerConnection {
    fn drop(&mut self) {
        if self.handle_count() <= 1 ||
            self.force_disconnect_rpc_clients_when_clone_drops
                .load(Ordering::Relaxed)
        {
            let number_of_rpc_clients = self.rpc_session_count();
            if number_of_rpc_clients > 0 {
                self.drop_notifier.broadcast(self.peer_node_id.clone());
                trace!(
                    target: LOG_TARGET,
                    "PeerConnection `{}` drop called, open sub-streams: {}, notified {} RPC clients to drop connection",
                    self.peer_node_id.clone(), self.substream_count(), number_of_rpc_clients,
                );
            } else {
                trace!(
                    target: LOG_TARGET,
                    "PeerConnection `{}` drop called, open sub-streams: {}, RPC clients: {}",
                    self.peer_node_id, self.substream_count(), number_of_rpc_clients
                );
            }
        }
    }
}

impl fmt::Display for PeerConnection {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        write!(
            f,
            "Id: {}, Node ID: {}, Direction: {}, Peer Address: {}, Age: {:.0?}, #Substreams: {}, #RPC sessions: {}, \
             #Refs: {}",
            self.id,
            self.peer_node_id.short_str(),
            self.direction,
            self.address,
            self.age(),
            self.substream_count(),
            self.rpc_session_count(),
            self.handle_count()
        )
    }
}

impl PartialEq for PeerConnection {
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}

/// Actor for an active connection to a peer.
struct PeerConnectionActor {
    id: ConnectionId,
    peer_node_id: NodeId,
    request_rx: mpsc::Receiver<PeerConnectionRequest>,
    direction: ConnectionDirection,
    incoming_substreams: IncomingSubstreams,
    control: Control,
    event_notifier: mpsc::Sender<ConnectionManagerEvent>,
    our_supported_protocols: Arc<Vec<ProtocolId>>,
    inbound_protocol_negotiations:
        FuturesUnordered<BoxFuture<'static, Result<(ProtocolId, Substream), PeerConnectionError>>>,
    their_supported_protocols: Vec<ProtocolId>,
}

impl PeerConnectionActor {
    fn new(
        id: ConnectionId,
        peer_node_id: NodeId,
        direction: ConnectionDirection,
        connection: Yamux,
        request_rx: mpsc::Receiver<PeerConnectionRequest>,
        event_notifier: mpsc::Sender<ConnectionManagerEvent>,
        our_supported_protocols: Arc<Vec<ProtocolId>>,
        their_supported_protocols: Vec<ProtocolId>,
    ) -> Self {
        Self {
            id,
            peer_node_id,
            direction,
            control: connection.get_yamux_control(),
            incoming_substreams: connection.into_incoming(),
            request_rx,
            event_notifier,
            our_supported_protocols,
            inbound_protocol_negotiations: FuturesUnordered::new(),
            their_supported_protocols,
        }
    }

    pub async fn run(mut self) {
        loop {
            tokio::select! {
                maybe_request = self.request_rx.recv() => {
                    match maybe_request {
                        Some(request) => self.handle_request(request).await,
                        None => {
                            debug!(target: LOG_TARGET, "[{self}] All peer connection handles dropped, closing the connection");
                            break;
                        }
                    }
                },

                maybe_substream = self.incoming_substreams.next() => {
                    match maybe_substream {
                        Some(substream) => self.handle_incoming_substream(substream).await,
                        None => {
                            debug!(target: LOG_TARGET, "[{}] Peer '{}' closed the connection", self, self.peer_node_id.short_str());
                            break;
                        },
                    }
                },

                Some(result) = self.inbound_protocol_negotiations.next() => {
                    self.handle_inbound_protocol_negotiation_result(result).await;
                }
            }
        }

        if let Err(err) = self.disconnect(false, Minimized::No, "PeerConnectionActor exit").await {
            warn!(
                target: LOG_TARGET,
                "[{}] Failed to politely close connection to peer '{}' because '{}'",
                self,
                self.peer_node_id.short_str(),
                err
            );
        }
    }

    async fn handle_request(&mut self, request: PeerConnectionRequest) {
        use PeerConnectionRequest::{Disconnect, OpenSubstream};
        match request {
            OpenSubstream { protocol_id, reply_tx } => {
                let tracing_id = tracing::Span::current().id();
                let span = span!(Level::TRACE, "handle_request");
                span.follows_from(tracing_id);
                let result = self.open_negotiated_protocol_stream(protocol_id).instrument(span).await;
                log_if_error_fmt!(
                    target: LOG_TARGET,
                    reply_tx.send(result),
                    "Reply oneshot closed when sending reply",
                );
            },
            Disconnect(silent, reply_tx, minimized, requester) => {
                debug!(
                    target: LOG_TARGET,
                    "[{}] Disconnect{}requested for {} connection to peer '{}', requester: '{}'",
                    self,
                    if silent { " (silent) " } else { " " },
                    self.direction,
                    self.peer_node_id.short_str(),
                    requester,
                );
                let _result = reply_tx.send(self.disconnect(silent, minimized, &requester).await);
            },
        }
    }

    async fn handle_incoming_substream(&mut self, mut stream: Substream) {
        let our_supported_protocols = self.our_supported_protocols.clone();
        self.inbound_protocol_negotiations.push(Box::pin(async move {
            let mut protocol_negotiation = ProtocolNegotiation::new(&mut stream);

            let selected_protocol = time::timeout(
                PROTOCOL_NEGOTIATION_TIMEOUT,
                protocol_negotiation.negotiate_protocol_inbound(&our_supported_protocols),
            )
            .await
            .map_err(|_| PeerConnectionError::ProtocolNegotiationTimeout)??;
            Ok((selected_protocol, stream))
        }));
    }

    async fn handle_inbound_protocol_negotiation_result(
        &mut self,
        result: Result<(ProtocolId, Substream), PeerConnectionError>,
    ) {
        match result {
            Ok((selected_protocol, stream)) => {
                self.notify_event(ConnectionManagerEvent::NewInboundSubstream(
                    self.peer_node_id.clone(),
                    selected_protocol,
                    stream,
                ))
                .await;
            },
            Err(PeerConnectionError::ProtocolError(err)) if err.is_ban_offence() => {
                error!(
                    target: LOG_TARGET,
                    "[{}] PEER VIOLATION: Incoming substream for peer '{}' failed to open because '{}'",
                    self,
                    self.peer_node_id.short_str(),
                    err
                );

                self.notify_event(ConnectionManagerEvent::PeerViolation {
                    peer_node_id: self.peer_node_id.clone(),
                    details: err.to_string(),
                })
                .await;
            },
            Err(err) => {
                error!(
                    target: LOG_TARGET,
                    "[{}] Incoming substream for peer '{}' failed to open because '{error}'",
                    self,
                    self.peer_node_id.short_str(),
                    error = err
                );
            },
        }
    }

    async fn open_negotiated_protocol_stream(
        &mut self,
        protocol: ProtocolId,
    ) -> Result<NegotiatedSubstream<Substream>, PeerConnectionError> {
        debug!(
            target: LOG_TARGET,
            "[{}] Negotiating protocol '{}' on new substream for peer '{}'",
            self,
            String::from_utf8_lossy(&protocol),
            self.peer_node_id.short_str()
        );
        let mut stream = self.control.open_stream().await?;

        let mut negotiation = ProtocolNegotiation::new(&mut stream);

        let selected_protocol = if self.their_supported_protocols.contains(&protocol) {
            let fut = negotiation.negotiate_protocol_outbound_optimistic(&protocol);
            time::timeout(PROTOCOL_NEGOTIATION_TIMEOUT, fut).await??
        } else {
            let selected_protocols = [protocol];
            let fut = negotiation.negotiate_protocol_outbound(&selected_protocols);
            time::timeout(PROTOCOL_NEGOTIATION_TIMEOUT, fut).await??
        };

        Ok(NegotiatedSubstream::new(selected_protocol, stream))
    }

    async fn notify_event(&mut self, event: ConnectionManagerEvent) {
        let _result = self.event_notifier.send(event).await;
    }

    /// Disconnect this peer connection.
    ///
    /// # Arguments
    ///
    /// silent - true to suppress the PeerDisconnected event, false to publish the event
    async fn disconnect(
        &mut self,
        silent: bool,
        minimized: Minimized,
        requester: &str,
    ) -> Result<(), PeerConnectionError> {
        self.request_rx.close();

        // Only emit closed event once
        if let Err(e) = self.control.close().await {
            match e {
                YamuxControlError::ConnectionClosed => {
                    trace!(
                        target: LOG_TARGET,
                        "On disconnect: (Peer = {}) Connection already closed ({})",
                        self.peer_node_id.short_str(),
                        e
                    );
                },
                e => trace!(target: LOG_TARGET, "On disconnect: ({e})"),
            }
        }

        if !silent {
            self.notify_event(ConnectionManagerEvent::PeerDisconnected(
                self.id,
                self.peer_node_id.clone(),
                minimized,
            ))
            .await;
        }
        trace!(
            target: LOG_TARGET,
            "(Peer = {}) Connection closed, requester: '{}'",
            self.peer_node_id.short_str(), requester
        );

        Ok(())
    }
}

impl fmt::Display for PeerConnectionActor {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "PeerConnection(id={}, peer_node_id={}, direction={})",
            self.id,
            self.peer_node_id.short_str(),
            self.direction,
        )
    }
}

/// Contains the substream and the ProtocolId that was successfully negotiated.
pub struct NegotiatedSubstream<TSubstream> {
    pub protocol: ProtocolId,
    pub stream: TSubstream,
}

impl<TSubstream> NegotiatedSubstream<TSubstream> {
    pub fn new(protocol: ProtocolId, stream: TSubstream) -> Self {
        Self { protocol, stream }
    }
}

impl<TSubstream> fmt::Debug for NegotiatedSubstream<TSubstream> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("NegotiatedSubstream")
            .field("protocol", &format!("{:?}", self.protocol))
            .field("stream", &"...".to_string())
            .finish()
    }
}
```