• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18610913338

18 Oct 2025 04:41AM UTC coverage: 78.379% (+2.5%) from 75.842%
18610913338

push

github

butlergroup
	modified:   .github/workflows/ci.yml

36944 of 47135 relevant lines covered (78.38%)

37728.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.47
/protocols/relay/src/priv_client.rs
1
// Copyright 2021 Protocol Labs.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
//! [`NetworkBehaviour`] to act as a circuit relay v2 **client**.
22

23
pub(crate) mod handler;
24
pub(crate) mod transport;
25

26
use std::{
27
    collections::{hash_map, HashMap, VecDeque},
28
    convert::Infallible,
29
    io::{Error, IoSlice},
30
    pin::Pin,
31
    task::{Context, Poll},
32
};
33

34
use bytes::Bytes;
35
use either::Either;
36
use futures::{
37
    channel::mpsc::Receiver,
38
    future::{BoxFuture, FutureExt},
39
    io::{AsyncRead, AsyncWrite},
40
    ready,
41
    stream::StreamExt,
42
};
43
use libp2p_core::{multiaddr::Protocol, transport::PortUse, Endpoint, Multiaddr};
44
use libp2p_identity::PeerId;
45
use libp2p_swarm::{
46
    behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm},
47
    dial_opts::DialOpts,
48
    dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NetworkBehaviour,
49
    NotifyHandler, Stream, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
50
};
51
use transport::Transport;
52

53
use crate::{
54
    multiaddr_ext::MultiaddrExt,
55
    priv_client::handler::Handler,
56
    protocol::{self, inbound_stop},
57
};
58

59
/// The events produced by the client `Behaviour`.
60
#[derive(Debug)]
61
pub enum Event {
62
    /// An outbound reservation has been accepted.
63
    ReservationReqAccepted {
64
        relay_peer_id: PeerId,
65
        /// Indicates whether the request replaces an existing reservation.
66
        renewal: bool,
67
        limit: Option<protocol::Limit>,
68
    },
69
    OutboundCircuitEstablished {
70
        relay_peer_id: PeerId,
71
        limit: Option<protocol::Limit>,
72
    },
73
    /// An inbound circuit has been established.
74
    InboundCircuitEstablished {
75
        src_peer_id: PeerId,
76
        limit: Option<protocol::Limit>,
77
    },
78
}
79

80
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
81
enum ReservationStatus {
82
    Pending,
83
    Confirmed,
84
}
85

86
/// [`NetworkBehaviour`] implementation of the relay client
87
/// functionality of the circuit relay v2 protocol.
88
pub struct Behaviour {
89
    local_peer_id: PeerId,
90

91
    from_transport: Receiver<transport::TransportToBehaviourMsg>,
92
    /// Set of directly connected peers, i.e. not connected via a relayed
93
    /// connection.
94
    directly_connected_peers: HashMap<PeerId, Vec<ConnectionId>>,
95

96
    /// Stores the address of a pending or confirmed reservation.
97
    ///
98
    /// This is indexed by the [`ConnectionId`] to a relay server and the address is the
99
    /// `/p2p-circuit` address we reserved on it.
100
    reservation_addresses: HashMap<ConnectionId, (Multiaddr, ReservationStatus)>,
101

102
    /// Queue of actions to return when polled.
103
    queued_actions: VecDeque<ToSwarm<Event, Either<handler::In, Infallible>>>,
104

105
    pending_handler_commands: HashMap<ConnectionId, handler::In>,
106
}
107

108
/// Create a new client relay [`Behaviour`] with it's corresponding [`Transport`].
109
pub fn new(local_peer_id: PeerId) -> (Transport, Behaviour) {
10✔
110
    let (transport, from_transport) = Transport::new();
10✔
111
    let behaviour = Behaviour {
10✔
112
        local_peer_id,
10✔
113
        from_transport,
10✔
114
        directly_connected_peers: Default::default(),
10✔
115
        reservation_addresses: Default::default(),
10✔
116
        queued_actions: Default::default(),
10✔
117
        pending_handler_commands: Default::default(),
10✔
118
    };
10✔
119
    (transport, behaviour)
10✔
120
}
10✔
121

122
impl Behaviour {
123
    fn on_connection_closed(
×
124
        &mut self,
×
125
        ConnectionClosed {
×
126
            peer_id,
×
127
            connection_id,
×
128
            endpoint,
×
129
            ..
×
130
        }: ConnectionClosed,
×
131
    ) {
×
132
        if !endpoint.is_relayed() {
×
133
            match self.directly_connected_peers.entry(peer_id) {
×
134
                hash_map::Entry::Occupied(mut connections) => {
×
135
                    let position = connections
×
136
                        .get()
×
137
                        .iter()
×
138
                        .position(|c| c == &connection_id)
×
139
                        .expect("Connection to be known.");
×
140
                    connections.get_mut().remove(position);
×
141

142
                    if connections.get().is_empty() {
×
143
                        connections.remove();
×
144
                    }
×
145
                }
146
                hash_map::Entry::Vacant(_) => {
147
                    unreachable!("`on_connection_closed` for unconnected peer.")
×
148
                }
149
            };
150
            if let Some((addr, ReservationStatus::Confirmed)) =
×
151
                self.reservation_addresses.remove(&connection_id)
×
152
            {
×
153
                self.queued_actions
×
154
                    .push_back(ToSwarm::ExternalAddrExpired(addr));
×
155
            }
×
156
        }
×
157
    }
×
158
}
159

160
impl NetworkBehaviour for Behaviour {
161
    type ConnectionHandler = Either<Handler, dummy::ConnectionHandler>;
162
    type ToSwarm = Event;
163

164
    fn handle_established_inbound_connection(
3✔
165
        &mut self,
3✔
166
        connection_id: ConnectionId,
3✔
167
        peer: PeerId,
3✔
168
        local_addr: &Multiaddr,
3✔
169
        remote_addr: &Multiaddr,
3✔
170
    ) -> Result<THandler<Self>, ConnectionDenied> {
3✔
171
        let pending_handler_command = self.pending_handler_commands.remove(&connection_id);
3✔
172

173
        if local_addr.is_relayed() {
3✔
174
            return Ok(Either::Right(dummy::ConnectionHandler));
2✔
175
        }
1✔
176
        let mut handler = Handler::new(self.local_peer_id, peer, remote_addr.clone());
1✔
177

178
        if let Some(event) = pending_handler_command {
1✔
179
            handler.on_behaviour_event(event)
×
180
        }
1✔
181

182
        Ok(Either::Left(handler))
1✔
183
    }
3✔
184

185
    fn handle_established_outbound_connection(
12✔
186
        &mut self,
12✔
187
        connection_id: ConnectionId,
12✔
188
        peer: PeerId,
12✔
189
        addr: &Multiaddr,
12✔
190
        _: Endpoint,
12✔
191
        _: PortUse,
12✔
192
    ) -> Result<THandler<Self>, ConnectionDenied> {
12✔
193
        let pending_handler_command = self.pending_handler_commands.remove(&connection_id);
12✔
194

195
        if addr.is_relayed() {
12✔
196
            return Ok(Either::Right(dummy::ConnectionHandler));
2✔
197
        }
10✔
198

199
        let mut handler = Handler::new(self.local_peer_id, peer, addr.clone());
10✔
200

201
        if let Some(event) = pending_handler_command {
10✔
202
            handler.on_behaviour_event(event)
8✔
203
        }
2✔
204

205
        Ok(Either::Left(handler))
10✔
206
    }
12✔
207

208
    fn on_swarm_event(&mut self, event: FromSwarm) {
61✔
209
        match event {
15✔
210
            FromSwarm::ConnectionEstablished(ConnectionEstablished {
211
                peer_id,
11✔
212
                connection_id,
11✔
213
                endpoint,
11✔
214
                ..
215
            }) if !endpoint.is_relayed() => {
15✔
216
                self.directly_connected_peers
11✔
217
                    .entry(peer_id)
11✔
218
                    .or_default()
11✔
219
                    .push(connection_id);
11✔
220
            }
11✔
221
            FromSwarm::ConnectionClosed(connection_closed) => {
×
222
                self.on_connection_closed(connection_closed)
×
223
            }
224
            FromSwarm::DialFailure(DialFailure { connection_id, .. }) => {
2✔
225
                self.reservation_addresses.remove(&connection_id);
2✔
226
                self.pending_handler_commands.remove(&connection_id);
2✔
227
            }
2✔
228
            _ => {}
48✔
229
        }
230
    }
61✔
231

232
    fn on_connection_handler_event(
11✔
233
        &mut self,
11✔
234
        event_source: PeerId,
11✔
235
        connection: ConnectionId,
11✔
236
        handler_event: THandlerOutEvent<Self>,
11✔
237
    ) {
11✔
238
        let handler_event = match handler_event {
11✔
239
            Either::Left(e) => e,
11✔
240
            Either::Right(v) => libp2p_core::util::unreachable(v),
241
        };
242

243
        let event = match handler_event {
11✔
244
            handler::Event::ReservationReqAccepted { renewal, limit } => {
7✔
245
                let (addr, status) = self
7✔
246
                    .reservation_addresses
7✔
247
                    .get_mut(&connection)
7✔
248
                    .expect("Relay connection exist");
7✔
249

250
                if !renewal && *status == ReservationStatus::Pending {
7✔
251
                    *status = ReservationStatus::Confirmed;
5✔
252
                    self.queued_actions
5✔
253
                        .push_back(ToSwarm::ExternalAddrConfirmed(addr.clone()));
5✔
254
                }
5✔
255

256
                Event::ReservationReqAccepted {
7✔
257
                    relay_peer_id: event_source,
7✔
258
                    renewal,
7✔
259
                    limit,
7✔
260
                }
7✔
261
            }
262
            handler::Event::OutboundCircuitEstablished { limit } => {
2✔
263
                Event::OutboundCircuitEstablished {
2✔
264
                    relay_peer_id: event_source,
2✔
265
                    limit,
2✔
266
                }
2✔
267
            }
268
            handler::Event::InboundCircuitEstablished { src_peer_id, limit } => {
2✔
269
                Event::InboundCircuitEstablished { src_peer_id, limit }
2✔
270
            }
271
        };
272

273
        self.queued_actions.push_back(ToSwarm::GenerateEvent(event));
11✔
274
    }
11✔
275

276
    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
277
    fn poll(
185✔
278
        &mut self,
185✔
279
        cx: &mut Context<'_>,
185✔
280
    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
185✔
281
        if let Some(action) = self.queued_actions.pop_front() {
185✔
282
            return Poll::Ready(action);
16✔
283
        }
169✔
284

285
        let action = match ready!(self.from_transport.poll_next_unpin(cx)) {
169✔
286
            Some(transport::TransportToBehaviourMsg::ListenReq {
287
                relay_peer_id,
8✔
288
                relay_addr,
8✔
289
                to_listener,
8✔
290
            }) => {
291
                match self
8✔
292
                    .directly_connected_peers
8✔
293
                    .get(&relay_peer_id)
8✔
294
                    .and_then(|cs| cs.first())
8✔
295
                {
296
                    Some(connection_id) => {
2✔
297
                        self.reservation_addresses.insert(
2✔
298
                            *connection_id,
2✔
299
                            (
2✔
300
                                relay_addr
2✔
301
                                    .with(Protocol::P2p(relay_peer_id))
2✔
302
                                    .with(Protocol::P2pCircuit)
2✔
303
                                    .with(Protocol::P2p(self.local_peer_id)),
2✔
304
                                ReservationStatus::Pending,
2✔
305
                            ),
2✔
306
                        );
307

308
                        ToSwarm::NotifyHandler {
2✔
309
                            peer_id: relay_peer_id,
2✔
310
                            handler: NotifyHandler::One(*connection_id),
2✔
311
                            event: Either::Left(handler::In::Reserve { to_listener }),
2✔
312
                        }
2✔
313
                    }
314
                    None => {
315
                        let opts = DialOpts::peer_id(relay_peer_id)
6✔
316
                            .addresses(vec![relay_addr.clone()])
6✔
317
                            .extend_addresses_through_behaviour()
6✔
318
                            .build();
6✔
319
                        let relayed_connection_id = opts.connection_id();
6✔
320

321
                        self.reservation_addresses.insert(
6✔
322
                            relayed_connection_id,
6✔
323
                            (
6✔
324
                                relay_addr
6✔
325
                                    .with(Protocol::P2p(relay_peer_id))
6✔
326
                                    .with(Protocol::P2pCircuit)
6✔
327
                                    .with(Protocol::P2p(self.local_peer_id)),
6✔
328
                                ReservationStatus::Pending,
6✔
329
                            ),
6✔
330
                        );
331

332
                        self.pending_handler_commands
6✔
333
                            .insert(relayed_connection_id, handler::In::Reserve { to_listener });
6✔
334
                        ToSwarm::Dial { opts }
6✔
335
                    }
336
                }
337
            }
338
            Some(transport::TransportToBehaviourMsg::DialReq {
339
                relay_addr,
3✔
340
                relay_peer_id,
3✔
341
                dst_peer_id,
3✔
342
                send_back,
3✔
343
                ..
344
            }) => {
345
                match self
3✔
346
                    .directly_connected_peers
3✔
347
                    .get(&relay_peer_id)
3✔
348
                    .and_then(|cs| cs.first())
3✔
349
                {
350
                    Some(connection_id) => ToSwarm::NotifyHandler {
×
351
                        peer_id: relay_peer_id,
×
352
                        handler: NotifyHandler::One(*connection_id),
×
353
                        event: Either::Left(handler::In::EstablishCircuit {
×
354
                            to_dial: send_back,
×
355
                            dst_peer_id,
×
356
                        }),
×
357
                    },
×
358
                    None => {
359
                        let opts = DialOpts::peer_id(relay_peer_id)
3✔
360
                            .addresses(vec![relay_addr])
3✔
361
                            .extend_addresses_through_behaviour()
3✔
362
                            .build();
3✔
363
                        let connection_id = opts.connection_id();
3✔
364

365
                        self.pending_handler_commands.insert(
3✔
366
                            connection_id,
3✔
367
                            handler::In::EstablishCircuit {
3✔
368
                                to_dial: send_back,
3✔
369
                                dst_peer_id,
3✔
370
                            },
3✔
371
                        );
372

373
                        ToSwarm::Dial { opts }
3✔
374
                    }
375
                }
376
            }
377
            None => unreachable!(
×
378
                "`relay::Behaviour` polled after channel from \
379
                     `Transport` has been closed. Unreachable under \
380
                     the assumption that the `client::Behaviour` is never polled after \
381
                     `client::Transport` is dropped.",
382
            ),
383
        };
384

385
        Poll::Ready(action)
11✔
386
    }
185✔
387
}
388

389
/// Represents a connection to another peer via a relay.
390
///
391
/// Internally, this uses a stream to the relay.
392
pub struct Connection {
393
    pub(crate) state: ConnectionState,
394
}
395

396
pub(crate) enum ConnectionState {
397
    InboundAccepting {
398
        accept: BoxFuture<'static, Result<ConnectionState, Error>>,
399
    },
400
    Operational {
401
        read_buffer: Bytes,
402
        substream: Stream,
403
    },
404
}
405

406
impl Unpin for ConnectionState {}
407

408
impl ConnectionState {
409
    pub(crate) fn new_inbound(circuit: inbound_stop::Circuit) -> Self {
2✔
410
        ConnectionState::InboundAccepting {
411
            accept: async {
2✔
412
                let (substream, read_buffer) = circuit.accept().await.map_err(Error::other)?;
2✔
413
                Ok(ConnectionState::Operational {
2✔
414
                    read_buffer,
2✔
415
                    substream,
2✔
416
                })
2✔
417
            }
2✔
418
            .boxed(),
2✔
419
        }
420
    }
2✔
421

422
    pub(crate) fn new_outbound(substream: Stream, read_buffer: Bytes) -> Self {
2✔
423
        ConnectionState::Operational {
2✔
424
            substream,
2✔
425
            read_buffer,
2✔
426
        }
2✔
427
    }
2✔
428
}
429

430
impl AsyncWrite for Connection {
431
    fn poll_write(
60✔
432
        mut self: Pin<&mut Self>,
60✔
433
        cx: &mut Context,
60✔
434
        buf: &[u8],
60✔
435
    ) -> Poll<Result<usize, Error>> {
60✔
436
        loop {
437
            match &mut self.state {
60✔
438
                ConnectionState::InboundAccepting { accept } => {
×
439
                    *self = Connection {
×
440
                        state: ready!(accept.poll_unpin(cx))?,
×
441
                    };
442
                }
443
                ConnectionState::Operational { substream, .. } => {
60✔
444
                    return Pin::new(substream).poll_write(cx, buf);
60✔
445
                }
446
            }
447
        }
448
    }
60✔
449
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
102✔
450
        loop {
451
            match &mut self.state {
102✔
452
                ConnectionState::InboundAccepting { accept } => {
×
453
                    *self = Connection {
×
454
                        state: ready!(accept.poll_unpin(cx))?,
×
455
                    };
456
                }
457
                ConnectionState::Operational { substream, .. } => {
102✔
458
                    return Pin::new(substream).poll_flush(cx);
102✔
459
                }
460
            }
461
        }
462
    }
102✔
463
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
×
464
        loop {
465
            match &mut self.state {
×
466
                ConnectionState::InboundAccepting { accept } => {
×
467
                    *self = Connection {
×
468
                        state: ready!(accept.poll_unpin(cx))?,
×
469
                    };
470
                }
471
                ConnectionState::Operational { substream, .. } => {
×
472
                    return Pin::new(substream).poll_close(cx);
×
473
                }
474
            }
475
        }
476
    }
×
477

478
    fn poll_write_vectored(
×
479
        mut self: Pin<&mut Self>,
×
480
        cx: &mut Context,
×
481
        bufs: &[IoSlice],
×
482
    ) -> Poll<Result<usize, Error>> {
×
483
        loop {
484
            match &mut self.state {
×
485
                ConnectionState::InboundAccepting { accept } => {
×
486
                    *self = Connection {
×
487
                        state: ready!(accept.poll_unpin(cx))?,
×
488
                    };
489
                }
490
                ConnectionState::Operational { substream, .. } => {
×
491
                    return Pin::new(substream).poll_write_vectored(cx, bufs);
×
492
                }
493
            }
494
        }
495
    }
×
496
}
497

498
impl AsyncRead for Connection {
499
    fn poll_read(
117✔
500
        mut self: Pin<&mut Self>,
117✔
501
        cx: &mut Context<'_>,
117✔
502
        buf: &mut [u8],
117✔
503
    ) -> Poll<Result<usize, Error>> {
117✔
504
        loop {
505
            match &mut self.state {
119✔
506
                ConnectionState::InboundAccepting { accept } => {
2✔
507
                    *self = Connection {
2✔
508
                        state: ready!(accept.poll_unpin(cx))?,
2✔
509
                    };
510
                }
511
                ConnectionState::Operational {
512
                    read_buffer,
117✔
513
                    substream,
117✔
514
                    ..
515
                } => {
516
                    if !read_buffer.is_empty() {
117✔
517
                        let n = std::cmp::min(read_buffer.len(), buf.len());
×
518
                        let data = read_buffer.split_to(n);
×
519
                        buf[0..n].copy_from_slice(&data[..]);
×
520
                        return Poll::Ready(Ok(n));
×
521
                    }
117✔
522

523
                    return Pin::new(substream).poll_read(cx, buf);
117✔
524
                }
525
            }
526
        }
527
    }
117✔
528
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc