• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18610913338

18 Oct 2025 04:41AM UTC coverage: 78.379% (+2.5%) from 75.842%
18610913338

push

github

butlergroup
	modified:   .github/workflows/ci.yml

36944 of 47135 relevant lines covered (78.38%)

37728.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.94
/protocols/relay/src/priv_client/handler.rs
1
// Copyright 2021 Protocol Labs.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
use std::{
22
    collections::VecDeque,
23
    convert::Infallible,
24
    fmt, io,
25
    task::{Context, Poll},
26
    time::Duration,
27
};
28

29
use futures::{
30
    channel::{mpsc, mpsc::Sender, oneshot},
31
    future::FutureExt,
32
};
33
use futures_timer::Delay;
34
use libp2p_core::{multiaddr::Protocol, upgrade::ReadyUpgrade, Multiaddr};
35
use libp2p_identity::PeerId;
36
use libp2p_swarm::{
37
    handler::{ConnectionEvent, FullyNegotiatedInbound},
38
    ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
39
    SubstreamProtocol,
40
};
41

42
use crate::{
43
    client::Connection,
44
    priv_client,
45
    priv_client::{transport, transport::ToListenerMsg},
46
    proto,
47
    protocol::{self, inbound_stop, outbound_hop},
48
    HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME,
49
};
50

51
/// The maximum number of circuits being denied concurrently.
52
///
53
/// Circuits to be denied exceeding the limit are dropped.
54
const MAX_NUMBER_DENYING_CIRCUIT: usize = 8;
55
const DENYING_CIRCUIT_TIMEOUT: Duration = Duration::from_secs(60);
56

57
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;
58
const STREAM_TIMEOUT: Duration = Duration::from_secs(60);
59

60
pub enum In {
61
    Reserve {
62
        to_listener: mpsc::Sender<transport::ToListenerMsg>,
63
    },
64
    EstablishCircuit {
65
        dst_peer_id: PeerId,
66
        to_dial: oneshot::Sender<Result<priv_client::Connection, outbound_hop::ConnectError>>,
67
    },
68
}
69

70
impl fmt::Debug for In {
71
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
72
        match self {
×
73
            In::Reserve { to_listener: _ } => f.debug_struct("In::Reserve").finish(),
×
74
            In::EstablishCircuit {
75
                dst_peer_id,
×
76
                to_dial: _,
77
            } => f
×
78
                .debug_struct("In::EstablishCircuit")
×
79
                .field("dst_peer_id", dst_peer_id)
×
80
                .finish(),
×
81
        }
82
    }
×
83
}
84

85
#[derive(Debug)]
86
pub enum Event {
87
    ReservationReqAccepted {
88
        /// Indicates whether the request replaces an existing reservation.
89
        renewal: bool,
90
        limit: Option<protocol::Limit>,
91
    },
92
    /// An outbound circuit has been established.
93
    OutboundCircuitEstablished { limit: Option<protocol::Limit> },
94
    /// An inbound circuit has been established.
95
    InboundCircuitEstablished {
96
        src_peer_id: PeerId,
97
        limit: Option<protocol::Limit>,
98
    },
99
}
100

101
pub struct Handler {
102
    local_peer_id: PeerId,
103
    remote_peer_id: PeerId,
104
    remote_addr: Multiaddr,
105

106
    /// Queue of events to return when polled.
107
    queued_events: VecDeque<
108
        ConnectionHandlerEvent<
109
            <Handler as ConnectionHandler>::OutboundProtocol,
110
            (),
111
            <Handler as ConnectionHandler>::ToBehaviour,
112
        >,
113
    >,
114

115
    pending_streams: VecDeque<oneshot::Sender<Result<Stream, StreamUpgradeError<Infallible>>>>,
116

117
    inflight_reserve_requests: futures_bounded::FuturesTupleSet<
118
        Result<outbound_hop::Reservation, outbound_hop::ReserveError>,
119
        mpsc::Sender<transport::ToListenerMsg>,
120
    >,
121

122
    inflight_outbound_connect_requests: futures_bounded::FuturesTupleSet<
123
        Result<outbound_hop::Circuit, outbound_hop::ConnectError>,
124
        oneshot::Sender<Result<priv_client::Connection, outbound_hop::ConnectError>>,
125
    >,
126

127
    inflight_inbound_circuit_requests:
128
        futures_bounded::FuturesSet<Result<inbound_stop::Circuit, inbound_stop::Error>>,
129

130
    inflight_outbound_circuit_deny_requests:
131
        futures_bounded::FuturesSet<Result<(), inbound_stop::Error>>,
132

133
    reservation: Reservation,
134
}
135

136
impl Handler {
137
    pub fn new(local_peer_id: PeerId, remote_peer_id: PeerId, remote_addr: Multiaddr) -> Self {
11✔
138
        Self {
11✔
139
            local_peer_id,
11✔
140
            remote_peer_id,
11✔
141
            remote_addr,
11✔
142
            queued_events: Default::default(),
11✔
143
            pending_streams: Default::default(),
11✔
144
            inflight_reserve_requests: futures_bounded::FuturesTupleSet::new(
11✔
145
                STREAM_TIMEOUT,
11✔
146
                MAX_CONCURRENT_STREAMS_PER_CONNECTION,
11✔
147
            ),
11✔
148
            inflight_inbound_circuit_requests: futures_bounded::FuturesSet::new(
11✔
149
                STREAM_TIMEOUT,
11✔
150
                MAX_CONCURRENT_STREAMS_PER_CONNECTION,
11✔
151
            ),
11✔
152
            inflight_outbound_connect_requests: futures_bounded::FuturesTupleSet::new(
11✔
153
                STREAM_TIMEOUT,
11✔
154
                MAX_CONCURRENT_STREAMS_PER_CONNECTION,
11✔
155
            ),
11✔
156
            inflight_outbound_circuit_deny_requests: futures_bounded::FuturesSet::new(
11✔
157
                DENYING_CIRCUIT_TIMEOUT,
11✔
158
                MAX_NUMBER_DENYING_CIRCUIT,
11✔
159
            ),
11✔
160
            reservation: Reservation::None,
11✔
161
        }
11✔
162
    }
11✔
163

164
    fn insert_to_deny_futs(&mut self, circuit: inbound_stop::Circuit) {
×
165
        let src_peer_id = circuit.src_peer_id();
×
166

167
        if self
×
168
            .inflight_outbound_circuit_deny_requests
×
169
            .try_push(circuit.deny(proto::Status::NO_RESERVATION))
×
170
            .is_err()
×
171
        {
172
            tracing::warn!(
×
173
                peer=%src_peer_id,
174
                "Dropping existing inbound circuit request to be denied from peer in favor of new one"
×
175
            )
176
        }
×
177
    }
×
178

179
    fn make_new_reservation(&mut self, to_listener: Sender<ToListenerMsg>) {
8✔
180
        let (sender, receiver) = oneshot::channel();
8✔
181

182
        self.pending_streams.push_back(sender);
8✔
183
        self.queued_events
8✔
184
            .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
8✔
185
                protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()),
8✔
186
            });
8✔
187
        let result = self.inflight_reserve_requests.try_push(
8✔
188
            async move {
8✔
189
                let stream = receiver
8✔
190
                    .await
8✔
191
                    .map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))?
8✔
192
                    .map_err(into_reserve_error)?;
8✔
193

194
                let reservation = outbound_hop::make_reservation(stream).await?;
8✔
195

196
                Ok(reservation)
7✔
197
            },
8✔
198
            to_listener,
8✔
199
        );
200

201
        if result.is_err() {
8✔
202
            tracing::warn!("Dropping in-flight reservation request because we are at capacity");
×
203
        }
8✔
204
    }
8✔
205

206
    fn establish_new_circuit(
3✔
207
        &mut self,
3✔
208
        to_dial: oneshot::Sender<Result<Connection, outbound_hop::ConnectError>>,
3✔
209
        dst_peer_id: PeerId,
3✔
210
    ) {
3✔
211
        let (sender, receiver) = oneshot::channel();
3✔
212

213
        self.pending_streams.push_back(sender);
3✔
214
        self.queued_events
3✔
215
            .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
3✔
216
                protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()),
3✔
217
            });
3✔
218
        let result = self.inflight_outbound_connect_requests.try_push(
3✔
219
            async move {
3✔
220
                let stream = receiver
3✔
221
                    .await
3✔
222
                    .map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))?
3✔
223
                    .map_err(into_connect_error)?;
3✔
224

225
                outbound_hop::open_circuit(stream, dst_peer_id).await
3✔
226
            },
3✔
227
            to_dial,
3✔
228
        );
229

230
        if result.is_err() {
3✔
231
            tracing::warn!("Dropping in-flight connect request because we are at capacity")
×
232
        }
3✔
233
    }
3✔
234
}
235

236
impl ConnectionHandler for Handler {
237
    type FromBehaviour = In;
238
    type ToBehaviour = Event;
239
    type InboundProtocol = ReadyUpgrade<StreamProtocol>;
240
    type InboundOpenInfo = ();
241
    type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
242
    type OutboundOpenInfo = ();
243

244
    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
189✔
245
        SubstreamProtocol::new(ReadyUpgrade::new(STOP_PROTOCOL_NAME), ())
189✔
246
    }
189✔
247

248
    fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
10✔
249
        match event {
10✔
250
            In::Reserve { to_listener } => {
7✔
251
                self.make_new_reservation(to_listener);
7✔
252
            }
7✔
253
            In::EstablishCircuit {
254
                to_dial,
3✔
255
                dst_peer_id,
3✔
256
            } => {
3✔
257
                self.establish_new_circuit(to_dial, dst_peer_id);
3✔
258
            }
3✔
259
        }
260
    }
10✔
261

262
    fn connection_keep_alive(&self) -> bool {
34✔
263
        self.reservation.is_some()
34✔
264
    }
34✔
265

266
    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
267
    fn poll(
259✔
268
        &mut self,
259✔
269
        cx: &mut Context<'_>,
259✔
270
    ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
259✔
271
        loop {
272
            // Reservations
273
            match self.inflight_reserve_requests.poll_unpin(cx) {
262✔
274
                Poll::Ready((
275
                    Ok(Ok(outbound_hop::Reservation {
276
                        renewal_timeout,
7✔
277
                        addrs,
7✔
278
                        limit,
7✔
279
                    })),
280
                    to_listener,
7✔
281
                )) => {
282
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
7✔
283
                        self.reservation.accepted(
7✔
284
                            renewal_timeout,
7✔
285
                            addrs,
7✔
286
                            to_listener,
7✔
287
                            self.local_peer_id,
7✔
288
                            limit,
7✔
289
                        ),
7✔
290
                    ));
7✔
291
                }
292
                Poll::Ready((Ok(Err(error)), mut to_listener)) => {
1✔
293
                    if let Err(e) =
×
294
                        to_listener.try_send(transport::ToListenerMsg::Reservation(Err(error)))
1✔
295
                    {
296
                        tracing::debug!("Unable to send error to listener: {}", e.into_send_error())
×
297
                    }
1✔
298
                    self.reservation.failed();
1✔
299
                    continue;
1✔
300
                }
301
                Poll::Ready((Err(futures_bounded::Timeout { .. }), mut to_listener)) => {
×
302
                    if let Err(e) =
×
303
                        to_listener.try_send(transport::ToListenerMsg::Reservation(Err(
×
304
                            outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into()),
×
305
                        )))
×
306
                    {
307
                        tracing::debug!("Unable to send error to listener: {}", e.into_send_error())
×
308
                    }
×
309
                    self.reservation.failed();
×
310
                    continue;
×
311
                }
312
                Poll::Pending => {}
254✔
313
            }
314

315
            // Circuits
316
            match self.inflight_outbound_connect_requests.poll_unpin(cx) {
254✔
317
                Poll::Ready((
318
                    Ok(Ok(outbound_hop::Circuit {
319
                        limit,
2✔
320
                        read_buffer,
2✔
321
                        stream,
2✔
322
                    })),
323
                    to_dialer,
2✔
324
                )) => {
325
                    if to_dialer
2✔
326
                        .send(Ok(priv_client::Connection {
2✔
327
                            state: priv_client::ConnectionState::new_outbound(stream, read_buffer),
2✔
328
                        }))
2✔
329
                        .is_err()
2✔
330
                    {
331
                        tracing::debug!(
×
332
                            "Dropping newly established circuit because the listener is gone"
×
333
                        );
334
                        continue;
×
335
                    }
2✔
336

337
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
2✔
338
                        Event::OutboundCircuitEstablished { limit },
2✔
339
                    ));
2✔
340
                }
341
                Poll::Ready((Ok(Err(error)), to_dialer)) => {
1✔
342
                    let _ = to_dialer.send(Err(error));
1✔
343
                    continue;
1✔
344
                }
345
                Poll::Ready((Err(futures_bounded::Timeout { .. }), to_dialer)) => {
×
346
                    if to_dialer
×
347
                        .send(Err(outbound_hop::ConnectError::Io(
×
348
                            io::ErrorKind::TimedOut.into(),
×
349
                        )))
×
350
                        .is_err()
×
351
                    {
352
                        tracing::debug!("Unable to send error to dialer")
×
353
                    }
×
354
                    self.reservation.failed();
×
355
                    continue;
×
356
                }
357
                Poll::Pending => {}
251✔
358
            }
359

360
            // Return queued events.
361
            if let Some(event) = self.queued_events.pop_front() {
251✔
362
                return Poll::Ready(event);
11✔
363
            }
240✔
364

365
            match self.inflight_inbound_circuit_requests.poll_unpin(cx) {
240✔
366
                Poll::Ready(Ok(Ok(circuit))) => match &mut self.reservation {
2✔
367
                    Reservation::Accepted { pending_msgs, .. }
2✔
368
                    | Reservation::Renewing { pending_msgs, .. } => {
×
369
                        let src_peer_id = circuit.src_peer_id();
2✔
370
                        let limit = circuit.limit();
2✔
371

372
                        let connection = super::ConnectionState::new_inbound(circuit);
2✔
373

374
                        pending_msgs.push_back(
2✔
375
                            transport::ToListenerMsg::IncomingRelayedConnection {
2✔
376
                                stream: super::Connection { state: connection },
2✔
377
                                src_peer_id,
2✔
378
                                relay_peer_id: self.remote_peer_id,
2✔
379
                                relay_addr: self.remote_addr.clone(),
2✔
380
                            },
2✔
381
                        );
382
                        return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
2✔
383
                            Event::InboundCircuitEstablished { src_peer_id, limit },
2✔
384
                        ));
2✔
385
                    }
386
                    Reservation::None => {
387
                        self.insert_to_deny_futs(circuit);
×
388
                        continue;
×
389
                    }
390
                },
391
                Poll::Ready(Ok(Err(e))) => {
×
392
                    tracing::debug!("An inbound circuit request failed: {e}");
×
393
                    continue;
×
394
                }
395
                Poll::Ready(Err(e)) => {
×
396
                    tracing::debug!("An inbound circuit request timed out: {e}");
×
397
                    continue;
×
398
                }
399
                Poll::Pending => {}
238✔
400
            }
401

402
            if let Poll::Ready(Some(to_listener)) = self.reservation.poll(cx) {
238✔
403
                self.make_new_reservation(to_listener);
1✔
404
                continue;
1✔
405
            }
237✔
406

407
            // Deny incoming circuit requests.
408
            match self.inflight_outbound_circuit_deny_requests.poll_unpin(cx) {
237✔
409
                Poll::Ready(Ok(Ok(()))) => continue,
×
410
                Poll::Ready(Ok(Err(error))) => {
×
411
                    tracing::debug!("Denying inbound circuit failed: {error}");
×
412
                    continue;
×
413
                }
414
                Poll::Ready(Err(futures_bounded::Timeout { .. })) => {
415
                    tracing::debug!("Denying inbound circuit timed out");
×
416
                    continue;
×
417
                }
418
                Poll::Pending => {}
237✔
419
            }
420

421
            return Poll::Pending;
237✔
422
        }
423
    }
259✔
424

425
    fn on_connection_event(
26✔
426
        &mut self,
26✔
427
        event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
26✔
428
    ) {
26✔
429
        match event {
26✔
430
            ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
431
                protocol: stream,
2✔
432
                ..
433
            }) => {
434
                if self
2✔
435
                    .inflight_inbound_circuit_requests
2✔
436
                    .try_push(inbound_stop::handle_open_circuit(stream))
2✔
437
                    .is_err()
2✔
438
                {
439
                    tracing::warn!("Dropping inbound stream because we are at capacity")
×
440
                }
2✔
441
            }
442
            ConnectionEvent::FullyNegotiatedOutbound(ev) => {
11✔
443
                if let Some(next) = self.pending_streams.pop_front() {
11✔
444
                    let _ = next.send(Ok(ev.protocol));
11✔
445
                }
11✔
446
            }
447
            ConnectionEvent::ListenUpgradeError(ev) => libp2p_core::util::unreachable(ev.error),
×
448
            ConnectionEvent::DialUpgradeError(ev) => {
×
449
                if let Some(next) = self.pending_streams.pop_front() {
×
450
                    let _ = next.send(Err(ev.error));
×
451
                }
×
452
            }
453
            _ => {}
13✔
454
        }
455
    }
26✔
456
}
457

458
enum Reservation {
459
    /// The Reservation is accepted by the relay.
460
    Accepted {
461
        renewal_timeout: Delay,
462
        /// Buffer of messages to be send to the transport listener.
463
        pending_msgs: VecDeque<transport::ToListenerMsg>,
464
        to_listener: mpsc::Sender<transport::ToListenerMsg>,
465
    },
466
    /// The reservation is being renewed with the relay.
467
    Renewing {
468
        /// Buffer of messages to be send to the transport listener.
469
        pending_msgs: VecDeque<transport::ToListenerMsg>,
470
    },
471
    None,
472
}
473

474
impl Reservation {
475
    fn accepted(
7✔
476
        &mut self,
7✔
477
        renewal_timeout: Delay,
7✔
478
        addrs: Vec<Multiaddr>,
7✔
479
        to_listener: mpsc::Sender<transport::ToListenerMsg>,
7✔
480
        local_peer_id: PeerId,
7✔
481
        limit: Option<protocol::Limit>,
7✔
482
    ) -> Event {
7✔
483
        let (renewal, mut pending_msgs) = match std::mem::replace(self, Self::None) {
7✔
484
            Reservation::Accepted { pending_msgs, .. }
1✔
485
            | Reservation::Renewing { pending_msgs, .. } => (true, pending_msgs),
2✔
486
            Reservation::None => (false, VecDeque::new()),
5✔
487
        };
488

489
        pending_msgs.push_back(transport::ToListenerMsg::Reservation(Ok(
7✔
490
            transport::Reservation {
491
                addrs: addrs
7✔
492
                    .into_iter()
7✔
493
                    .map(|a| {
7✔
494
                        a.with(Protocol::P2pCircuit)
7✔
495
                            .with(Protocol::P2p(local_peer_id))
7✔
496
                    })
7✔
497
                    .collect(),
7✔
498
            },
499
        )));
500

501
        *self = Reservation::Accepted {
7✔
502
            renewal_timeout,
7✔
503
            pending_msgs,
7✔
504
            to_listener,
7✔
505
        };
7✔
506

507
        Event::ReservationReqAccepted { renewal, limit }
7✔
508
    }
7✔
509

510
    fn is_some(&self) -> bool {
34✔
511
        matches!(self, Self::Accepted { .. } | Self::Renewing { .. })
34✔
512
    }
34✔
513

514
    /// Marks the current reservation as failed.
515
    fn failed(&mut self) {
1✔
516
        *self = Reservation::None;
1✔
517
    }
1✔
518

519
    fn forward_messages_to_transport_listener(&mut self, cx: &mut Context<'_>) {
238✔
520
        if let Reservation::Accepted {
521
            pending_msgs,
83✔
522
            to_listener,
83✔
523
            ..
524
        } = self
238✔
525
        {
526
            if !pending_msgs.is_empty() {
83✔
527
                match to_listener.poll_ready(cx) {
9✔
528
                    Poll::Ready(Ok(())) => {
529
                        if let Err(e) = to_listener
9✔
530
                            .start_send(pending_msgs.pop_front().expect("Called !is_empty()."))
9✔
531
                        {
532
                            tracing::debug!("Failed to sent pending message to listener: {:?}", e);
×
533
                            *self = Reservation::None;
×
534
                        }
9✔
535
                    }
536
                    Poll::Ready(Err(e)) => {
×
537
                        tracing::debug!("Channel to listener failed: {:?}", e);
×
538
                        *self = Reservation::None;
×
539
                    }
540
                    Poll::Pending => {}
×
541
                }
542
            }
74✔
543
        }
155✔
544
    }
238✔
545

546
    fn poll(
238✔
547
        &mut self,
238✔
548
        cx: &mut Context<'_>,
238✔
549
    ) -> Poll<Option<mpsc::Sender<transport::ToListenerMsg>>> {
238✔
550
        self.forward_messages_to_transport_listener(cx);
238✔
551

552
        // Check renewal timeout if any.
553
        let (next_reservation, poll_val) = match std::mem::replace(self, Reservation::None) {
238✔
554
            Reservation::Accepted {
555
                mut renewal_timeout,
83✔
556
                pending_msgs,
83✔
557
                to_listener,
83✔
558
            } => match renewal_timeout.poll_unpin(cx) {
83✔
559
                Poll::Ready(()) => (
1✔
560
                    Reservation::Renewing { pending_msgs },
1✔
561
                    Poll::Ready(Some(to_listener)),
1✔
562
                ),
1✔
563
                Poll::Pending => (
82✔
564
                    Reservation::Accepted {
82✔
565
                        renewal_timeout,
82✔
566
                        pending_msgs,
82✔
567
                        to_listener,
82✔
568
                    },
82✔
569
                    Poll::Pending,
82✔
570
                ),
82✔
571
            },
572
            r => (r, Poll::Pending),
155✔
573
        };
574
        *self = next_reservation;
238✔
575

576
        poll_val
238✔
577
    }
238✔
578
}
579

580
fn into_reserve_error(e: StreamUpgradeError<Infallible>) -> outbound_hop::ReserveError {
×
581
    match e {
×
582
        StreamUpgradeError::Timeout => {
583
            outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into())
×
584
        }
585
        StreamUpgradeError::Apply(never) => libp2p_core::util::unreachable(never),
×
586
        StreamUpgradeError::NegotiationFailed => outbound_hop::ReserveError::Unsupported,
×
587
        StreamUpgradeError::Io(e) => outbound_hop::ReserveError::Io(e),
×
588
    }
589
}
×
590

591
fn into_connect_error(e: StreamUpgradeError<Infallible>) -> outbound_hop::ConnectError {
×
592
    match e {
×
593
        StreamUpgradeError::Timeout => {
594
            outbound_hop::ConnectError::Io(io::ErrorKind::TimedOut.into())
×
595
        }
596
        StreamUpgradeError::Apply(never) => libp2p_core::util::unreachable(never),
×
597
        StreamUpgradeError::NegotiationFailed => outbound_hop::ConnectError::Unsupported,
×
598
        StreamUpgradeError::Io(e) => outbound_hop::ConnectError::Io(e),
×
599
    }
600
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc