• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18606178523

17 Oct 2025 10:19PM UTC coverage: 75.887% (+8.1%) from 67.783%
18606178523

push

github

butlergroup
	modified:   .github/workflows/ci.yml

40841 of 53818 relevant lines covered (75.89%)

43009.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.28
/protocols/relay/src/behaviour/handler.rs
1
// Copyright 2021 Protocol Labs.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
use std::{
22
    collections::{HashMap, VecDeque},
23
    fmt, io,
24
    task::{Context, Poll},
25
    time::Duration,
26
};
27

28
use bytes::Bytes;
29
use either::Either;
30
use futures::{
31
    future::{BoxFuture, FutureExt, TryFutureExt},
32
    io::AsyncWriteExt,
33
    stream::{FuturesUnordered, StreamExt},
34
};
35
use futures_timer::Delay;
36
use libp2p_core::{upgrade::ReadyUpgrade, ConnectedPoint, Multiaddr};
37
use libp2p_identity::PeerId;
38
use libp2p_swarm::{
39
    handler::{ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound},
40
    ConnectionHandler, ConnectionHandlerEvent, ConnectionId, Stream, StreamProtocol,
41
    StreamUpgradeError, SubstreamProtocol,
42
};
43
use web_time::Instant;
44

45
use crate::{
46
    behaviour::CircuitId,
47
    copy_future::CopyFuture,
48
    proto,
49
    protocol::{inbound_hop, outbound_stop},
50
    HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME,
51
};
52

53
/// Capacity handed to both bounded worker collections (`inbound_workers` and
/// `outbound_workers`) when a [`Handler`] is constructed.
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;
54
/// Timeout handed to both bounded worker collections when a [`Handler`] is
/// constructed.
const STREAM_TIMEOUT: Duration = Duration::from_secs(60);
55

56
/// Static configuration of a relay [`Handler`].
///
/// All three values are forwarded to `inbound_hop::handle_inbound_request`
/// for every negotiated inbound stream; the two circuit limits are also handed
/// to `CopyFuture::new` when an accepted circuit is driven.
#[derive(Debug, Clone)]
57
pub struct Config {
58
    /// Reservation duration passed along when handling an inbound request.
    pub reservation_duration: Duration,
59
    /// Maximum circuit duration passed to inbound request handling and to the
    /// future that relays circuit data.
    pub max_circuit_duration: Duration,
60
    /// Maximum circuit byte count passed to inbound request handling and to
    /// the future that relays circuit data.
    pub max_circuit_bytes: u64,
61
}
62

63
/// Commands the [`Handler`] receives in `on_behaviour_event`
/// (it is the handler's `FromBehaviour` type).
pub enum In {
64
    /// Accept the given reservation request, passing `addrs` to its `accept`
    /// call. Replaces any accept/deny future that is still in progress.
    AcceptReservationReq {
65
        inbound_reservation_req: inbound_hop::ReservationReq,
66
        addrs: Vec<Multiaddr>,
67
    },
68
    /// Deny the given reservation request with `status`. Replaces any
    /// accept/deny future that is still in progress.
    DenyReservationReq {
69
        inbound_reservation_req: inbound_hop::ReservationReq,
70
        status: proto::Status,
71
    },
72
    /// Deny the given circuit request with `status`. The outcome is reported
    /// together with `circuit_id` and the request's destination peer.
    DenyCircuitReq {
73
        circuit_id: Option<CircuitId>,
74
        inbound_circuit_req: inbound_hop::CircuitReq,
75
        status: proto::Status,
76
    },
77
    /// Queue a pending connect for this circuit and request an outbound
    /// substream using `STOP_PROTOCOL_NAME`.
    NegotiateOutboundConnect {
78
        circuit_id: CircuitId,
79
        inbound_circuit_req: inbound_hop::CircuitReq,
80
        src_peer_id: PeerId,
81
        src_connection_id: ConnectionId,
82
    },
83
    /// Accept the inbound circuit request and, once accepted, relay data
    /// between the resulting source stream and `dst_stream`.
    AcceptAndDriveCircuit {
84
        circuit_id: CircuitId,
85
        dst_peer_id: PeerId,
86
        inbound_circuit_req: inbound_hop::CircuitReq,
87
        dst_stream: Stream,
88
        /// Bytes written to the source stream before relaying starts.
        dst_pending_data: Bytes,
89
    },
90
}
91

92
impl fmt::Debug for In {
93
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
94
        match self {
×
95
            In::AcceptReservationReq {
96
                inbound_reservation_req: _,
97
                addrs,
×
98
            } => f
×
99
                .debug_struct("In::AcceptReservationReq")
×
100
                .field("addrs", addrs)
×
101
                .finish(),
×
102
            In::DenyReservationReq {
103
                inbound_reservation_req: _,
104
                status,
×
105
            } => f
×
106
                .debug_struct("In::DenyReservationReq")
×
107
                .field("status", status)
×
108
                .finish(),
×
109
            In::DenyCircuitReq {
110
                circuit_id,
×
111
                inbound_circuit_req: _,
112
                status,
×
113
            } => f
×
114
                .debug_struct("In::DenyCircuitReq")
×
115
                .field("circuit_id", circuit_id)
×
116
                .field("status", status)
×
117
                .finish(),
×
118
            In::NegotiateOutboundConnect {
119
                circuit_id,
×
120
                inbound_circuit_req: _,
121
                src_peer_id,
×
122
                src_connection_id,
×
123
            } => f
×
124
                .debug_struct("In::NegotiateOutboundConnect")
×
125
                .field("circuit_id", circuit_id)
×
126
                .field("src_peer_id", src_peer_id)
×
127
                .field("src_connection_id", src_connection_id)
×
128
                .finish(),
×
129
            In::AcceptAndDriveCircuit {
130
                circuit_id,
×
131
                inbound_circuit_req: _,
132
                dst_peer_id,
×
133
                dst_stream: _,
134
                dst_pending_data: _,
135
            } => f
×
136
                .debug_struct("In::AcceptAndDriveCircuit")
×
137
                .field("circuit_id", circuit_id)
×
138
                .field("dst_peer_id", dst_peer_id)
×
139
                .finish(),
×
140
        }
141
    }
×
142
}
143

144
/// The events produced by the [`Handler`].
145
#[allow(clippy::large_enum_variant)]
146
pub enum Event {
147
    /// An inbound reservation request has been received.
148
    ReservationReqReceived {
149
        inbound_reservation_req: inbound_hop::ReservationReq,
150
        /// Clone of the handler's own connection endpoint.
        endpoint: ConnectedPoint,
151
        /// Indicates whether the request replaces an existing reservation.
152
        renewed: bool,
153
    },
154
    /// An inbound reservation request has been accepted.
155
    ReservationReqAccepted {
156
        /// Indicates whether the request replaces an existing reservation.
157
        renewed: bool,
158
    },
159
    /// Accepting an inbound reservation request failed.
160
    ReservationReqAcceptFailed { error: inbound_hop::Error },
161
    /// An inbound reservation request has been denied.
162
    ReservationReqDenied { status: proto::Status },
163
    /// Denying an inbound reservation request has failed.
164
    ReservationReqDenyFailed { error: inbound_hop::Error },
165
    /// An inbound reservation has timed out.
166
    ReservationTimedOut {},
167
    /// An inbound circuit request has been received.
168
    CircuitReqReceived {
169
        inbound_circuit_req: inbound_hop::CircuitReq,
170
        /// Clone of the handler's own connection endpoint.
        endpoint: ConnectedPoint,
171
    },
172
    /// An inbound circuit request has been denied.
173
    CircuitReqDenied {
174
        circuit_id: Option<CircuitId>,
175
        dst_peer_id: PeerId,
176
        /// The status the request was denied with.
        status: proto::Status,
177
    },
178
    /// Denying an inbound circuit request failed.
179
    CircuitReqDenyFailed {
180
        circuit_id: Option<CircuitId>,
181
        dst_peer_id: PeerId,
182
        error: inbound_hop::Error,
183
    },
184
    /// An inbound circuit request has been accepted.
185
    CircuitReqAccepted {
186
        circuit_id: CircuitId,
187
        dst_peer_id: PeerId,
188
    },
189
    /// Accepting an inbound circuit request failed.
190
    CircuitReqAcceptFailed {
191
        circuit_id: CircuitId,
192
        dst_peer_id: PeerId,
193
        error: inbound_hop::Error,
194
    },
195
    /// An outbound substream for an inbound circuit request has been
196
    /// negotiated.
197
    OutboundConnectNegotiated {
198
        circuit_id: CircuitId,
199
        src_peer_id: PeerId,
200
        src_connection_id: ConnectionId,
201
        inbound_circuit_req: inbound_hop::CircuitReq,
202
        /// Stream to the destination, taken from the finished outbound worker.
        dst_stream: Stream,
203
        /// Pending bytes taken from the finished outbound worker.
        dst_pending_data: Bytes,
204
    },
205
    /// Negotiating an outbound substream for an inbound circuit request failed.
206
    OutboundConnectNegotiationFailed {
207
        circuit_id: CircuitId,
208
        src_peer_id: PeerId,
209
        src_connection_id: ConnectionId,
210
        inbound_circuit_req: inbound_hop::CircuitReq,
211
        /// `CONNECTION_FAILED` when the stream upgrade failed or the outbound
        /// worker timed out; otherwise derived from `error` via `to_status()`.
        status: proto::Status,
212
        error: outbound_stop::Error,
213
    },
214
    /// An inbound circuit has closed.
215
    CircuitClosed {
216
        circuit_id: CircuitId,
217
        dst_peer_id: PeerId,
218
        /// `None` when the relaying future finished with `Ok`.
        error: Option<std::io::Error>,
219
    },
220
}
221

222
impl fmt::Debug for Event {
223
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
224
        match self {
×
225
            Event::ReservationReqReceived {
226
                inbound_reservation_req: _,
227
                endpoint,
×
228
                renewed,
×
229
            } => f
×
230
                .debug_struct("Event::ReservationReqReceived")
×
231
                .field("endpoint", endpoint)
×
232
                .field("renewed", renewed)
×
233
                .finish(),
×
234
            Event::ReservationReqAccepted { renewed } => f
×
235
                .debug_struct("Event::ReservationReqAccepted")
×
236
                .field("renewed", renewed)
×
237
                .finish(),
×
238
            Event::ReservationReqAcceptFailed { error } => f
×
239
                .debug_struct("Event::ReservationReqAcceptFailed")
×
240
                .field("error", error)
×
241
                .finish(),
×
242
            Event::ReservationReqDenied { status } => f
×
243
                .debug_struct("Event::ReservationReqDenied")
×
244
                .field("status", status)
×
245
                .finish(),
×
246
            Event::ReservationReqDenyFailed { error } => f
×
247
                .debug_struct("Event::ReservationReqDenyFailed")
×
248
                .field("error", error)
×
249
                .finish(),
×
250
            Event::ReservationTimedOut {} => f.debug_struct("Event::ReservationTimedOut").finish(),
×
251
            Event::CircuitReqReceived {
252
                endpoint,
×
253
                inbound_circuit_req: _,
254
            } => f
×
255
                .debug_struct("Event::CircuitReqReceived")
×
256
                .field("endpoint", endpoint)
×
257
                .finish(),
×
258
            Event::CircuitReqDenied {
259
                circuit_id,
×
260
                dst_peer_id,
×
261
                status,
×
262
            } => f
×
263
                .debug_struct("Event::CircuitReqDenied")
×
264
                .field("circuit_id", circuit_id)
×
265
                .field("dst_peer_id", dst_peer_id)
×
266
                .field("status", status)
×
267
                .finish(),
×
268
            Event::CircuitReqDenyFailed {
269
                circuit_id,
×
270
                dst_peer_id,
×
271
                error,
×
272
            } => f
×
273
                .debug_struct("Event::CircuitReqDenyFailed")
×
274
                .field("circuit_id", circuit_id)
×
275
                .field("dst_peer_id", dst_peer_id)
×
276
                .field("error", error)
×
277
                .finish(),
×
278
            Event::CircuitReqAccepted {
279
                circuit_id,
×
280
                dst_peer_id,
×
281
            } => f
×
282
                .debug_struct("Event::CircuitReqAccepted")
×
283
                .field("circuit_id", circuit_id)
×
284
                .field("dst_peer_id", dst_peer_id)
×
285
                .finish(),
×
286
            Event::CircuitReqAcceptFailed {
287
                circuit_id,
×
288
                dst_peer_id,
×
289
                error,
×
290
            } => f
×
291
                .debug_struct("Event::CircuitReqAcceptFailed")
×
292
                .field("circuit_id", circuit_id)
×
293
                .field("dst_peer_id", dst_peer_id)
×
294
                .field("error", error)
×
295
                .finish(),
×
296
            Event::OutboundConnectNegotiated {
297
                circuit_id,
×
298
                src_peer_id,
×
299
                src_connection_id,
×
300
                inbound_circuit_req: _,
301
                dst_stream: _,
302
                dst_pending_data: _,
303
            } => f
×
304
                .debug_struct("Event::OutboundConnectNegotiated")
×
305
                .field("circuit_id", circuit_id)
×
306
                .field("src_peer_id", src_peer_id)
×
307
                .field("src_connection_id", src_connection_id)
×
308
                .finish(),
×
309
            Event::OutboundConnectNegotiationFailed {
310
                circuit_id,
×
311
                src_peer_id,
×
312
                src_connection_id,
×
313
                inbound_circuit_req: _,
314
                status,
×
315
                error,
×
316
            } => f
×
317
                .debug_struct("Event::OutboundConnectNegotiationFailed")
×
318
                .field("circuit_id", circuit_id)
×
319
                .field("src_peer_id", src_peer_id)
×
320
                .field("src_connection_id", src_connection_id)
×
321
                .field("status", status)
×
322
                .field("error", error)
×
323
                .finish(),
×
324
            Event::CircuitClosed {
325
                circuit_id,
×
326
                dst_peer_id,
×
327
                error,
×
328
            } => f
×
329
                .debug_struct("Event::CircuitClosed")
×
330
                .field("circuit_id", circuit_id)
×
331
                .field("dst_peer_id", dst_peer_id)
×
332
                .field("error", error)
×
333
                .finish(),
×
334
        }
335
    }
×
336
}
337

338
/// [`ConnectionHandler`] that manages substreams for a relay on a single
339
/// connection with a peer.
340
pub struct Handler {
341
    /// Endpoint of the connection this handler belongs to; a clone is attached
    /// to the "request received" events.
    endpoint: ConnectedPoint,
342

343
    /// Static [`Handler`] [`Config`].
344
    config: Config,
345

346
    /// Queue of events to return when polled.
347
    queued_events: VecDeque<
348
        ConnectionHandlerEvent<
349
            <Self as ConnectionHandler>::OutboundProtocol,
350
            (),
351
            <Self as ConnectionHandler>::ToBehaviour,
352
        >,
353
    >,
354

355
    /// The point in time when this connection started idling.
    /// While `None`, `connection_keep_alive` reports `true`.
356
    idle_at: Option<Instant>,
357

358
    /// Future handling inbound reservation request.
359
    reservation_request_future: Option<ReservationRequestFuture>,
360
    /// Timeout for the currently active reservation.
361
    active_reservation: Option<Delay>,
362

363
    /// Futures accepting an inbound circuit request.
364
    circuit_accept_futures: Futures<Result<CircuitParts, (CircuitId, PeerId, inbound_hop::Error)>>,
365
    /// Futures denying an inbound circuit request.
366
    circuit_deny_futures: Futures<(
367
        Option<CircuitId>,
368
        PeerId,
369
        proto::Status,
370
        Result<(), inbound_hop::Error>,
371
    )>,
372
    /// Futures relaying data for circuit between two peers.
373
    circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>,
374

375
    /// We issue a stream upgrade for each [`PendingConnect`] request.
    /// Entries are consumed front-first when the upgrade succeeds or fails.
376
    pending_connect_requests: VecDeque<PendingConnect>,
377

378
    /// A `CONNECT` request is in flight for these circuits.
379
    active_connect_requests: HashMap<CircuitId, PendingConnect>,
380

381
    /// Bounded set of futures processing negotiated inbound streams; each
    /// resolves to either a reservation request or a circuit request.
    inbound_workers: futures_bounded::FuturesSet<
382
        Result<Either<inbound_hop::ReservationReq, inbound_hop::CircuitReq>, inbound_hop::Error>,
383
    >,
384
    /// Bounded map of `outbound_stop::connect` futures, keyed by the circuit
    /// they belong to.
    outbound_workers: futures_bounded::FuturesMap<
385
        CircuitId,
386
        Result<outbound_stop::Circuit, outbound_stop::Error>,
387
    >,
388
}
389

390
impl Handler {
391
    pub fn new(config: Config, endpoint: ConnectedPoint) -> Handler {
27✔
392
        Handler {
27✔
393
            inbound_workers: futures_bounded::FuturesSet::new(
27✔
394
                STREAM_TIMEOUT,
27✔
395
                MAX_CONCURRENT_STREAMS_PER_CONNECTION,
27✔
396
            ),
27✔
397
            outbound_workers: futures_bounded::FuturesMap::new(
27✔
398
                STREAM_TIMEOUT,
27✔
399
                MAX_CONCURRENT_STREAMS_PER_CONNECTION,
27✔
400
            ),
27✔
401
            endpoint,
27✔
402
            config,
27✔
403
            queued_events: Default::default(),
27✔
404
            idle_at: None,
27✔
405
            reservation_request_future: Default::default(),
27✔
406
            circuit_accept_futures: Default::default(),
27✔
407
            circuit_deny_futures: Default::default(),
27✔
408
            circuits: Default::default(),
27✔
409
            active_reservation: Default::default(),
27✔
410
            pending_connect_requests: Default::default(),
27✔
411
            active_connect_requests: Default::default(),
27✔
412
        }
27✔
413
    }
27✔
414

415
    fn on_fully_negotiated_inbound(&mut self, stream: Stream) {
33✔
416
        if self
33✔
417
            .inbound_workers
33✔
418
            .try_push(inbound_hop::handle_inbound_request(
33✔
419
                stream,
33✔
420
                self.config.reservation_duration,
33✔
421
                self.config.max_circuit_duration,
33✔
422
                self.config.max_circuit_bytes,
33✔
423
            ))
33✔
424
            .is_err()
33✔
425
        {
426
            tracing::warn!("Dropping inbound stream because we are at capacity")
×
427
        }
33✔
428
    }
33✔
429

430
    fn on_fully_negotiated_outbound(&mut self, stream: Stream) {
6✔
431
        let connect = self
6✔
432
            .pending_connect_requests
6✔
433
            .pop_front()
6✔
434
            .expect("opened a stream without a pending stop command");
6✔
435

436
        if self
6✔
437
            .outbound_workers
6✔
438
            .try_push(
6✔
439
                connect.circuit_id,
6✔
440
                outbound_stop::connect(
6✔
441
                    stream,
6✔
442
                    connect.src_peer_id,
6✔
443
                    connect.max_circuit_duration,
6✔
444
                    connect.max_circuit_bytes,
6✔
445
                ),
6✔
446
            )
6✔
447
            .is_err()
6✔
448
        {
449
            tracing::warn!("Dropping outbound stream because we are at capacity")
×
450
        }
6✔
451

452
        self.active_connect_requests
6✔
453
            .insert(connect.circuit_id, connect);
6✔
454
    }
6✔
455

456
    fn on_dial_upgrade_error(
×
457
        &mut self,
×
458
        DialUpgradeError { error, .. }: DialUpgradeError<
×
459
            (),
×
460
            <Self as ConnectionHandler>::OutboundProtocol,
×
461
        >,
×
462
    ) {
×
463
        let error = match error {
×
464
            StreamUpgradeError::Timeout => outbound_stop::Error::Io(io::ErrorKind::TimedOut.into()),
×
465
            StreamUpgradeError::NegotiationFailed => outbound_stop::Error::Unsupported,
×
466
            StreamUpgradeError::Io(e) => outbound_stop::Error::Io(e),
×
467
            StreamUpgradeError::Apply(v) => libp2p_core::util::unreachable(v),
×
468
        };
469

470
        let stop_command = self
×
471
            .pending_connect_requests
×
472
            .pop_front()
×
473
            .expect("failed to open a stream without a pending stop command");
×
474

475
        self.queued_events
×
476
            .push_back(ConnectionHandlerEvent::NotifyBehaviour(
×
477
                Event::OutboundConnectNegotiationFailed {
×
478
                    circuit_id: stop_command.circuit_id,
×
479
                    src_peer_id: stop_command.src_peer_id,
×
480
                    src_connection_id: stop_command.src_connection_id,
×
481
                    inbound_circuit_req: stop_command.inbound_circuit_req,
×
482
                    status: proto::Status::CONNECTION_FAILED,
×
483
                    error,
×
484
                },
×
485
            ));
×
486
    }
×
487
}
488

489
/// The in-progress response to an inbound reservation request.
/// The handler holds at most one of these at a time.
enum ReservationRequestFuture {
490
    /// The request is being accepted.
    Accepting(BoxFuture<'static, Result<(), inbound_hop::Error>>),
491
    /// The request is being denied; the status it is denied with is carried
    /// alongside the result.
    Denying(BoxFuture<'static, (proto::Status, Result<(), inbound_hop::Error>)>),
492
}
493

494
/// Shorthand for an unordered collection of boxed `'static` futures that
/// each resolve to `T`.
type Futures<T> = FuturesUnordered<BoxFuture<'static, T>>;
495

496
impl ConnectionHandler for Handler {
497
    /// Commands this handler receives from the behaviour.
    type FromBehaviour = In;
498
    /// Events this handler reports to the behaviour.
    type ToBehaviour = Event;
499
    /// Inbound streams are negotiated for `HOP_PROTOCOL_NAME` (see `listen_protocol`).
    type InboundProtocol = ReadyUpgrade<StreamProtocol>;
500
    type InboundOpenInfo = ();
501
    /// Outbound streams are requested for `STOP_PROTOCOL_NAME`.
    type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
502
    type OutboundOpenInfo = ();
503

504
    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
589✔
505
        SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ())
589✔
506
    }
589✔
507

508
    fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
39✔
509
        match event {
39✔
510
            In::AcceptReservationReq {
511
                inbound_reservation_req,
21✔
512
                addrs,
21✔
513
            } => {
514
                if self
21✔
515
                    .reservation_request_future
21✔
516
                    .replace(ReservationRequestFuture::Accepting(
21✔
517
                        inbound_reservation_req.accept(addrs).err_into().boxed(),
21✔
518
                    ))
21✔
519
                    .is_some()
21✔
520
                {
521
                    tracing::warn!("Dropping existing deny/accept future in favor of new one")
×
522
                }
21✔
523
            }
524
            In::DenyReservationReq {
525
                inbound_reservation_req,
3✔
526
                status,
3✔
527
            } => {
528
                if self
3✔
529
                    .reservation_request_future
3✔
530
                    .replace(ReservationRequestFuture::Denying(
3✔
531
                        inbound_reservation_req
3✔
532
                            .deny(status)
3✔
533
                            .err_into()
3✔
534
                            .map(move |result| (status, result))
3✔
535
                            .boxed(),
3✔
536
                    ))
537
                    .is_some()
3✔
538
                {
539
                    tracing::warn!("Dropping existing deny/accept future in favor of new one")
×
540
                }
3✔
541
            }
542
            In::NegotiateOutboundConnect {
543
                circuit_id,
6✔
544
                inbound_circuit_req,
6✔
545
                src_peer_id,
6✔
546
                src_connection_id,
6✔
547
            } => {
6✔
548
                self.pending_connect_requests.push_back(PendingConnect::new(
6✔
549
                    circuit_id,
6✔
550
                    inbound_circuit_req,
6✔
551
                    src_peer_id,
6✔
552
                    src_connection_id,
6✔
553
                    &self.config,
6✔
554
                ));
6✔
555
                self.queued_events
6✔
556
                    .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
6✔
557
                        protocol: SubstreamProtocol::new(ReadyUpgrade::new(STOP_PROTOCOL_NAME), ()),
6✔
558
                    });
6✔
559
            }
6✔
560
            In::DenyCircuitReq {
561
                circuit_id,
3✔
562
                inbound_circuit_req,
3✔
563
                status,
3✔
564
            } => {
565
                let dst_peer_id = inbound_circuit_req.dst();
3✔
566
                self.circuit_deny_futures.push(
3✔
567
                    inbound_circuit_req
3✔
568
                        .deny(status)
3✔
569
                        .err_into()
3✔
570
                        .map(move |result| (circuit_id, dst_peer_id, status, result))
3✔
571
                        .boxed(),
3✔
572
                );
573
            }
574
            In::AcceptAndDriveCircuit {
575
                circuit_id,
6✔
576
                dst_peer_id,
6✔
577
                inbound_circuit_req,
6✔
578
                dst_stream,
6✔
579
                dst_pending_data,
6✔
580
            } => {
581
                self.circuit_accept_futures.push(
6✔
582
                    inbound_circuit_req
6✔
583
                        .accept()
6✔
584
                        .err_into()
6✔
585
                        .map_ok(move |(src_stream, src_pending_data)| CircuitParts {
6✔
586
                            circuit_id,
6✔
587
                            src_stream,
6✔
588
                            src_pending_data,
6✔
589
                            dst_peer_id,
6✔
590
                            dst_stream,
6✔
591
                            dst_pending_data,
6✔
592
                        })
6✔
593
                        .map_err(move |e| (circuit_id, dst_peer_id, e))
6✔
594
                        .boxed(),
6✔
595
                );
596
            }
597
        }
598
    }
39✔
599

600
    fn connection_keep_alive(&self) -> bool {
107✔
601
        let Some(idle_at) = self.idle_at else {
107✔
602
            return true;
55✔
603
        };
604

605
        Instant::now().duration_since(idle_at) <= Duration::from_secs(10)
52✔
606
    }
107✔
607

608
    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
609
    fn poll(
610
        &mut self,
611
        cx: &mut Context<'_>,
612
    ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
613
        // Return queued events.
614
        if let Some(event) = self.queued_events.pop_front() {
615
            return Poll::Ready(event);
616
        }
617

618
        // Progress existing circuits.
619
        if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) =
620
            self.circuits.poll_next_unpin(cx)
621
        {
622
            match result {
623
                Ok(()) => {
624
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
625
                        Event::CircuitClosed {
626
                            circuit_id,
627
                            dst_peer_id,
628
                            error: None,
629
                        },
630
                    ))
631
                }
632
                Err(e) => {
633
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
634
                        Event::CircuitClosed {
635
                            circuit_id,
636
                            dst_peer_id,
637
                            error: Some(e),
638
                        },
639
                    ))
640
                }
641
            }
642
        }
643

644
        // Process inbound protocol workers
645
        loop {
646
            match self.inbound_workers.poll_unpin(cx) {
647
                Poll::Ready(Ok(Ok(Either::Left(inbound_reservation_req)))) => {
648
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
649
                        Event::ReservationReqReceived {
650
                            inbound_reservation_req,
651
                            endpoint: self.endpoint.clone(),
652
                            renewed: self.active_reservation.is_some(),
653
                        },
654
                    ));
655
                }
656
                Poll::Ready(Ok(Ok(Either::Right(inbound_circuit_req)))) => {
657
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
658
                        Event::CircuitReqReceived {
659
                            inbound_circuit_req,
660
                            endpoint: self.endpoint.clone(),
661
                        },
662
                    ));
663
                }
664
                Poll::Ready(Err(e)) => {
665
                    tracing::debug!("Inbound stream operation timed out: {e}");
666
                    continue;
667
                }
668
                Poll::Ready(Ok(Err(e))) => {
669
                    tracing::debug!("Inbound stream operation failed: {e}");
670
                    continue;
671
                }
672
                Poll::Pending => {
673
                    break;
674
                }
675
            }
676
        }
677

678
        // Process outbound protocol workers
679
        match self.outbound_workers.poll_unpin(cx) {
680
            Poll::Ready((id, Ok(Ok(circuit)))) => {
681
                let connect = self
682
                    .active_connect_requests
683
                    .remove(&id)
684
                    .expect("must have pending connect");
685

686
                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
687
                    Event::OutboundConnectNegotiated {
688
                        circuit_id: id,
689
                        src_peer_id: connect.src_peer_id,
690
                        src_connection_id: connect.src_connection_id,
691
                        inbound_circuit_req: connect.inbound_circuit_req,
692
                        dst_stream: circuit.dst_stream,
693
                        dst_pending_data: circuit.dst_pending_data,
694
                    },
695
                ));
696
            }
697
            Poll::Ready((id, Ok(Err(error)))) => {
698
                let connect = self
699
                    .active_connect_requests
700
                    .remove(&id)
701
                    .expect("must have pending connect");
702

703
                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
704
                    Event::OutboundConnectNegotiationFailed {
705
                        circuit_id: connect.circuit_id,
706
                        src_peer_id: connect.src_peer_id,
707
                        src_connection_id: connect.src_connection_id,
708
                        inbound_circuit_req: connect.inbound_circuit_req,
709
                        status: error.to_status(),
710
                        error,
711
                    },
712
                ));
713
            }
714
            Poll::Ready((id, Err(futures_bounded::Timeout { .. }))) => {
715
                let connect = self
716
                    .active_connect_requests
717
                    .remove(&id)
718
                    .expect("must have pending connect");
719

720
                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
721
                    Event::OutboundConnectNegotiationFailed {
722
                        circuit_id: connect.circuit_id,
723
                        src_peer_id: connect.src_peer_id,
724
                        src_connection_id: connect.src_connection_id,
725
                        inbound_circuit_req: connect.inbound_circuit_req,
726
                        status: proto::Status::CONNECTION_FAILED, // Best fit?
727
                        error: outbound_stop::Error::Io(io::ErrorKind::TimedOut.into()),
728
                    },
729
                ));
730
            }
731
            Poll::Pending => {}
732
        }
733

734
        // Deny new circuits.
735
        if let Poll::Ready(Some((circuit_id, dst_peer_id, status, result))) =
736
            self.circuit_deny_futures.poll_next_unpin(cx)
737
        {
738
            match result {
739
                Ok(()) => {
740
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
741
                        Event::CircuitReqDenied {
742
                            circuit_id,
743
                            dst_peer_id,
744
                            status,
745
                        },
746
                    ));
747
                }
748
                Err(error) => {
749
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
750
                        Event::CircuitReqDenyFailed {
751
                            circuit_id,
752
                            dst_peer_id,
753
                            error,
754
                        },
755
                    ));
756
                }
757
            }
758
        }
759

760
        // Accept new circuits.
761
        if let Poll::Ready(Some(result)) = self.circuit_accept_futures.poll_next_unpin(cx) {
762
            match result {
763
                Ok(parts) => {
764
                    let CircuitParts {
765
                        circuit_id,
766
                        mut src_stream,
767
                        src_pending_data,
768
                        dst_peer_id,
769
                        mut dst_stream,
770
                        dst_pending_data,
771
                    } = parts;
772
                    let max_circuit_duration = self.config.max_circuit_duration;
773
                    let max_circuit_bytes = self.config.max_circuit_bytes;
774

775
                    let circuit = async move {
6✔
776
                        let (result_1, result_2) = futures::future::join(
6✔
777
                            src_stream.write_all(&dst_pending_data),
6✔
778
                            dst_stream.write_all(&src_pending_data),
6✔
779
                        )
780
                        .await;
6✔
781
                        result_1?;
6✔
782
                        result_2?;
6✔
783

784
                        CopyFuture::new(
6✔
785
                            src_stream,
6✔
786
                            dst_stream,
6✔
787
                            max_circuit_duration,
6✔
788
                            max_circuit_bytes,
6✔
789
                        )
6✔
790
                        .await?;
6✔
791

792
                        Ok(())
×
793
                    }
×
794
                    .map(move |r| (circuit_id, dst_peer_id, r))
×
795
                    .boxed();
796

797
                    self.circuits.push(circuit);
798

799
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
800
                        Event::CircuitReqAccepted {
801
                            circuit_id,
802
                            dst_peer_id,
803
                        },
804
                    ));
805
                }
806
                Err((circuit_id, dst_peer_id, error)) => {
807
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
808
                        Event::CircuitReqAcceptFailed {
809
                            circuit_id,
810
                            dst_peer_id,
811
                            error,
812
                        },
813
                    ));
814
                }
815
            }
816
        }
817

818
        // Check active reservation.
819
        if let Some(Poll::Ready(())) = self
820
            .active_reservation
821
            .as_mut()
822
            .map(|fut| fut.poll_unpin(cx))
207✔
823
        {
824
            self.active_reservation = None;
825
            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
826
                Event::ReservationTimedOut {},
827
            ));
828
        }
829

830
        // Progress reservation request.
831
        match self.reservation_request_future.as_mut() {
832
            Some(ReservationRequestFuture::Accepting(fut)) => {
833
                if let Poll::Ready(result) = fut.poll_unpin(cx) {
834
                    self.reservation_request_future = None;
835

836
                    match result {
837
                        Ok(()) => {
838
                            let renewed = self
839
                                .active_reservation
840
                                .replace(Delay::new(self.config.reservation_duration))
841
                                .is_some();
842
                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
843
                                Event::ReservationReqAccepted { renewed },
844
                            ));
845
                        }
846
                        Err(error) => {
847
                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
848
                                Event::ReservationReqAcceptFailed { error },
849
                            ));
850
                        }
851
                    }
852
                }
853
            }
854
            Some(ReservationRequestFuture::Denying(fut)) => {
855
                if let Poll::Ready((status, result)) = fut.poll_unpin(cx) {
856
                    self.reservation_request_future = None;
857

858
                    match result {
859
                        Ok(()) => {
860
                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
861
                                Event::ReservationReqDenied { status },
862
                            ))
863
                        }
864
                        Err(error) => {
865
                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
866
                                Event::ReservationReqDenyFailed { error },
867
                            ));
868
                        }
869
                    }
870
                }
871
            }
872
            None => {}
873
        }
874

875
        // Check keep alive status.
876
        if self.active_reservation.is_none() {
877
            if self.idle_at.is_none() {
878
                self.idle_at = Some(Instant::now());
879
            }
880
        } else {
881
            self.idle_at = None;
882
        }
883

884
        Poll::Pending
885
    }
886

887
    fn on_connection_event(
72✔
888
        &mut self,
72✔
889
        event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
72✔
890
    ) {
72✔
891
        match event {
72✔
892
            ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
893
                protocol: stream,
33✔
894
                ..
895
            }) => {
33✔
896
                self.on_fully_negotiated_inbound(stream);
33✔
897
            }
33✔
898
            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
899
                protocol: stream,
6✔
900
                ..
901
            }) => {
6✔
902
                self.on_fully_negotiated_outbound(stream);
6✔
903
            }
6✔
904
            ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
×
905
                self.on_dial_upgrade_error(dial_upgrade_error);
×
906
            }
×
907
            _ => {}
33✔
908
        }
909
    }
72✔
910
}
911

912
struct CircuitParts {
913
    circuit_id: CircuitId,
914
    src_stream: Stream,
915
    src_pending_data: Bytes,
916
    dst_peer_id: PeerId,
917
    dst_stream: Stream,
918
    dst_pending_data: Bytes,
919
}
920

921
/// Holds everything we know about a to-be-issued `CONNECT` request to a peer.
922
struct PendingConnect {
923
    circuit_id: CircuitId,
924
    inbound_circuit_req: inbound_hop::CircuitReq,
925
    src_peer_id: PeerId,
926
    src_connection_id: ConnectionId,
927
    max_circuit_duration: Duration,
928
    max_circuit_bytes: u64,
929
}
930

931
impl PendingConnect {
932
    fn new(
6✔
933
        circuit_id: CircuitId,
6✔
934
        inbound_circuit_req: inbound_hop::CircuitReq,
6✔
935
        src_peer_id: PeerId,
6✔
936
        src_connection_id: ConnectionId,
6✔
937
        config: &Config,
6✔
938
    ) -> Self {
6✔
939
        Self {
6✔
940
            circuit_id,
6✔
941
            inbound_circuit_req,
6✔
942
            src_peer_id,
6✔
943
            src_connection_id,
6✔
944
            max_circuit_duration: config.max_circuit_duration,
6✔
945
            max_circuit_bytes: config.max_circuit_bytes,
6✔
946
        }
6✔
947
    }
6✔
948
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc