• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18610913338

18 Oct 2025 04:41AM UTC coverage: 78.379% (+2.5%) from 75.842%
18610913338

push

github

butlergroup
	modified:   .github/workflows/ci.yml

36944 of 47135 relevant lines covered (78.38%)

37728.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.06
/protocols/relay/src/behaviour/handler.rs
1
// Copyright 2021 Protocol Labs.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
use std::{
22
    collections::{HashMap, VecDeque},
23
    fmt, io,
24
    task::{Context, Poll},
25
    time::Duration,
26
};
27

28
use bytes::Bytes;
29
use either::Either;
30
use futures::{
31
    future::{BoxFuture, FutureExt, TryFutureExt},
32
    io::AsyncWriteExt,
33
    stream::{FuturesUnordered, StreamExt},
34
};
35
use futures_timer::Delay;
36
use libp2p_core::{upgrade::ReadyUpgrade, ConnectedPoint, Multiaddr};
37
use libp2p_identity::PeerId;
38
use libp2p_swarm::{
39
    handler::{ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound},
40
    ConnectionHandler, ConnectionHandlerEvent, ConnectionId, Stream, StreamProtocol,
41
    StreamUpgradeError, SubstreamProtocol,
42
};
43
use web_time::Instant;
44

45
use crate::{
46
    behaviour::CircuitId,
47
    copy_future::CopyFuture,
48
    proto,
49
    protocol::{inbound_hop, outbound_stop},
50
    HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME,
51
};
52

53
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;
54
const STREAM_TIMEOUT: Duration = Duration::from_secs(60);
55

56
/// Static limits applied by a relay [`Handler`].
///
/// The handler never mutates these values; it only forwards them to the
/// inbound HOP request handling, to [`PendingConnect::new`] and to the
/// [`CopyFuture`] that drives an accepted circuit.
#[derive(Debug, Clone)]
57
pub struct Config {
58
    /// Forwarded to `inbound_hop::handle_inbound_request` for every inbound stream.
    pub reservation_duration: Duration,
59
    /// Forwarded to `inbound_hop::handle_inbound_request` and to [`CopyFuture::new`].
    pub max_circuit_duration: Duration,
60
    /// Forwarded to `inbound_hop::handle_inbound_request` and to [`CopyFuture::new`].
    // NOTE(review): presumably a per-circuit byte budget — confirm against `CopyFuture`.
    pub max_circuit_bytes: u64,
61
}
62

63
/// Commands sent from the behaviour to the [`Handler`].
///
/// This is the handler's `ConnectionHandler::FromBehaviour` type; each variant
/// is consumed by `Handler::on_behaviour_event`.
pub enum In {
64
    /// Accept the given reservation request: the handler calls
    /// `inbound_reservation_req.accept(addrs)` and stores the resulting future.
    AcceptReservationReq {
65
        inbound_reservation_req: inbound_hop::ReservationReq,
66
        addrs: Vec<Multiaddr>,
67
    },
68
    /// Deny the given reservation request with `status`: the handler calls
    /// `inbound_reservation_req.deny(status)` and stores the resulting future.
    DenyReservationReq {
69
        inbound_reservation_req: inbound_hop::ReservationReq,
70
        status: proto::Status,
71
    },
72
    /// Deny the given circuit request with `status`; the deny future is
    /// queued together with `circuit_id` and the request's destination peer.
    DenyCircuitReq {
73
        circuit_id: Option<CircuitId>,
74
        inbound_circuit_req: inbound_hop::CircuitReq,
75
        status: proto::Status,
76
    },
77
    /// Open an outbound substream (STOP protocol) on this connection on behalf
    /// of the given inbound circuit request.
    NegotiateOutboundConnect {
78
        circuit_id: CircuitId,
79
        inbound_circuit_req: inbound_hop::CircuitReq,
80
        src_peer_id: PeerId,
81
        src_connection_id: ConnectionId,
82
    },
83
    /// Accept the inbound circuit request and, once accepted, relay data
    /// between the source stream and the already negotiated `dst_stream`.
    AcceptAndDriveCircuit {
84
        circuit_id: CircuitId,
85
        dst_peer_id: PeerId,
86
        inbound_circuit_req: inbound_hop::CircuitReq,
87
        dst_stream: Stream,
88
        dst_pending_data: Bytes,
89
    },
90
}
91

92
// Hand-written `Debug`: the request, stream and pending-data fields are
// deliberately bound to `_` and left out of the output; only identifiers,
// addresses and status codes are printed.
impl fmt::Debug for In {
93
    /// Formats the variant as `In::<Variant> { .. }` with the printable fields only.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
94
        match self {
×
95
            In::AcceptReservationReq {
96
                inbound_reservation_req: _,
97
                addrs,
×
98
            } => f
×
99
                .debug_struct("In::AcceptReservationReq")
×
100
                .field("addrs", addrs)
×
101
                .finish(),
×
102
            In::DenyReservationReq {
103
                inbound_reservation_req: _,
104
                status,
×
105
            } => f
×
106
                .debug_struct("In::DenyReservationReq")
×
107
                .field("status", status)
×
108
                .finish(),
×
109
            In::DenyCircuitReq {
110
                circuit_id,
×
111
                inbound_circuit_req: _,
112
                status,
×
113
            } => f
×
114
                .debug_struct("In::DenyCircuitReq")
×
115
                .field("circuit_id", circuit_id)
×
116
                .field("status", status)
×
117
                .finish(),
×
118
            In::NegotiateOutboundConnect {
119
                circuit_id,
×
120
                inbound_circuit_req: _,
121
                src_peer_id,
×
122
                src_connection_id,
×
123
            } => f
×
124
                .debug_struct("In::NegotiateOutboundConnect")
×
125
                .field("circuit_id", circuit_id)
×
126
                .field("src_peer_id", src_peer_id)
×
127
                .field("src_connection_id", src_connection_id)
×
128
                .finish(),
×
129
            In::AcceptAndDriveCircuit {
130
                circuit_id,
×
131
                inbound_circuit_req: _,
132
                dst_peer_id,
×
133
                dst_stream: _,
134
                dst_pending_data: _,
135
            } => f
×
136
                .debug_struct("In::AcceptAndDriveCircuit")
×
137
                .field("circuit_id", circuit_id)
×
138
                .field("dst_peer_id", dst_peer_id)
×
139
                .finish(),
×
140
        }
141
    }
×
142
}
143

144
/// The events produced by the [`Handler`].
///
/// This is the handler's `ConnectionHandler::ToBehaviour` type.
145
// NOTE(review): presumably allowed because the variants that carry a request
// or a `Stream` are much larger than the rest — confirm before removing.
#[allow(clippy::large_enum_variant)]
146
pub enum Event {
147
    /// An inbound reservation request has been received.
148
    ReservationReqReceived {
149
        inbound_reservation_req: inbound_hop::ReservationReq,
150
        /// Clone of the handler's connection endpoint.
        endpoint: ConnectedPoint,
151
        /// Indicates whether the request replaces an existing reservation.
        /// Set to `true` when the handler holds an active reservation timer
        /// at the time the request is reported.
152
        renewed: bool,
153
    },
154
    /// An inbound reservation request has been accepted.
155
    ReservationReqAccepted {
156
        /// Indicates whether the request replaces an existing reservation.
157
        renewed: bool,
158
    },
159
    /// Accepting an inbound reservation request failed.
160
    ReservationReqAcceptFailed { error: inbound_hop::Error },
161
    /// An inbound reservation request has been denied.
162
    ReservationReqDenied { status: proto::Status },
163
    /// Denying an inbound reservation request has failed.
164
    ReservationReqDenyFailed { error: inbound_hop::Error },
165
    /// An inbound reservation has timed out.
166
    ReservationTimedOut {},
167
    /// An inbound circuit request has been received.
168
    CircuitReqReceived {
169
        inbound_circuit_req: inbound_hop::CircuitReq,
170
        /// Clone of the handler's connection endpoint.
        endpoint: ConnectedPoint,
171
    },
172
    /// An inbound circuit request has been denied.
173
    CircuitReqDenied {
174
        circuit_id: Option<CircuitId>,
175
        dst_peer_id: PeerId,
176
        status: proto::Status,
177
    },
178
    /// Denying an inbound circuit request failed.
179
    CircuitReqDenyFailed {
180
        circuit_id: Option<CircuitId>,
181
        dst_peer_id: PeerId,
182
        error: inbound_hop::Error,
183
    },
184
    /// An inbound circuit request has been accepted.
185
    CircuitReqAccepted {
186
        circuit_id: CircuitId,
187
        dst_peer_id: PeerId,
188
    },
189
    /// Accepting an inbound circuit request failed.
190
    CircuitReqAcceptFailed {
191
        circuit_id: CircuitId,
192
        dst_peer_id: PeerId,
193
        error: inbound_hop::Error,
194
    },
195
    /// An outbound substream for an inbound circuit request has been
196
    /// negotiated.
197
    OutboundConnectNegotiated {
198
        circuit_id: CircuitId,
199
        src_peer_id: PeerId,
200
        src_connection_id: ConnectionId,
201
        inbound_circuit_req: inbound_hop::CircuitReq,
202
        dst_stream: Stream,
203
        dst_pending_data: Bytes,
204
    },
205
    /// Negotiating an outbound substream for an inbound circuit request failed.
206
    OutboundConnectNegotiationFailed {
207
        circuit_id: CircuitId,
208
        src_peer_id: PeerId,
209
        src_connection_id: ConnectionId,
210
        inbound_circuit_req: inbound_hop::CircuitReq,
211
        /// Status code chosen by the handler for this failure.
        status: proto::Status,
212
        error: outbound_stop::Error,
213
    },
214
    /// An inbound circuit has closed.
215
    CircuitClosed {
216
        circuit_id: CircuitId,
217
        dst_peer_id: PeerId,
218
        /// `None` when the circuit future finished with `Ok(())`, otherwise
        /// the I/O error it finished with.
        error: Option<std::io::Error>,
219
    },
220
}
221

222
// Hand-written `Debug`: request, stream and pending-data fields are bound to
// `_` and left out of the output; all remaining fields are printed.
impl fmt::Debug for Event {
223
    /// Formats the variant as `Event::<Variant> { .. }` with the printable fields only.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
224
        match self {
×
225
            Event::ReservationReqReceived {
226
                inbound_reservation_req: _,
227
                endpoint,
×
228
                renewed,
×
229
            } => f
×
230
                .debug_struct("Event::ReservationReqReceived")
×
231
                .field("endpoint", endpoint)
×
232
                .field("renewed", renewed)
×
233
                .finish(),
×
234
            Event::ReservationReqAccepted { renewed } => f
×
235
                .debug_struct("Event::ReservationReqAccepted")
×
236
                .field("renewed", renewed)
×
237
                .finish(),
×
238
            Event::ReservationReqAcceptFailed { error } => f
×
239
                .debug_struct("Event::ReservationReqAcceptFailed")
×
240
                .field("error", error)
×
241
                .finish(),
×
242
            Event::ReservationReqDenied { status } => f
×
243
                .debug_struct("Event::ReservationReqDenied")
×
244
                .field("status", status)
×
245
                .finish(),
×
246
            Event::ReservationReqDenyFailed { error } => f
×
247
                .debug_struct("Event::ReservationReqDenyFailed")
×
248
                .field("error", error)
×
249
                .finish(),
×
250
            Event::ReservationTimedOut {} => f.debug_struct("Event::ReservationTimedOut").finish(),
×
251
            Event::CircuitReqReceived {
252
                endpoint,
×
253
                inbound_circuit_req: _,
254
            } => f
×
255
                .debug_struct("Event::CircuitReqReceived")
×
256
                .field("endpoint", endpoint)
×
257
                .finish(),
×
258
            Event::CircuitReqDenied {
259
                circuit_id,
×
260
                dst_peer_id,
×
261
                status,
×
262
            } => f
×
263
                .debug_struct("Event::CircuitReqDenied")
×
264
                .field("circuit_id", circuit_id)
×
265
                .field("dst_peer_id", dst_peer_id)
×
266
                .field("status", status)
×
267
                .finish(),
×
268
            Event::CircuitReqDenyFailed {
269
                circuit_id,
×
270
                dst_peer_id,
×
271
                error,
×
272
            } => f
×
273
                .debug_struct("Event::CircuitReqDenyFailed")
×
274
                .field("circuit_id", circuit_id)
×
275
                .field("dst_peer_id", dst_peer_id)
×
276
                .field("error", error)
×
277
                .finish(),
×
278
            Event::CircuitReqAccepted {
279
                circuit_id,
×
280
                dst_peer_id,
×
281
            } => f
×
282
                .debug_struct("Event::CircuitReqAccepted")
×
283
                .field("circuit_id", circuit_id)
×
284
                .field("dst_peer_id", dst_peer_id)
×
285
                .finish(),
×
286
            Event::CircuitReqAcceptFailed {
287
                circuit_id,
×
288
                dst_peer_id,
×
289
                error,
×
290
            } => f
×
291
                .debug_struct("Event::CircuitReqAcceptFailed")
×
292
                .field("circuit_id", circuit_id)
×
293
                .field("dst_peer_id", dst_peer_id)
×
294
                .field("error", error)
×
295
                .finish(),
×
296
            Event::OutboundConnectNegotiated {
297
                circuit_id,
×
298
                src_peer_id,
×
299
                src_connection_id,
×
300
                inbound_circuit_req: _,
301
                dst_stream: _,
302
                dst_pending_data: _,
303
            } => f
×
304
                .debug_struct("Event::OutboundConnectNegotiated")
×
305
                .field("circuit_id", circuit_id)
×
306
                .field("src_peer_id", src_peer_id)
×
307
                .field("src_connection_id", src_connection_id)
×
308
                .finish(),
×
309
            Event::OutboundConnectNegotiationFailed {
310
                circuit_id,
×
311
                src_peer_id,
×
312
                src_connection_id,
×
313
                inbound_circuit_req: _,
314
                status,
×
315
                error,
×
316
            } => f
×
317
                .debug_struct("Event::OutboundConnectNegotiationFailed")
×
318
                .field("circuit_id", circuit_id)
×
319
                .field("src_peer_id", src_peer_id)
×
320
                .field("src_connection_id", src_connection_id)
×
321
                .field("status", status)
×
322
                .field("error", error)
×
323
                .finish(),
×
324
            Event::CircuitClosed {
325
                circuit_id,
×
326
                dst_peer_id,
×
327
                error,
×
328
            } => f
×
329
                .debug_struct("Event::CircuitClosed")
×
330
                .field("circuit_id", circuit_id)
×
331
                .field("dst_peer_id", dst_peer_id)
×
332
                .field("error", error)
×
333
                .finish(),
×
334
        }
335
    }
×
336
}
337

338
/// [`ConnectionHandler`] that manages substreams for a relay on a single
339
/// connection with a peer.
340
pub struct Handler {
341
    /// Endpoint of this connection; cloned into the `ReservationReqReceived`
    /// and `CircuitReqReceived` events.
    endpoint: ConnectedPoint,
342

343
    /// Static [`Handler`] [`Config`].
344
    config: Config,
345

346
    /// Queue of events to return when polled.
347
    queued_events: VecDeque<
348
        ConnectionHandlerEvent<
349
            <Self as ConnectionHandler>::OutboundProtocol,
350
            (),
351
            <Self as ConnectionHandler>::ToBehaviour,
352
        >,
353
    >,
354

355
    /// The point in time when this connection started idling.
    ///
    /// While this is `None`, `connection_keep_alive` returns `true`.
356
    idle_at: Option<Instant>,
357

358
    /// Future handling inbound reservation request.
    ///
    /// At most one is stored; a new accept/deny command replaces the
    /// previous future.
359
    reservation_request_future: Option<ReservationRequestFuture>,
360
    /// Timeout for the currently active reservation.
361
    active_reservation: Option<Delay>,
362

363
    /// Futures accepting an inbound circuit request.
364
    circuit_accept_futures: Futures<Result<CircuitParts, (CircuitId, PeerId, inbound_hop::Error)>>,
365
    /// Futures denying an inbound circuit request.
366
    circuit_deny_futures: Futures<(
367
        Option<CircuitId>,
368
        PeerId,
369
        proto::Status,
370
        Result<(), inbound_hop::Error>,
371
    )>,
372
    /// Futures relaying data for circuit between two peers.
373
    circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>,
374

375
    /// We issue a stream upgrade for each [`PendingConnect`] request.
    ///
    /// Entries are pushed at the back and popped from the front when an
    /// outbound stream is negotiated or its upgrade fails.
376
    pending_connect_requests: VecDeque<PendingConnect>,
377

378
    /// A `CONNECT` request is in flight for these circuits.
379
    active_connect_requests: HashMap<CircuitId, PendingConnect>,
380

381
    /// Bounded set of futures reading the request from each inbound stream;
    /// each resolves to either a reservation request or a circuit request.
    inbound_workers: futures_bounded::FuturesSet<
382
        Result<Either<inbound_hop::ReservationReq, inbound_hop::CircuitReq>, inbound_hop::Error>,
383
    >,
    /// Bounded map of futures running the outbound `CONNECT` exchange, keyed
    /// by the circuit they belong to.
384
    outbound_workers: futures_bounded::FuturesMap<
385
        CircuitId,
386
        Result<outbound_stop::Circuit, outbound_stop::Error>,
387
    >,
388
}
389

390
impl Handler {
391
    /// Creates a handler for one connection.
    ///
    /// Both worker collections are bounded by `STREAM_TIMEOUT` and
    /// `MAX_CONCURRENT_STREAMS_PER_CONNECTION`; every other field starts out
    /// empty (`Default::default()` / `None`).
    pub fn new(config: Config, endpoint: ConnectedPoint) -> Handler {
9✔
392
        Handler {
9✔
393
            inbound_workers: futures_bounded::FuturesSet::new(
9✔
394
                STREAM_TIMEOUT,
9✔
395
                MAX_CONCURRENT_STREAMS_PER_CONNECTION,
9✔
396
            ),
9✔
397
            outbound_workers: futures_bounded::FuturesMap::new(
9✔
398
                STREAM_TIMEOUT,
9✔
399
                MAX_CONCURRENT_STREAMS_PER_CONNECTION,
9✔
400
            ),
9✔
401
            endpoint,
9✔
402
            config,
9✔
403
            queued_events: Default::default(),
9✔
404
            idle_at: None,
9✔
405
            reservation_request_future: Default::default(),
9✔
406
            circuit_accept_futures: Default::default(),
9✔
407
            circuit_deny_futures: Default::default(),
9✔
408
            circuits: Default::default(),
9✔
409
            active_reservation: Default::default(),
9✔
410
            pending_connect_requests: Default::default(),
9✔
411
            active_connect_requests: Default::default(),
9✔
412
        }
9✔
413
    }
9✔
414

415
    /// Hands a freshly negotiated inbound stream to a new inbound worker
    /// built with the limits from `self.config`.
    ///
    /// If the worker cannot be pushed, only a warning is logged.
    fn on_fully_negotiated_inbound(&mut self, stream: Stream) {
11✔
416
        if self
11✔
417
            .inbound_workers
11✔
418
            .try_push(inbound_hop::handle_inbound_request(
11✔
419
                stream,
11✔
420
                self.config.reservation_duration,
11✔
421
                self.config.max_circuit_duration,
11✔
422
                self.config.max_circuit_bytes,
11✔
423
            ))
11✔
424
            .is_err()
11✔
425
        {
426
            tracing::warn!("Dropping inbound stream because we are at capacity")
×
427
        }
11✔
428
    }
11✔
429

430
    /// Pairs a freshly negotiated outbound stream with the oldest pending
    /// connect request and starts the outbound `CONNECT` worker for it.
    ///
    /// # Panics
    ///
    /// Panics if `pending_connect_requests` is empty.
    fn on_fully_negotiated_outbound(&mut self, stream: Stream) {
2✔
431
        // Streams are matched to requests in FIFO order.
        let connect = self
2✔
432
            .pending_connect_requests
2✔
433
            .pop_front()
2✔
434
            .expect("opened a stream without a pending stop command");
2✔
435

436
        if self
2✔
437
            .outbound_workers
2✔
438
            .try_push(
2✔
439
                connect.circuit_id,
2✔
440
                outbound_stop::connect(
2✔
441
                    stream,
2✔
442
                    connect.src_peer_id,
2✔
443
                    connect.max_circuit_duration,
2✔
444
                    connect.max_circuit_bytes,
2✔
445
                ),
2✔
446
            )
2✔
447
            .is_err()
2✔
448
        {
449
            tracing::warn!("Dropping outbound stream because we are at capacity")
×
450
        }
2✔
451

452
        // NOTE(review): the request is recorded as active even when `try_push`
        // failed above. In that case no worker may ever resolve for this
        // circuit id, so the entry (and its inbound circuit request) would
        // stay here without a failure event being reported — confirm and
        // consider reporting `OutboundConnectNegotiationFailed` instead.
        self.active_connect_requests
2✔
453
            .insert(connect.circuit_id, connect);
2✔
454
    }
2✔
455

456
    /// Reports a failed outbound stream upgrade to the behaviour.
    ///
    /// The upgrade error is translated into an `outbound_stop::Error`, the
    /// oldest pending connect request is removed, and an
    /// `Event::OutboundConnectNegotiationFailed` with status
    /// `CONNECTION_FAILED` is queued for it.
    ///
    /// # Panics
    ///
    /// Panics if `pending_connect_requests` is empty.
    fn on_dial_upgrade_error(
×
457
        &mut self,
×
458
        DialUpgradeError { error, .. }: DialUpgradeError<
×
459
            (),
×
460
            <Self as ConnectionHandler>::OutboundProtocol,
×
461
        >,
×
462
    ) {
×
463
        let error = match error {
×
464
            StreamUpgradeError::Timeout => outbound_stop::Error::Io(io::ErrorKind::TimedOut.into()),
×
465
            StreamUpgradeError::NegotiationFailed => outbound_stop::Error::Unsupported,
×
466
            StreamUpgradeError::Io(e) => outbound_stop::Error::Io(e),
×
467
            StreamUpgradeError::Apply(v) => libp2p_core::util::unreachable(v),
×
468
        };
469

470
        // Failures are matched to requests in FIFO order, like successes.
        let stop_command = self
×
471
            .pending_connect_requests
×
472
            .pop_front()
×
473
            .expect("failed to open a stream without a pending stop command");
×
474

475
        self.queued_events
×
476
            .push_back(ConnectionHandlerEvent::NotifyBehaviour(
×
477
                Event::OutboundConnectNegotiationFailed {
×
478
                    circuit_id: stop_command.circuit_id,
×
479
                    src_peer_id: stop_command.src_peer_id,
×
480
                    src_connection_id: stop_command.src_connection_id,
×
481
                    inbound_circuit_req: stop_command.inbound_circuit_req,
×
482
                    status: proto::Status::CONNECTION_FAILED,
×
483
                    error,
×
484
                },
×
485
            ));
×
486
    }
×
487
}
488

489
/// The in-flight answer to an inbound reservation request.
enum ReservationRequestFuture {
490
    /// Future created from `ReservationReq::accept`; resolves to the accept result.
    Accepting(BoxFuture<'static, Result<(), inbound_hop::Error>>),
491
    /// Future created from `ReservationReq::deny`; resolves to the status that
    /// was sent together with the deny result.
    Denying(BoxFuture<'static, (proto::Status, Result<(), inbound_hop::Error>)>),
492
}
493

494
/// Shorthand for an unordered collection of boxed `'static` futures that each
/// resolve to `T`.
type Futures<T> = FuturesUnordered<BoxFuture<'static, T>>;
495

496
impl ConnectionHandler for Handler {
497
    type FromBehaviour = In;
498
    type ToBehaviour = Event;
499
    type InboundProtocol = ReadyUpgrade<StreamProtocol>;
500
    type InboundOpenInfo = ();
501
    type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
502
    type OutboundOpenInfo = ();
503

504
    /// Returns the protocol accepted on inbound substreams: a ready upgrade
    /// for `HOP_PROTOCOL_NAME`.
    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
212✔
505
        SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ())
212✔
506
    }
212✔
507

508
    /// Executes a command from the behaviour.
    ///
    /// Every arm only creates a future or queues an event; nothing is awaited
    /// here.
    fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
13✔
509
        match event {
13✔
510
            In::AcceptReservationReq {
511
                inbound_reservation_req,
7✔
512
                addrs,
7✔
513
            } => {
514
                // Only one reservation answer is tracked at a time; an
                // unfinished previous one is dropped with a warning.
                if self
7✔
515
                    .reservation_request_future
7✔
516
                    .replace(ReservationRequestFuture::Accepting(
7✔
517
                        inbound_reservation_req.accept(addrs).err_into().boxed(),
7✔
518
                    ))
7✔
519
                    .is_some()
7✔
520
                {
521
                    tracing::warn!("Dropping existing deny/accept future in favor of new one")
×
522
                }
7✔
523
            }
524
            In::DenyReservationReq {
525
                inbound_reservation_req,
1✔
526
                status,
1✔
527
            } => {
528
                // Same single-slot replacement as for accepting; the status is
                // carried along with the result of the deny future.
                if self
1✔
529
                    .reservation_request_future
1✔
530
                    .replace(ReservationRequestFuture::Denying(
1✔
531
                        inbound_reservation_req
1✔
532
                            .deny(status)
1✔
533
                            .err_into()
1✔
534
                            .map(move |result| (status, result))
1✔
535
                            .boxed(),
1✔
536
                    ))
537
                    .is_some()
1✔
538
                {
539
                    tracing::warn!("Dropping existing deny/accept future in favor of new one")
×
540
                }
1✔
541
            }
542
            In::NegotiateOutboundConnect {
543
                circuit_id,
2✔
544
                inbound_circuit_req,
2✔
545
                src_peer_id,
2✔
546
                src_connection_id,
2✔
547
            } => {
2✔
548
                // Remember the request first, then ask for one outbound
                // substream (STOP protocol) for it.
                self.pending_connect_requests.push_back(PendingConnect::new(
2✔
549
                    circuit_id,
2✔
550
                    inbound_circuit_req,
2✔
551
                    src_peer_id,
2✔
552
                    src_connection_id,
2✔
553
                    &self.config,
2✔
554
                ));
2✔
555
                self.queued_events
2✔
556
                    .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
2✔
557
                        protocol: SubstreamProtocol::new(ReadyUpgrade::new(STOP_PROTOCOL_NAME), ()),
2✔
558
                    });
2✔
559
            }
2✔
560
            In::DenyCircuitReq {
561
                circuit_id,
1✔
562
                inbound_circuit_req,
1✔
563
                status,
1✔
564
            } => {
565
                // Read the destination before `deny` consumes the request.
                let dst_peer_id = inbound_circuit_req.dst();
1✔
566
                self.circuit_deny_futures.push(
1✔
567
                    inbound_circuit_req
1✔
568
                        .deny(status)
1✔
569
                        .err_into()
1✔
570
                        .map(move |result| (circuit_id, dst_peer_id, status, result))
1✔
571
                        .boxed(),
1✔
572
                );
573
            }
574
            In::AcceptAndDriveCircuit {
575
                circuit_id,
2✔
576
                dst_peer_id,
2✔
577
                inbound_circuit_req,
2✔
578
                dst_stream,
2✔
579
                dst_pending_data,
2✔
580
            } => {
581
                // On success both halves of the circuit are bundled into
                // `CircuitParts`; on failure the ids are attached to the error.
                self.circuit_accept_futures.push(
2✔
582
                    inbound_circuit_req
2✔
583
                        .accept()
2✔
584
                        .err_into()
2✔
585
                        .map_ok(move |(src_stream, src_pending_data)| CircuitParts {
2✔
586
                            circuit_id,
2✔
587
                            src_stream,
2✔
588
                            src_pending_data,
2✔
589
                            dst_peer_id,
2✔
590
                            dst_stream,
2✔
591
                            dst_pending_data,
2✔
592
                        })
2✔
593
                        .map_err(move |e| (circuit_id, dst_peer_id, e))
2✔
594
                        .boxed(),
2✔
595
                );
596
            }
597
        }
598
    }
13✔
599

600
    /// Tells the swarm whether this connection should be kept open.
    ///
    /// Returns `true` while `idle_at` is unset, and otherwise for as long as
    /// no more than 10 seconds have passed since `idle_at`.
    // NOTE(review): `idle_at` is not assigned in the part of the file visible
    // here — presumably `poll` maintains it; confirm.
    fn connection_keep_alive(&self) -> bool {
46✔
601
        let Some(idle_at) = self.idle_at else {
46✔
602
            return true;
23✔
603
        };
604

605
        // Grace period after going idle (hard-coded to 10 seconds).
        Instant::now().duration_since(idle_at) <= Duration::from_secs(10)
23✔
606
    }
46✔
607

608
    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
609
    fn poll(
280✔
610
        &mut self,
280✔
611
        cx: &mut Context<'_>,
280✔
612
    ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
280✔
613
        // Return queued events.
614
        if let Some(event) = self.queued_events.pop_front() {
280✔
615
            return Poll::Ready(event);
2✔
616
        }
278✔
617

618
        // Progress existing circuits.
619
        if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) =
×
620
            self.circuits.poll_next_unpin(cx)
278✔
621
        {
622
            match result {
×
623
                Ok(()) => {
624
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
×
625
                        Event::CircuitClosed {
×
626
                            circuit_id,
×
627
                            dst_peer_id,
×
628
                            error: None,
×
629
                        },
×
630
                    ))
×
631
                }
632
                Err(e) => {
×
633
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
×
634
                        Event::CircuitClosed {
×
635
                            circuit_id,
×
636
                            dst_peer_id,
×
637
                            error: Some(e),
×
638
                        },
×
639
                    ))
×
640
                }
641
            }
642
        }
278✔
643

644
        // Process inbound protocol workers
645
        loop {
646
            match self.inbound_workers.poll_unpin(cx) {
278✔
647
                Poll::Ready(Ok(Ok(Either::Left(inbound_reservation_req)))) => {
8✔
648
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
8✔
649
                        Event::ReservationReqReceived {
8✔
650
                            inbound_reservation_req,
8✔
651
                            endpoint: self.endpoint.clone(),
8✔
652
                            renewed: self.active_reservation.is_some(),
8✔
653
                        },
8✔
654
                    ));
8✔
655
                }
656
                Poll::Ready(Ok(Ok(Either::Right(inbound_circuit_req)))) => {
3✔
657
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
3✔
658
                        Event::CircuitReqReceived {
3✔
659
                            inbound_circuit_req,
3✔
660
                            endpoint: self.endpoint.clone(),
3✔
661
                        },
3✔
662
                    ));
3✔
663
                }
664
                Poll::Ready(Err(e)) => {
×
665
                    tracing::debug!("Inbound stream operation timed out: {e}");
×
666
                    continue;
×
667
                }
668
                Poll::Ready(Ok(Err(e))) => {
×
669
                    tracing::debug!("Inbound stream operation failed: {e}");
×
670
                    continue;
×
671
                }
672
                Poll::Pending => {
673
                    break;
267✔
674
                }
675
            }
676
        }
677

678
        // Process outbound protocol workers
679
        match self.outbound_workers.poll_unpin(cx) {
267✔
680
            Poll::Ready((id, Ok(Ok(circuit)))) => {
2✔
681
                let connect = self
2✔
682
                    .active_connect_requests
2✔
683
                    .remove(&id)
2✔
684
                    .expect("must have pending connect");
2✔
685

686
                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
2✔
687
                    Event::OutboundConnectNegotiated {
2✔
688
                        circuit_id: id,
2✔
689
                        src_peer_id: connect.src_peer_id,
2✔
690
                        src_connection_id: connect.src_connection_id,
2✔
691
                        inbound_circuit_req: connect.inbound_circuit_req,
2✔
692
                        dst_stream: circuit.dst_stream,
2✔
693
                        dst_pending_data: circuit.dst_pending_data,
2✔
694
                    },
2✔
695
                ));
2✔
696
            }
697
            Poll::Ready((id, Ok(Err(error)))) => {
×
698
                let connect = self
×
699
                    .active_connect_requests
×
700
                    .remove(&id)
×
701
                    .expect("must have pending connect");
×
702

703
                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
×
704
                    Event::OutboundConnectNegotiationFailed {
×
705
                        circuit_id: connect.circuit_id,
×
706
                        src_peer_id: connect.src_peer_id,
×
707
                        src_connection_id: connect.src_connection_id,
×
708
                        inbound_circuit_req: connect.inbound_circuit_req,
×
709
                        status: error.to_status(),
×
710
                        error,
×
711
                    },
×
712
                ));
×
713
            }
714
            Poll::Ready((id, Err(futures_bounded::Timeout { .. }))) => {
×
715
                let connect = self
×
716
                    .active_connect_requests
×
717
                    .remove(&id)
×
718
                    .expect("must have pending connect");
×
719

720
                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
×
721
                    Event::OutboundConnectNegotiationFailed {
×
722
                        circuit_id: connect.circuit_id,
×
723
                        src_peer_id: connect.src_peer_id,
×
724
                        src_connection_id: connect.src_connection_id,
×
725
                        inbound_circuit_req: connect.inbound_circuit_req,
×
726
                        status: proto::Status::CONNECTION_FAILED, // Best fit?
×
727
                        error: outbound_stop::Error::Io(io::ErrorKind::TimedOut.into()),
×
728
                    },
×
729
                ));
×
730
            }
731
            Poll::Pending => {}
265✔
732
        }
733

734
        // Deny new circuits.
735
        if let Poll::Ready(Some((circuit_id, dst_peer_id, status, result))) =
1✔
736
            self.circuit_deny_futures.poll_next_unpin(cx)
265✔
737
        {
738
            match result {
1✔
739
                Ok(()) => {
740
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1✔
741
                        Event::CircuitReqDenied {
1✔
742
                            circuit_id,
1✔
743
                            dst_peer_id,
1✔
744
                            status,
1✔
745
                        },
1✔
746
                    ));
1✔
747
                }
748
                Err(error) => {
×
749
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
×
750
                        Event::CircuitReqDenyFailed {
×
751
                            circuit_id,
×
752
                            dst_peer_id,
×
753
                            error,
×
754
                        },
×
755
                    ));
×
756
                }
757
            }
758
        }
264✔
759

760
        // Accept new circuits.
761
        if let Poll::Ready(Some(result)) = self.circuit_accept_futures.poll_next_unpin(cx) {
264✔
762
            match result {
2✔
763
                Ok(parts) => {
2✔
764
                    let CircuitParts {
765
                        circuit_id,
2✔
766
                        mut src_stream,
2✔
767
                        src_pending_data,
2✔
768
                        dst_peer_id,
2✔
769
                        mut dst_stream,
2✔
770
                        dst_pending_data,
2✔
771
                    } = parts;
2✔
772
                    let max_circuit_duration = self.config.max_circuit_duration;
2✔
773
                    let max_circuit_bytes = self.config.max_circuit_bytes;
2✔
774

775
                    let circuit = async move {
2✔
776
                        let (result_1, result_2) = futures::future::join(
2✔
777
                            src_stream.write_all(&dst_pending_data),
2✔
778
                            dst_stream.write_all(&src_pending_data),
2✔
779
                        )
780
                        .await;
2✔
781
                        result_1?;
2✔
782
                        result_2?;
2✔
783

784
                        CopyFuture::new(
2✔
785
                            src_stream,
2✔
786
                            dst_stream,
2✔
787
                            max_circuit_duration,
2✔
788
                            max_circuit_bytes,
2✔
789
                        )
2✔
790
                        .await?;
2✔
791

792
                        Ok(())
×
793
                    }
×
794
                    .map(move |r| (circuit_id, dst_peer_id, r))
2✔
795
                    .boxed();
2✔
796

797
                    self.circuits.push(circuit);
2✔
798

799
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
2✔
800
                        Event::CircuitReqAccepted {
2✔
801
                            circuit_id,
2✔
802
                            dst_peer_id,
2✔
803
                        },
2✔
804
                    ));
2✔
805
                }
806
                Err((circuit_id, dst_peer_id, error)) => {
×
807
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
×
808
                        Event::CircuitReqAcceptFailed {
×
809
                            circuit_id,
×
810
                            dst_peer_id,
×
811
                            error,
×
812
                        },
×
813
                    ));
×
814
                }
815
            }
816
        }
262✔
817

818
        // Check active reservation.
819
        if let Some(Poll::Ready(())) = self
262✔
820
            .active_reservation
262✔
821
            .as_mut()
262✔
822
            .map(|fut| fut.poll_unpin(cx))
262✔
823
        {
824
            self.active_reservation = None;
1✔
825
            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1✔
826
                Event::ReservationTimedOut {},
1✔
827
            ));
1✔
828
        }
261✔
829

830
        // Progress reservation request.
831
        match self.reservation_request_future.as_mut() {
261✔
832
            Some(ReservationRequestFuture::Accepting(fut)) => {
7✔
833
                if let Poll::Ready(result) = fut.poll_unpin(cx) {
7✔
834
                    self.reservation_request_future = None;
7✔
835

836
                    match result {
7✔
837
                        Ok(()) => {
838
                            let renewed = self
7✔
839
                                .active_reservation
7✔
840
                                .replace(Delay::new(self.config.reservation_duration))
7✔
841
                                .is_some();
7✔
842
                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
7✔
843
                                Event::ReservationReqAccepted { renewed },
7✔
844
                            ));
7✔
845
                        }
846
                        Err(error) => {
×
847
                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
×
848
                                Event::ReservationReqAcceptFailed { error },
×
849
                            ));
×
850
                        }
851
                    }
852
                }
×
853
            }
854
            Some(ReservationRequestFuture::Denying(fut)) => {
1✔
855
                if let Poll::Ready((status, result)) = fut.poll_unpin(cx) {
1✔
856
                    self.reservation_request_future = None;
1✔
857

858
                    match result {
1✔
859
                        Ok(()) => {
860
                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1✔
861
                                Event::ReservationReqDenied { status },
1✔
862
                            ))
1✔
863
                        }
864
                        Err(error) => {
×
865
                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
×
866
                                Event::ReservationReqDenyFailed { error },
×
867
                            ));
×
868
                        }
869
                    }
870
                }
×
871
            }
872
            None => {}
253✔
873
        }
874

875
        // Check keep alive status.
876
        if self.active_reservation.is_none() {
253✔
877
            if self.idle_at.is_none() {
171✔
878
                self.idle_at = Some(Instant::now());
10✔
879
            }
161✔
880
        } else {
82✔
881
            self.idle_at = None;
82✔
882
        }
82✔
883

884
        Poll::Pending
253✔
885
    }
280✔
886

887
    fn on_connection_event(
24✔
888
        &mut self,
24✔
889
        event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
24✔
890
    ) {
24✔
891
        match event {
24✔
892
            ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
893
                protocol: stream,
11✔
894
                ..
895
            }) => {
11✔
896
                self.on_fully_negotiated_inbound(stream);
11✔
897
            }
11✔
898
            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
899
                protocol: stream,
2✔
900
                ..
901
            }) => {
2✔
902
                self.on_fully_negotiated_outbound(stream);
2✔
903
            }
2✔
904
            ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
×
905
                self.on_dial_upgrade_error(dial_upgrade_error);
×
906
            }
×
907
            _ => {}
11✔
908
        }
909
    }
24✔
910
}
911

912
struct CircuitParts {
913
    circuit_id: CircuitId,
914
    src_stream: Stream,
915
    src_pending_data: Bytes,
916
    dst_peer_id: PeerId,
917
    dst_stream: Stream,
918
    dst_pending_data: Bytes,
919
}
920

921
/// Holds everything we know about a to-be-issued `CONNECT` request to a peer.
922
struct PendingConnect {
923
    circuit_id: CircuitId,
924
    inbound_circuit_req: inbound_hop::CircuitReq,
925
    src_peer_id: PeerId,
926
    src_connection_id: ConnectionId,
927
    max_circuit_duration: Duration,
928
    max_circuit_bytes: u64,
929
}
930

931
impl PendingConnect {
932
    fn new(
2✔
933
        circuit_id: CircuitId,
2✔
934
        inbound_circuit_req: inbound_hop::CircuitReq,
2✔
935
        src_peer_id: PeerId,
2✔
936
        src_connection_id: ConnectionId,
2✔
937
        config: &Config,
2✔
938
    ) -> Self {
2✔
939
        Self {
2✔
940
            circuit_id,
2✔
941
            inbound_circuit_req,
2✔
942
            src_peer_id,
2✔
943
            src_connection_id,
2✔
944
            max_circuit_duration: config.max_circuit_duration,
2✔
945
            max_circuit_bytes: config.max_circuit_bytes,
2✔
946
        }
2✔
947
    }
2✔
948
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc