• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18610913338

18 Oct 2025 04:41AM UTC coverage: 78.379% (+2.5%) from 75.842%
18610913338

push

github

butlergroup
	modified:   .github/workflows/ci.yml

36944 of 47135 relevant lines covered (78.38%)

37728.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.86
/protocols/relay/src/behaviour.rs
1
// Copyright 2021 Protocol Labs.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
//! [`NetworkBehaviour`] to act as a circuit relay v2 **relay**.
22

23
pub(crate) mod handler;
24
pub(crate) mod rate_limiter;
25
use std::{
26
    collections::{hash_map, HashMap, HashSet, VecDeque},
27
    num::NonZeroU32,
28
    ops::Add,
29
    task::{Context, Poll},
30
    time::Duration,
31
};
32

33
use either::Either;
34
use libp2p_core::{multiaddr::Protocol, transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
35
use libp2p_identity::PeerId;
36
use libp2p_swarm::{
37
    behaviour::{ConnectionClosed, FromSwarm},
38
    dummy, ConnectionDenied, ConnectionId, ExternalAddresses, NetworkBehaviour, NotifyHandler,
39
    THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
40
};
41
use web_time::Instant;
42

43
use crate::{
44
    behaviour::handler::Handler,
45
    multiaddr_ext::MultiaddrExt,
46
    proto,
47
    protocol::{inbound_hop, outbound_stop},
48
};
49

50
/// Configuration for the relay [`Behaviour`].
51
///
52
/// # Panics
53
///
54
/// [`Config::max_circuit_duration`] may not exceed [`u32::MAX`].
55
pub struct Config {
56
    pub max_reservations: usize,
57
    pub max_reservations_per_peer: usize,
58
    pub reservation_duration: Duration,
59
    pub reservation_rate_limiters: Vec<Box<dyn rate_limiter::RateLimiter>>,
60

61
    pub max_circuits: usize,
62
    pub max_circuits_per_peer: usize,
63
    pub max_circuit_duration: Duration,
64
    pub max_circuit_bytes: u64,
65
    pub circuit_src_rate_limiters: Vec<Box<dyn rate_limiter::RateLimiter>>,
66
}
67

68
impl Config {
69
    pub fn reservation_rate_per_peer(mut self, limit: NonZeroU32, interval: Duration) -> Self {
×
70
        self.reservation_rate_limiters
×
71
            .push(rate_limiter::new_per_peer(
×
72
                rate_limiter::GenericRateLimiterConfig { limit, interval },
×
73
            ));
×
74
        self
×
75
    }
×
76

77
    pub fn circuit_src_per_peer(mut self, limit: NonZeroU32, interval: Duration) -> Self {
×
78
        self.circuit_src_rate_limiters
×
79
            .push(rate_limiter::new_per_peer(
×
80
                rate_limiter::GenericRateLimiterConfig { limit, interval },
×
81
            ));
×
82
        self
×
83
    }
×
84

85
    pub fn reservation_rate_per_ip(mut self, limit: NonZeroU32, interval: Duration) -> Self {
×
86
        self.reservation_rate_limiters
×
87
            .push(rate_limiter::new_per_ip(
×
88
                rate_limiter::GenericRateLimiterConfig { limit, interval },
×
89
            ));
×
90
        self
×
91
    }
×
92

93
    pub fn circuit_src_per_ip(mut self, limit: NonZeroU32, interval: Duration) -> Self {
×
94
        self.circuit_src_rate_limiters
×
95
            .push(rate_limiter::new_per_ip(
×
96
                rate_limiter::GenericRateLimiterConfig { limit, interval },
×
97
            ));
×
98
        self
×
99
    }
×
100
}
101

102
impl std::fmt::Debug for Config {
103
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
104
        f.debug_struct("Config")
×
105
            .field("max_reservations", &self.max_reservations)
×
106
            .field("max_reservations_per_peer", &self.max_reservations_per_peer)
×
107
            .field("reservation_duration", &self.reservation_duration)
×
108
            .field(
×
109
                "reservation_rate_limiters",
×
110
                &format!("[{} rate limiters]", self.reservation_rate_limiters.len()),
×
111
            )
×
112
            .field("max_circuits", &self.max_circuits)
×
113
            .field("max_circuits_per_peer", &self.max_circuits_per_peer)
×
114
            .field("max_circuit_duration", &self.max_circuit_duration)
×
115
            .field("max_circuit_bytes", &self.max_circuit_bytes)
×
116
            .field(
×
117
                "circuit_src_rate_limiters",
×
118
                &format!("[{} rate limiters]", self.circuit_src_rate_limiters.len()),
×
119
            )
×
120
            .finish()
×
121
    }
×
122
}
123

124
impl Default for Config {
125
    fn default() -> Self {
7✔
126
        let reservation_rate_limiters = vec![
7✔
127
            // For each peer ID one reservation every 2 minutes with up
128
            // to 30 reservations per hour.
129
            rate_limiter::new_per_peer(rate_limiter::GenericRateLimiterConfig {
7✔
130
                limit: NonZeroU32::new(30).expect("30 > 0"),
7✔
131
                interval: Duration::from_secs(60 * 2),
7✔
132
            }),
7✔
133
            // For each IP address one reservation every minute with up
134
            // to 60 reservations per hour.
135
            rate_limiter::new_per_ip(rate_limiter::GenericRateLimiterConfig {
7✔
136
                limit: NonZeroU32::new(60).expect("60 > 0"),
7✔
137
                interval: Duration::from_secs(60),
7✔
138
            }),
7✔
139
        ];
140

141
        let circuit_src_rate_limiters = vec![
7✔
142
            // For each source peer ID one circuit every 2 minutes with up to 30 circuits per hour.
143
            rate_limiter::new_per_peer(rate_limiter::GenericRateLimiterConfig {
7✔
144
                limit: NonZeroU32::new(30).expect("30 > 0"),
7✔
145
                interval: Duration::from_secs(60 * 2),
7✔
146
            }),
7✔
147
            // For each source IP address one circuit every minute with up to 60 circuits per hour.
148
            rate_limiter::new_per_ip(rate_limiter::GenericRateLimiterConfig {
7✔
149
                limit: NonZeroU32::new(60).expect("60 > 0"),
7✔
150
                interval: Duration::from_secs(60),
7✔
151
            }),
7✔
152
        ];
153

154
        Config {
7✔
155
            max_reservations: 128,
7✔
156
            max_reservations_per_peer: 4,
7✔
157
            reservation_duration: Duration::from_secs(60 * 60),
7✔
158
            reservation_rate_limiters,
7✔
159

7✔
160
            max_circuits: 16,
7✔
161
            max_circuits_per_peer: 4,
7✔
162
            max_circuit_duration: Duration::from_secs(2 * 60),
7✔
163
            max_circuit_bytes: 1 << 17, // 128 kibibyte
7✔
164
            circuit_src_rate_limiters,
7✔
165
        }
7✔
166
    }
7✔
167
}
168

169
/// The events produced by the relay `Behaviour`.
170
#[derive(Debug)]
171
pub enum Event {
172
    /// An inbound reservation request has been accepted.
173
    ReservationReqAccepted {
174
        src_peer_id: PeerId,
175
        /// Indicates whether the request replaces an existing reservation.
176
        renewed: bool,
177
    },
178
    /// Accepting an inbound reservation request failed.
179
    #[deprecated(
180
        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
181
    )]
182
    ReservationReqAcceptFailed {
183
        src_peer_id: PeerId,
184
        error: inbound_hop::Error,
185
    },
186
    /// An inbound reservation request has been denied.
187
    ReservationReqDenied {
188
        src_peer_id: PeerId,
189
        status: StatusCode,
190
    },
191
    /// Denying an inbound reservation request has failed.
192
    #[deprecated(
193
        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
194
    )]
195
    ReservationReqDenyFailed {
196
        src_peer_id: PeerId,
197
        error: inbound_hop::Error,
198
    },
199
    /// A reservation has been closed.
200
    ReservationClosed { src_peer_id: PeerId },
201
    /// An inbound reservation has timed out.
202
    ReservationTimedOut { src_peer_id: PeerId },
203
    /// An inbound circuit request has been denied.
204
    CircuitReqDenied {
205
        src_peer_id: PeerId,
206
        dst_peer_id: PeerId,
207
        status: StatusCode,
208
    },
209
    /// Denying an inbound circuit request failed.
210
    #[deprecated(
211
        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
212
    )]
213
    CircuitReqDenyFailed {
214
        src_peer_id: PeerId,
215
        dst_peer_id: PeerId,
216
        error: inbound_hop::Error,
217
    },
218
    /// An inbound circuit request has been accepted.
219
    CircuitReqAccepted {
220
        src_peer_id: PeerId,
221
        dst_peer_id: PeerId,
222
    },
223
    /// An outbound connect for an inbound circuit request failed.
224
    #[deprecated(
225
        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
226
    )]
227
    CircuitReqOutboundConnectFailed {
228
        src_peer_id: PeerId,
229
        dst_peer_id: PeerId,
230
        error: outbound_stop::Error,
231
    },
232
    /// Accepting an inbound circuit request failed.
233
    #[deprecated(
234
        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
235
    )]
236
    CircuitReqAcceptFailed {
237
        src_peer_id: PeerId,
238
        dst_peer_id: PeerId,
239
        error: inbound_hop::Error,
240
    },
241
    /// An inbound circuit has closed.
242
    CircuitClosed {
243
        src_peer_id: PeerId,
244
        dst_peer_id: PeerId,
245
        error: Option<std::io::Error>,
246
    },
247
}
248

249
/// [`NetworkBehaviour`] implementation of the relay server
250
/// functionality of the circuit relay v2 protocol.
251
pub struct Behaviour {
252
    config: Config,
253

254
    local_peer_id: PeerId,
255

256
    reservations: HashMap<PeerId, HashSet<ConnectionId>>,
257
    circuits: CircuitsTracker,
258

259
    /// Queue of actions to return when polled.
260
    queued_actions: VecDeque<ToSwarm<Event, THandlerInEvent<Self>>>,
261

262
    external_addresses: ExternalAddresses,
263
}
264

265
impl Behaviour {
266
    pub fn new(local_peer_id: PeerId, config: Config) -> Self {
7✔
267
        Self {
7✔
268
            config,
7✔
269
            local_peer_id,
7✔
270
            reservations: Default::default(),
7✔
271
            circuits: Default::default(),
7✔
272
            queued_actions: Default::default(),
7✔
273
            external_addresses: Default::default(),
7✔
274
        }
7✔
275
    }
7✔
276

277
    fn on_connection_closed(
×
278
        &mut self,
×
279
        ConnectionClosed {
×
280
            peer_id,
×
281
            connection_id,
×
282
            ..
×
283
        }: ConnectionClosed,
×
284
    ) {
×
285
        if let hash_map::Entry::Occupied(mut peer) = self.reservations.entry(peer_id) {
×
286
            if peer.get_mut().remove(&connection_id) {
×
287
                self.queued_actions
×
288
                    .push_back(ToSwarm::GenerateEvent(Event::ReservationClosed {
×
289
                        src_peer_id: peer_id,
×
290
                    }));
×
291
            }
×
292
            if peer.get().is_empty() {
×
293
                peer.remove();
×
294
            }
×
295
        }
×
296

297
        for circuit in self
×
298
            .circuits
×
299
            .remove_by_connection(peer_id, connection_id)
×
300
            .iter()
×
301
            // Only emit [`CircuitClosed`] for accepted requests.
302
            .filter(|c| matches!(c.status, CircuitStatus::Accepted))
×
303
        {
×
304
            self.queued_actions
×
305
                .push_back(ToSwarm::GenerateEvent(Event::CircuitClosed {
×
306
                    src_peer_id: circuit.src_peer_id,
×
307
                    dst_peer_id: circuit.dst_peer_id,
×
308
                    error: Some(std::io::ErrorKind::ConnectionAborted.into()),
×
309
                }));
×
310
        }
×
311
    }
×
312
}
313

314
impl NetworkBehaviour for Behaviour {
315
    type ConnectionHandler = Either<Handler, dummy::ConnectionHandler>;
316
    type ToSwarm = Event;
317

318
    fn handle_established_inbound_connection(
9✔
319
        &mut self,
9✔
320
        _: ConnectionId,
9✔
321
        _: PeerId,
9✔
322
        local_addr: &Multiaddr,
9✔
323
        remote_addr: &Multiaddr,
9✔
324
    ) -> Result<THandler<Self>, ConnectionDenied> {
9✔
325
        if local_addr.is_relayed() {
9✔
326
            // Deny all substreams on relayed connection.
327
            return Ok(Either::Right(dummy::ConnectionHandler));
×
328
        }
9✔
329

330
        Ok(Either::Left(Handler::new(
9✔
331
            handler::Config {
9✔
332
                reservation_duration: self.config.reservation_duration,
9✔
333
                max_circuit_duration: self.config.max_circuit_duration,
9✔
334
                max_circuit_bytes: self.config.max_circuit_bytes,
9✔
335
            },
9✔
336
            ConnectedPoint::Listener {
9✔
337
                local_addr: local_addr.clone(),
9✔
338
                send_back_addr: remote_addr.clone(),
9✔
339
            },
9✔
340
        )))
9✔
341
    }
9✔
342

343
    fn handle_established_outbound_connection(
×
344
        &mut self,
×
345
        _: ConnectionId,
×
346
        _: PeerId,
×
347
        addr: &Multiaddr,
×
348
        role_override: Endpoint,
×
349
        port_use: PortUse,
×
350
    ) -> Result<THandler<Self>, ConnectionDenied> {
×
351
        if addr.is_relayed() {
×
352
            // Deny all substreams on relayed connection.
353
            return Ok(Either::Right(dummy::ConnectionHandler));
×
354
        }
×
355

356
        Ok(Either::Left(Handler::new(
×
357
            handler::Config {
×
358
                reservation_duration: self.config.reservation_duration,
×
359
                max_circuit_duration: self.config.max_circuit_duration,
×
360
                max_circuit_bytes: self.config.max_circuit_bytes,
×
361
            },
×
362
            ConnectedPoint::Dialer {
×
363
                address: addr.clone(),
×
364
                role_override,
×
365
                port_use,
×
366
            },
×
367
        )))
×
368
    }
×
369

370
    fn on_swarm_event(&mut self, event: FromSwarm) {
36✔
371
        self.external_addresses.on_swarm_event(&event);
36✔
372

373
        if let FromSwarm::ConnectionClosed(connection_closed) = event {
36✔
374
            self.on_connection_closed(connection_closed)
×
375
        }
36✔
376
    }
36✔
377

378
    fn on_connection_handler_event(
25✔
379
        &mut self,
25✔
380
        event_source: PeerId,
25✔
381
        connection: ConnectionId,
25✔
382
        event: THandlerOutEvent<Self>,
25✔
383
    ) {
25✔
384
        let event = match event {
25✔
385
            Either::Left(e) => e,
25✔
386
            Either::Right(v) => libp2p_core::util::unreachable(v),
387
        };
388

389
        match event {
25✔
390
            handler::Event::ReservationReqReceived {
391
                inbound_reservation_req,
8✔
392
                endpoint,
8✔
393
                renewed,
8✔
394
            } => {
395
                let now = Instant::now();
8✔
396

397
                assert!(
8✔
398
                    !endpoint.is_relayed(),
8✔
399
                    "`dummy::ConnectionHandler` handles relayed connections. It \
×
400
                     denies all inbound substreams."
×
401
                );
402

403
                let action = if
8✔
404
                // Deny if it is a new reservation and the peer already holds more than
405
                // `max_reservations_per_peer` reservations.
406
                (!renewed
8✔
407
                    && self
7✔
408
                        .reservations
7✔
409
                        .get(&event_source)
7✔
410
                        .map(|cs| cs.len())
7✔
411
                        .unwrap_or(0)
7✔
412
                        > self.config.max_reservations_per_peer)
7✔
413
                    // Deny if it exceeds `max_reservations`.
414
                    || self
8✔
415
                        .reservations
8✔
416
                        .values()
8✔
417
                        .map(|cs| cs.len())
8✔
418
                        .sum::<usize>()
8✔
419
                        >= self.config.max_reservations
8✔
420
                    // Deny if it exceeds the allowed rate of reservations.
421
                    || !self
7✔
422
                        .config
7✔
423
                        .reservation_rate_limiters
7✔
424
                        .iter_mut()
7✔
425
                        .all(|limiter| {
14✔
426
                            limiter.try_next(event_source, endpoint.get_remote_address(), now)
14✔
427
                        }) {
14✔
428
                    ToSwarm::NotifyHandler {
1✔
429
                        handler: NotifyHandler::One(connection),
1✔
430
                        peer_id: event_source,
1✔
431
                        event: Either::Left(handler::In::DenyReservationReq {
1✔
432
                            inbound_reservation_req,
1✔
433
                            status: proto::Status::RESOURCE_LIMIT_EXCEEDED,
1✔
434
                        }),
1✔
435
                    }
1✔
436
                } else {
437
                    // Accept reservation.
438
                    self.reservations
7✔
439
                        .entry(event_source)
7✔
440
                        .or_default()
7✔
441
                        .insert(connection);
7✔
442

443
                    ToSwarm::NotifyHandler {
444
                        handler: NotifyHandler::One(connection),
7✔
445
                        peer_id: event_source,
7✔
446
                        event: Either::Left(handler::In::AcceptReservationReq {
447
                            inbound_reservation_req,
7✔
448
                            addrs: self
7✔
449
                                .external_addresses
7✔
450
                                .iter()
7✔
451
                                .cloned()
7✔
452
                                // Add local peer ID in case it isn't present yet.
453
                                .filter_map(|a| match a.iter().last()? {
7✔
454
                                    Protocol::P2p(_) => Some(a),
×
455
                                    _ => Some(a.with(Protocol::P2p(self.local_peer_id))),
7✔
456
                                })
7✔
457
                                .collect(),
7✔
458
                        }),
459
                    }
460
                };
461

462
                self.queued_actions.push_back(action);
8✔
463
            }
464
            handler::Event::ReservationReqAccepted { renewed } => {
7✔
465
                // Ensure the local, eventually consistent reservation state matches the handler
7✔
466
                // (the source of truth).
7✔
467
                self.reservations
7✔
468
                    .entry(event_source)
7✔
469
                    .or_default()
7✔
470
                    .insert(connection);
7✔
471

7✔
472
                self.queued_actions.push_back(ToSwarm::GenerateEvent(
7✔
473
                    Event::ReservationReqAccepted {
7✔
474
                        src_peer_id: event_source,
7✔
475
                        renewed,
7✔
476
                    },
7✔
477
                ));
7✔
478
            }
7✔
479
            handler::Event::ReservationReqAcceptFailed { error } => {
×
480
                #[allow(deprecated)]
×
481
                self.queued_actions.push_back(ToSwarm::GenerateEvent(
×
482
                    Event::ReservationReqAcceptFailed {
×
483
                        src_peer_id: event_source,
×
484
                        error,
×
485
                    },
×
486
                ));
×
487
            }
×
488
            handler::Event::ReservationReqDenied { status } => {
1✔
489
                self.queued_actions.push_back(ToSwarm::GenerateEvent(
1✔
490
                    Event::ReservationReqDenied {
1✔
491
                        src_peer_id: event_source,
1✔
492
                        status: status.into(),
1✔
493
                    },
1✔
494
                ));
1✔
495
            }
1✔
496
            handler::Event::ReservationReqDenyFailed { error } => {
×
497
                #[allow(deprecated)]
×
498
                self.queued_actions.push_back(ToSwarm::GenerateEvent(
×
499
                    Event::ReservationReqDenyFailed {
×
500
                        src_peer_id: event_source,
×
501
                        error,
×
502
                    },
×
503
                ));
×
504
            }
×
505
            handler::Event::ReservationTimedOut {} => {
506
                match self.reservations.entry(event_source) {
1✔
507
                    hash_map::Entry::Occupied(mut peer) => {
1✔
508
                        peer.get_mut().remove(&connection);
1✔
509
                        if peer.get().is_empty() {
1✔
510
                            peer.remove();
1✔
511
                        }
1✔
512
                    }
513
                    hash_map::Entry::Vacant(_) => {
514
                        unreachable!(
×
515
                            "Expect to track timed out reservation with peer {:?} on connection {:?}",
516
                            event_source,
517
                            connection,
518
                        );
519
                    }
520
                }
521

522
                self.queued_actions
1✔
523
                    .push_back(ToSwarm::GenerateEvent(Event::ReservationTimedOut {
1✔
524
                        src_peer_id: event_source,
1✔
525
                    }));
1✔
526
            }
527
            handler::Event::CircuitReqReceived {
528
                inbound_circuit_req,
3✔
529
                endpoint,
3✔
530
            } => {
531
                let now = Instant::now();
3✔
532

533
                assert!(
3✔
534
                    !endpoint.is_relayed(),
3✔
535
                    "`dummy::ConnectionHandler` handles relayed connections. It \
×
536
                     denies all inbound substreams."
×
537
                );
538

539
                let action = if self.circuits.num_circuits_of_peer(event_source)
3✔
540
                    > self.config.max_circuits_per_peer
3✔
541
                    || self.circuits.len() >= self.config.max_circuits
3✔
542
                    || !self
3✔
543
                        .config
3✔
544
                        .circuit_src_rate_limiters
3✔
545
                        .iter_mut()
3✔
546
                        .all(|limiter| {
6✔
547
                            limiter.try_next(event_source, endpoint.get_remote_address(), now)
6✔
548
                        }) {
6✔
549
                    // Deny circuit exceeding limits.
550
                    ToSwarm::NotifyHandler {
×
551
                        handler: NotifyHandler::One(connection),
×
552
                        peer_id: event_source,
×
553
                        event: Either::Left(handler::In::DenyCircuitReq {
×
554
                            circuit_id: None,
×
555
                            inbound_circuit_req,
×
556
                            status: proto::Status::RESOURCE_LIMIT_EXCEEDED,
×
557
                        }),
×
558
                    }
×
559
                } else if let Some(dst_conn) = self
3✔
560
                    .reservations
3✔
561
                    .get(&inbound_circuit_req.dst())
3✔
562
                    .and_then(|cs| cs.iter().next())
3✔
563
                {
564
                    // Accept circuit request if reservation present.
565
                    let circuit_id = self.circuits.insert(Circuit {
2✔
566
                        status: CircuitStatus::Accepting,
2✔
567
                        src_peer_id: event_source,
2✔
568
                        src_connection_id: connection,
2✔
569
                        dst_peer_id: inbound_circuit_req.dst(),
2✔
570
                        dst_connection_id: *dst_conn,
2✔
571
                    });
2✔
572

573
                    ToSwarm::NotifyHandler {
2✔
574
                        handler: NotifyHandler::One(*dst_conn),
2✔
575
                        peer_id: event_source,
2✔
576
                        event: Either::Left(handler::In::NegotiateOutboundConnect {
2✔
577
                            circuit_id,
2✔
578
                            inbound_circuit_req,
2✔
579
                            src_peer_id: event_source,
2✔
580
                            src_connection_id: connection,
2✔
581
                        }),
2✔
582
                    }
2✔
583
                } else {
584
                    // Deny circuit request if no reservation present.
585
                    ToSwarm::NotifyHandler {
1✔
586
                        handler: NotifyHandler::One(connection),
1✔
587
                        peer_id: event_source,
1✔
588
                        event: Either::Left(handler::In::DenyCircuitReq {
1✔
589
                            circuit_id: None,
1✔
590
                            inbound_circuit_req,
1✔
591
                            status: proto::Status::NO_RESERVATION,
1✔
592
                        }),
1✔
593
                    }
1✔
594
                };
595
                self.queued_actions.push_back(action);
3✔
596
            }
597
            handler::Event::CircuitReqDenied {
598
                circuit_id,
1✔
599
                dst_peer_id,
1✔
600
                status,
1✔
601
            } => {
602
                if let Some(circuit_id) = circuit_id {
1✔
603
                    self.circuits.remove(circuit_id);
×
604
                }
1✔
605

606
                self.queued_actions
1✔
607
                    .push_back(ToSwarm::GenerateEvent(Event::CircuitReqDenied {
1✔
608
                        src_peer_id: event_source,
1✔
609
                        dst_peer_id,
1✔
610
                        status: status.into(),
1✔
611
                    }));
1✔
612
            }
613
            handler::Event::CircuitReqDenyFailed {
614
                circuit_id,
×
615
                dst_peer_id,
×
616
                error,
×
617
            } => {
618
                if let Some(circuit_id) = circuit_id {
×
619
                    self.circuits.remove(circuit_id);
×
620
                }
×
621

622
                #[allow(deprecated)]
623
                self.queued_actions.push_back(ToSwarm::GenerateEvent(
×
624
                    Event::CircuitReqDenyFailed {
×
625
                        src_peer_id: event_source,
×
626
                        dst_peer_id,
×
627
                        error,
×
628
                    },
×
629
                ));
×
630
            }
631
            handler::Event::OutboundConnectNegotiated {
632
                circuit_id,
2✔
633
                src_peer_id,
2✔
634
                src_connection_id,
2✔
635
                inbound_circuit_req,
2✔
636
                dst_stream,
2✔
637
                dst_pending_data,
2✔
638
            } => {
2✔
639
                self.queued_actions.push_back(ToSwarm::NotifyHandler {
2✔
640
                    handler: NotifyHandler::One(src_connection_id),
2✔
641
                    peer_id: src_peer_id,
2✔
642
                    event: Either::Left(handler::In::AcceptAndDriveCircuit {
2✔
643
                        circuit_id,
2✔
644
                        dst_peer_id: event_source,
2✔
645
                        inbound_circuit_req,
2✔
646
                        dst_stream,
2✔
647
                        dst_pending_data,
2✔
648
                    }),
2✔
649
                });
2✔
650
            }
2✔
651
            handler::Event::OutboundConnectNegotiationFailed {
652
                circuit_id,
×
653
                src_peer_id,
×
654
                src_connection_id,
×
655
                inbound_circuit_req,
×
656
                status,
×
657
                error,
×
658
            } => {
×
659
                self.queued_actions.push_back(ToSwarm::NotifyHandler {
×
660
                    handler: NotifyHandler::One(src_connection_id),
×
661
                    peer_id: src_peer_id,
×
662
                    event: Either::Left(handler::In::DenyCircuitReq {
×
663
                        circuit_id: Some(circuit_id),
×
664
                        inbound_circuit_req,
×
665
                        status,
×
666
                    }),
×
667
                });
×
668
                #[allow(deprecated)]
×
669
                self.queued_actions.push_back(ToSwarm::GenerateEvent(
×
670
                    Event::CircuitReqOutboundConnectFailed {
×
671
                        src_peer_id,
×
672
                        dst_peer_id: event_source,
×
673
                        error,
×
674
                    },
×
675
                ));
×
676
            }
×
677
            handler::Event::CircuitReqAccepted {
678
                dst_peer_id,
2✔
679
                circuit_id,
2✔
680
            } => {
2✔
681
                self.circuits.accepted(circuit_id);
2✔
682
                self.queued_actions
2✔
683
                    .push_back(ToSwarm::GenerateEvent(Event::CircuitReqAccepted {
2✔
684
                        src_peer_id: event_source,
2✔
685
                        dst_peer_id,
2✔
686
                    }));
2✔
687
            }
2✔
688
            handler::Event::CircuitReqAcceptFailed {
689
                dst_peer_id,
×
690
                circuit_id,
×
691
                error,
×
692
            } => {
×
693
                self.circuits.remove(circuit_id);
×
694
                #[allow(deprecated)]
×
695
                self.queued_actions.push_back(ToSwarm::GenerateEvent(
×
696
                    Event::CircuitReqAcceptFailed {
×
697
                        src_peer_id: event_source,
×
698
                        dst_peer_id,
×
699
                        error,
×
700
                    },
×
701
                ));
×
702
            }
×
703
            handler::Event::CircuitClosed {
704
                dst_peer_id,
×
705
                circuit_id,
×
706
                error,
×
707
            } => {
×
708
                self.circuits.remove(circuit_id);
×
709

×
710
                self.queued_actions
×
711
                    .push_back(ToSwarm::GenerateEvent(Event::CircuitClosed {
×
712
                        src_peer_id: event_source,
×
713
                        dst_peer_id,
×
714
                        error,
×
715
                    }));
×
716
            }
×
717
        }
718
    }
25✔
719

720
    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
721
    fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
186✔
722
        if let Some(to_swarm) = self.queued_actions.pop_front() {
186✔
723
            return Poll::Ready(to_swarm);
25✔
724
        }
161✔
725

726
        Poll::Pending
161✔
727
    }
186✔
728
}
729

730
#[derive(Default)]
731
struct CircuitsTracker {
732
    next_id: CircuitId,
733
    circuits: HashMap<CircuitId, Circuit>,
734
}
735

736
impl CircuitsTracker {
737
    fn len(&self) -> usize {
3✔
738
        self.circuits.len()
3✔
739
    }
3✔
740

741
    fn insert(&mut self, circuit: Circuit) -> CircuitId {
2✔
742
        let id = self.next_id;
2✔
743
        self.next_id = self.next_id + 1;
2✔
744

745
        self.circuits.insert(id, circuit);
2✔
746

747
        id
2✔
748
    }
2✔
749

750
    fn accepted(&mut self, circuit_id: CircuitId) {
2✔
751
        if let Some(c) = self.circuits.get_mut(&circuit_id) {
2✔
752
            c.status = CircuitStatus::Accepted;
2✔
753
        };
2✔
754
    }
2✔
755

756
    fn remove(&mut self, circuit_id: CircuitId) -> Option<Circuit> {
×
757
        self.circuits.remove(&circuit_id)
×
758
    }
×
759

760
    fn remove_by_connection(
×
761
        &mut self,
×
762
        peer_id: PeerId,
×
763
        connection_id: ConnectionId,
×
764
    ) -> Vec<Circuit> {
×
765
        let mut removed = vec![];
×
766

767
        self.circuits.retain(|_circuit_id, circuit| {
×
768
            let is_src =
×
769
                circuit.src_peer_id == peer_id && circuit.src_connection_id == connection_id;
×
770
            let is_dst =
×
771
                circuit.dst_peer_id == peer_id && circuit.dst_connection_id == connection_id;
×
772

773
            if is_src || is_dst {
×
774
                removed.push(circuit.clone());
×
775
                // Remove circuit from HashMap.
776
                false
×
777
            } else {
778
                // Retain circuit in HashMap.
779
                true
×
780
            }
781
        });
×
782

783
        removed
×
784
    }
×
785

786
    fn num_circuits_of_peer(&self, peer: PeerId) -> usize {
3✔
787
        self.circuits
3✔
788
            .iter()
3✔
789
            .filter(|(_, c)| c.src_peer_id == peer || c.dst_peer_id == peer)
3✔
790
            .count()
3✔
791
    }
3✔
792
}
793

794
#[derive(Clone)]
795
struct Circuit {
796
    src_peer_id: PeerId,
797
    src_connection_id: ConnectionId,
798
    dst_peer_id: PeerId,
799
    dst_connection_id: ConnectionId,
800
    status: CircuitStatus,
801
}
802

803
#[derive(Clone)]
804
enum CircuitStatus {
805
    Accepting,
806
    Accepted,
807
}
808

809
#[derive(Default, Clone, Copy, Debug, Hash, Eq, PartialEq)]
810
pub struct CircuitId(u64);
811

812
impl Add<u64> for CircuitId {
813
    type Output = CircuitId;
814

815
    fn add(self, rhs: u64) -> Self {
2✔
816
        CircuitId(self.0 + rhs)
2✔
817
    }
2✔
818
}
819

820
/// Status code reported when a relay reservation or circuit request is denied.
821
#[derive(Debug)]
822
pub enum StatusCode {
823
    OK,
824
    ReservationRefused,
825
    ResourceLimitExceeded,
826
    PermissionDenied,
827
    ConnectionFailed,
828
    NoReservation,
829
    MalformedMessage,
830
    UnexpectedMessage,
831
}
832

833
impl From<proto::Status> for StatusCode {
834
    fn from(other: proto::Status) -> Self {
2✔
835
        match other {
2✔
836
            proto::Status::OK => Self::OK,
×
837
            proto::Status::RESERVATION_REFUSED => Self::ReservationRefused,
×
838
            proto::Status::RESOURCE_LIMIT_EXCEEDED => Self::ResourceLimitExceeded,
1✔
839
            proto::Status::PERMISSION_DENIED => Self::PermissionDenied,
×
840
            proto::Status::CONNECTION_FAILED => Self::ConnectionFailed,
×
841
            proto::Status::NO_RESERVATION => Self::NoReservation,
1✔
842
            proto::Status::MALFORMED_MESSAGE => Self::MalformedMessage,
×
843
            proto::Status::UNEXPECTED_MESSAGE => Self::UnexpectedMessage,
×
844
        }
845
    }
2✔
846
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc