• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18610913338

18 Oct 2025 04:41AM UTC coverage: 78.379% (+2.5%) from 75.842%
18610913338

push

github

butlergroup
	modified:   .github/workflows/ci.yml

36944 of 47135 relevant lines covered (78.38%)

37728.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.95
/transports/tcp/src/lib.rs
1
// Copyright 2017 Parity Technologies (UK) Ltd.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
//! Implementation of the libp2p [`libp2p_core::Transport`] trait for TCP/IP.
22
//!
23
//! # Usage
24
//!
25
//! This crate provides [`tokio::Transport`]
26
//! which implements the [`libp2p_core::Transport`] trait for use as a
27
//! transport with `libp2p-core` or `libp2p-swarm`.
28

29
#![cfg_attr(docsrs, feature(doc_cfg))]
30

31
mod provider;
32

33
use std::{
34
    collections::{HashSet, VecDeque},
35
    io,
36
    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener},
37
    pin::Pin,
38
    sync::{Arc, RwLock},
39
    task::{Context, Poll, Waker},
40
    time::Duration,
41
};
42

43
use futures::{future::Ready, prelude::*, stream::SelectAll};
44
use futures_timer::Delay;
45
use if_watch::IfEvent;
46
use libp2p_core::{
47
    multiaddr::{Multiaddr, Protocol},
48
    transport::{DialOpts, ListenerId, PortUse, TransportError, TransportEvent},
49
};
50
#[cfg(feature = "tokio")]
51
pub use provider::tokio;
52
use provider::{Incoming, Provider};
53
use socket2::{Domain, Socket, Type};
54

55
/// The configuration for a TCP/IP transport capability for libp2p.
56
#[derive(Clone, Debug)]
57
pub struct Config {
58
    /// `IP_TTL` to set for opened sockets, or `None` to keep the OS default.
59
    ttl: Option<u32>,
60
    /// Value of the `TCP_NODELAY` option to set for opened sockets.
61
    nodelay: bool,
62
    /// Size of the listen backlog requested for listening sockets.
63
    backlog: u32,
64
}
65

66
/// A TCP port number.
type Port = u16;
67

68
/// The registry of listening socket addresses used for port reuse when dialing.
68
#[derive(Debug, Clone, Default)]
69
struct PortReuse {
70
    /// The addresses and ports of the listening sockets registered as eligible
71
    /// for port reuse when dialing. The set sits behind an `Arc`, so clones share it.
72
    listen_addrs: Arc<RwLock<HashSet<(IpAddr, Port)>>>,
73
}
75

76
impl PortReuse {
77
    /// Registers a socket address for port reuse.
78
    ///
79
    /// Has no effect if port reuse is disabled.
80
    fn register(&mut self, ip: IpAddr, port: Port) {
988✔
81
        tracing::trace!(%ip, %port, "Registering for port reuse");
988✔
82
        self.listen_addrs
988✔
83
            .write()
988✔
84
            .expect("`register()` and `unregister()` never panic while holding the lock")
988✔
85
            .insert((ip, port));
988✔
86
    }
988✔
87

88
    /// Unregisters a socket address for port reuse.
89
    ///
90
    /// Has no effect if port reuse is disabled.
91
    fn unregister(&mut self, ip: IpAddr, port: Port) {
1,002✔
92
        tracing::trace!(%ip, %port, "Unregistering for port reuse");
1,002✔
93
        self.listen_addrs
1,002✔
94
            .write()
1,002✔
95
            .expect("`register()` and `unregister()` never panic while holding the lock")
1,002✔
96
            .remove(&(ip, port));
1,002✔
97
    }
1,002✔
98

99
    /// Selects a listening socket address suitable for use
100
    /// as the local socket address when dialing.
101
    ///
102
    /// If multiple listening sockets are registered for port
103
    /// reuse, one is chosen whose IP protocol version and
104
    /// loopback status is the same as that of `remote_ip`.
105
    ///
106
    /// Returns `None` if port reuse is disabled or no suitable
107
    /// listening socket address is found.
108
    fn local_dial_addr(&self, remote_ip: &IpAddr) -> Option<SocketAddr> {
323✔
109
        for (ip, port) in self
323✔
110
            .listen_addrs
323✔
111
            .read()
323✔
112
            .expect("`local_dial_addr` never panic while holding the lock")
323✔
113
            .iter()
323✔
114
        {
115
            if ip.is_ipv4() == remote_ip.is_ipv4() && ip.is_loopback() == remote_ip.is_loopback() {
303✔
116
                if remote_ip.is_ipv4() {
296✔
117
                    return Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), *port));
293✔
118
                } else {
119
                    return Some(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), *port));
3✔
120
                }
121
            }
7✔
122
        }
123

124
        None
27✔
125
    }
323✔
126
}
127

128
impl Config {
129
    /// Creates a new configuration for a TCP/IP transport:
130
    ///
131
    ///   * Nagle's algorithm is _disabled_, i.e. `TCP_NODELAY` _enabled_. See [`Config::nodelay`].
132
    ///   * Reuse of listening ports is _disabled_. See [`Config::port_reuse`].
133
    ///   * No custom `IP_TTL` is set. The default of the OS TCP stack applies. See [`Config::ttl`].
134
    ///   * The size of the listen backlog for new listening sockets is `1024`. See
135
    ///     [`Config::listen_backlog`].
136
    pub fn new() -> Self {
2,019✔
137
        Self {
2,019✔
138
            ttl: None,
2,019✔
139
            nodelay: true, // Disable Nagle's algorithm by default.
2,019✔
140
            backlog: 1024,
2,019✔
141
        }
2,019✔
142
    }
2,019✔
143

144
    /// Configures the `IP_TTL` option for new sockets.
145
    pub fn ttl(mut self, value: u32) -> Self {
×
146
        self.ttl = Some(value);
×
147
        self
×
148
    }
×
149

150
    /// Configures the `TCP_NODELAY` option for new sockets.
151
    pub fn nodelay(mut self, value: bool) -> Self {
×
152
        self.nodelay = value;
×
153
        self
×
154
    }
×
155

156
    /// Configures the listen backlog for new listen sockets.
157
    pub fn listen_backlog(mut self, backlog: u32) -> Self {
×
158
        self.backlog = backlog;
×
159
        self
×
160
    }
×
161

162
    /// Configures port reuse for local sockets, which implies
163
    /// reuse of listening ports for outgoing connections to
164
    /// enhance NAT traversal capabilities.
165
    ///
166
    /// # Deprecation Notice
167
    ///
168
    /// The new implementation works on a per-connection basis, defined by the behaviour. This
169
    /// removes the necessity to configure the transport for port reuse, instead the behaviour
170
    /// requiring this behaviour can decide whether to use port reuse or not.
171
    ///
172
    /// The API to configure port reuse is part of [`Transport`] and the option can be found in
173
    /// [`libp2p_core::transport::DialOpts`].
174
    ///
175
    /// If [`PortUse::Reuse`] is enabled, the transport will try to reuse the local port of the
176
    /// listener. If that's not possible, i.e. there is no listener or the transport doesn't allow
177
    /// a direct control over ports, a new port (or the default behaviour) is used. If port reuse
178
    /// is enabled for a connection, this option will be treated on a best-effort basis.
179
    #[deprecated(
180
        since = "0.42.0",
181
        note = "This option does nothing now, since the port reuse policy is now decided on a per-connection basis by the behaviour. The function will be removed in a future release."
182
    )]
183
    pub fn port_reuse(self, _port_reuse: bool) -> Self {
×
184
        self
×
185
    }
×
186

187
    fn create_socket(&self, socket_addr: SocketAddr, port_use: PortUse) -> io::Result<Socket> {
1,294✔
188
        let socket = Socket::new(
1,294✔
189
            Domain::for_address(socket_addr),
1,294✔
190
            Type::STREAM,
191
            Some(socket2::Protocol::TCP),
1,294✔
192
        )?;
×
193
        if socket_addr.is_ipv6() {
1,294✔
194
            socket.set_only_v6(true)?;
18✔
195
        }
1,276✔
196
        if let Some(ttl) = self.ttl {
1,294✔
197
            socket.set_ttl_v4(ttl)?;
×
198
        }
1,294✔
199
        socket.set_tcp_nodelay(self.nodelay)?;
1,294✔
200
        socket.set_reuse_address(true)?;
1,294✔
201
        #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
202
        if port_use == PortUse::Reuse {
1,294✔
203
            socket.set_reuse_port(true)?;
1,142✔
204
        }
152✔
205

206
        #[cfg(not(all(unix, not(any(target_os = "solaris", target_os = "illumos")))))]
207
        let _ = port_use; // silence the unused warning on non-unix platforms (i.e. Windows)
208

209
        socket.set_nonblocking(true)?;
1,294✔
210

211
        Ok(socket)
1,294✔
212
    }
1,294✔
213
}
214

215
impl Default for Config {
216
    fn default() -> Self {
2,014✔
217
        Self::new()
2,014✔
218
    }
2,014✔
219
}
220

221
/// An abstract [`libp2p_core::Transport`] implementation.
222
///
223
/// You shouldn't need to use this type directly. Use one of the following instead:
224
///
225
/// - [`tokio::Transport`]
226
pub struct Transport<T>
227
where
228
    T: Provider + Send,
229
{
230
    /// Socket options applied to every socket this transport creates.
    config: Config,
231

232
    /// The listen addresses registered for port reuse when dialing.
233
    port_reuse: PortReuse,
234
    /// All the active listeners.
235
    /// Each [`ListenStream`] is owned by the [`SelectAll`], which polls them
236
    /// as a single merged stream of transport events.
237
    listeners: SelectAll<ListenStream<T>>,
238
    /// Pending transport events to return from [`libp2p_core::Transport::poll`].
239
    pending_events:
240
        VecDeque<TransportEvent<<Self as libp2p_core::Transport>::ListenerUpgrade, io::Error>>,
241
}
242

243
impl<T> Transport<T>
244
where
245
    T: Provider + Send,
246
{
247
    /// Create a new instance of [`Transport`].
248
    ///
249
    /// If you don't want to specify a [`Config`], use [`Transport::default`].
250
    ///
251
    /// It is best to call this function through one of the type-aliases of this type:
252
    ///
253
    /// - [`tokio::Transport::new`]
254
    pub fn new(config: Config) -> Self {
9✔
255
        Transport {
9✔
256
            config,
9✔
257
            ..Default::default()
9✔
258
        }
9✔
259
    }
9✔
260

261
    fn do_listen(
803✔
262
        &mut self,
803✔
263
        id: ListenerId,
803✔
264
        socket_addr: SocketAddr,
803✔
265
    ) -> io::Result<ListenStream<T>> {
803✔
266
        let socket = self.config.create_socket(socket_addr, PortUse::Reuse)?;
803✔
267
        socket.bind(&socket_addr.into())?;
803✔
268
        socket.listen(self.config.backlog as _)?;
803✔
269
        socket.set_nonblocking(true)?;
803✔
270
        let listener: TcpListener = socket.into();
803✔
271
        let local_addr = listener.local_addr()?;
803✔
272

273
        if local_addr.ip().is_unspecified() {
803✔
274
            return ListenStream::<T>::new(
9✔
275
                id,
9✔
276
                listener,
9✔
277
                Some(T::new_if_watcher()?),
9✔
278
                self.port_reuse.clone(),
9✔
279
            );
280
        }
794✔
281

282
        self.port_reuse.register(local_addr.ip(), local_addr.port());
794✔
283
        let listen_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port());
794✔
284
        self.pending_events.push_back(TransportEvent::NewAddress {
794✔
285
            listener_id: id,
794✔
286
            listen_addr,
794✔
287
        });
794✔
288
        ListenStream::<T>::new(id, listener, None, self.port_reuse.clone())
794✔
289
    }
803✔
290
}
291

292
impl<T> Default for Transport<T>
293
where
294
    T: Provider + Send,
295
{
296
    /// Creates a [`Transport`] with reasonable defaults.
297
    ///
298
    /// This transport will have port-reuse disabled.
299
    fn default() -> Self {
1,801✔
300
        Transport {
1,801✔
301
            port_reuse: PortReuse::default(),
1,801✔
302
            config: Config::default(),
1,801✔
303
            listeners: SelectAll::new(),
1,801✔
304
            pending_events: VecDeque::new(),
1,801✔
305
        }
1,801✔
306
    }
1,801✔
307
}
308

309
impl<T> libp2p_core::Transport for Transport<T>
310
where
311
    T: Provider + Send + 'static,
312
    T::Listener: Unpin,
313
    T::Stream: Unpin,
314
{
315
    type Output = T::Stream;
316
    type Error = io::Error;
317
    type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
318
    type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
319

320
    fn listen_on(
804✔
321
        &mut self,
804✔
322
        id: ListenerId,
804✔
323
        addr: Multiaddr,
804✔
324
    ) -> Result<(), TransportError<Self::Error>> {
804✔
325
        let socket_addr = multiaddr_to_socketaddr(addr.clone())
804✔
326
            .map_err(|_| TransportError::MultiaddrNotSupported(addr))?;
804✔
327
        tracing::debug!("listening on {}", socket_addr);
803✔
328
        let listener = self
803✔
329
            .do_listen(id, socket_addr)
803✔
330
            .map_err(TransportError::Other)?;
803✔
331
        self.listeners.push(listener);
803✔
332
        Ok(())
803✔
333
    }
804✔
334

335
    fn remove_listener(&mut self, id: ListenerId) -> bool {
1✔
336
        if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
1✔
337
            listener.close(Ok(()));
1✔
338
            true
1✔
339
        } else {
340
            false
×
341
        }
342
    }
1✔
343

344
    fn dial(
148✔
345
        &mut self,
148✔
346
        addr: Multiaddr,
148✔
347
        opts: DialOpts,
148✔
348
    ) -> Result<Self::Dial, TransportError<Self::Error>> {
148✔
349
        let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(addr.clone()) {
148✔
350
            if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
145✔
351
                return Err(TransportError::MultiaddrNotSupported(addr));
×
352
            }
145✔
353
            socket_addr
145✔
354
        } else {
355
            return Err(TransportError::MultiaddrNotSupported(addr));
3✔
356
        };
357
        tracing::debug!(address=%socket_addr, "dialing address");
145✔
358

359
        let socket = self
145✔
360
            .config
145✔
361
            .create_socket(socket_addr, opts.port_use)
145✔
362
            .map_err(TransportError::Other)?;
145✔
363

364
        let bind_addr = match self.port_reuse.local_dial_addr(&socket_addr.ip()) {
145✔
365
            Some(socket_addr) if opts.port_use == PortUse::Reuse => {
128✔
366
                tracing::trace!(address=%addr, "Binding dial socket to listen socket address");
68✔
367
                Some(socket_addr)
68✔
368
            }
369
            _ => None,
77✔
370
        };
371

372
        let local_config = self.config.clone();
145✔
373

374
        Ok(async move {
142✔
375
            if let Some(bind_addr) = bind_addr {
142✔
376
                socket.bind(&bind_addr.into())?;
68✔
377
            }
74✔
378

379
            // [`Transport::dial`] should do no work unless the returned [`Future`] is polled. Thus
380
            // do the `connect` call within the [`Future`].
381
            let socket = match (socket.connect(&socket_addr.into()), bind_addr) {
142✔
382
                (Ok(()), _) => socket,
×
383
                (Err(err), _) if err.raw_os_error() == Some(libc::EINPROGRESS) => socket,
142✔
384
                (Err(err), _) if err.kind() == io::ErrorKind::WouldBlock => socket,
1✔
385
                (Err(err), Some(bind_addr)) if err.kind() == io::ErrorKind::AddrNotAvailable  => {
1✔
386
                    // The socket was bound to a local address that is no longer available.
387
                    // Retry without binding.
388
                    tracing::debug!(connect_addr = %socket_addr, ?bind_addr, "Failed to connect using existing socket because we already have a connection, re-dialing with new port");
1✔
389
                    std::mem::drop(socket);
1✔
390
                    let socket = local_config.create_socket(socket_addr, PortUse::New)?;
1✔
391
                    match socket.connect(&socket_addr.into()) {
1✔
392
                        Ok(()) => socket,
×
393
                        Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => socket,
1✔
394
                        Err(err) if err.kind() == io::ErrorKind::WouldBlock => socket,
×
395
                        Err(err) => return Err(err),
×
396
                    }
397
                }
398
                (Err(err), _) => return Err(err),
×
399
            };
400

401
            let stream = T::new_stream(socket.into()).await?;
142✔
402
            Ok(stream)
116✔
403
        }
141✔
404
        .boxed())
145✔
405
    }
148✔
406

407
    /// Poll all listeners.
408
    #[tracing::instrument(level = "trace", name = "Transport::poll", skip(self, cx))]
409
    fn poll(
26,662✔
410
        mut self: Pin<&mut Self>,
26,662✔
411
        cx: &mut Context<'_>,
26,662✔
412
    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
26,662✔
413
        // Return pending events from closed listeners.
414
        if let Some(event) = self.pending_events.pop_front() {
26,662✔
415
            return Poll::Ready(event);
793✔
416
        }
25,869✔
417

418
        match self.listeners.poll_next_unpin(cx) {
25,869✔
419
            Poll::Ready(Some(transport_event)) => Poll::Ready(transport_event),
137✔
420
            _ => Poll::Pending,
25,732✔
421
        }
422
    }
26,662✔
423
}
424

425
/// A stream of incoming connections on one or more interfaces.
426
struct ListenStream<T>
427
where
428
    T: Provider,
429
{
430
    /// The ID of this listener.
431
    listener_id: ListenerId,
432
    /// The socket address that the listening socket is bound to,
433
    /// which may be a "wildcard address" like `INADDR_ANY` or `IN6ADDR_ANY`
434
    /// when listening on all interfaces for IPv4 or IPv6 connections, respectively.
435
    listen_addr: SocketAddr,
436
    /// The async listening socket for incoming connections.
437
    listener: T::Listener,
438
    /// Watcher for network interface changes.
439
    /// Reports [`IfEvent`]s for new / deleted IP addresses when interfaces
440
    /// become or stop being available.
441
    ///
442
    /// `None` if the socket is only listening on a single interface.
443
    if_watcher: Option<T::IfWatcher>,
444
    /// The port reuse registry for outgoing connections.
445
    ///
446
    /// The IP addresses on which this listening stream is accepting
447
    /// connections are registered here for reuse
448
    /// as local addresses for the sockets of outgoing connections. They are
449
    /// unregistered when an interface address goes down or the stream is dropped.
450
    port_reuse: PortReuse,
451
    /// How long to sleep after a (non-fatal) error while trying
452
    /// to accept a new connection.
453
    sleep_on_error: Duration,
454
    /// The current pause, if any.
455
    pause: Option<Delay>,
456
    /// Pending event to be reported.
457
    pending_event: Option<<Self as Stream>::Item>,
458
    /// The listener can be manually closed with
459
    /// [`Transport::remove_listener`](libp2p_core::Transport::remove_listener).
460
    is_closed: bool,
461
    /// The stream must be awakened after it has been closed to deliver the last event.
462
    close_listener_waker: Option<Waker>,
463
}
464

465
impl<T> ListenStream<T>
466
where
467
    T: Provider,
468
{
469
    /// Constructs a [`ListenStream`] for incoming connections around
470
    /// the given [`TcpListener`].
471
    fn new(
803✔
472
        listener_id: ListenerId,
803✔
473
        listener: TcpListener,
803✔
474
        if_watcher: Option<T::IfWatcher>,
803✔
475
        port_reuse: PortReuse,
803✔
476
    ) -> io::Result<Self> {
803✔
477
        let listen_addr = listener.local_addr()?;
803✔
478
        let listener = T::new_listener(listener)?;
803✔
479

480
        Ok(ListenStream {
803✔
481
            port_reuse,
803✔
482
            listener,
803✔
483
            listener_id,
803✔
484
            listen_addr,
803✔
485
            if_watcher,
803✔
486
            pause: None,
803✔
487
            sleep_on_error: Duration::from_millis(100),
803✔
488
            pending_event: None,
803✔
489
            is_closed: false,
803✔
490
            close_listener_waker: None,
803✔
491
        })
803✔
492
    }
803✔
493

494
    /// Disables port reuse for any listen address of this stream.
495
    ///
496
    /// This is done when the [`ListenStream`] encounters a fatal
497
    /// error (for the stream) or is dropped.
498
    ///
499
    /// Has no effect if port reuse is disabled.
500
    fn disable_port_reuse(&mut self) {
803✔
501
        match &self.if_watcher {
803✔
502
            Some(if_watcher) => {
9✔
503
                for ip_net in T::addrs(if_watcher) {
35✔
504
                    self.port_reuse
35✔
505
                        .unregister(ip_net.addr(), self.listen_addr.port());
35✔
506
                }
35✔
507
            }
508
            None => self
794✔
509
                .port_reuse
794✔
510
                .unregister(self.listen_addr.ip(), self.listen_addr.port()),
794✔
511
        }
512
    }
803✔
513

514
    /// Close the listener.
515
    ///
516
    /// This will create a [`TransportEvent::ListenerClosed`] and
517
    /// terminate the stream once the event has been reported.
518
    fn close(&mut self, reason: Result<(), io::Error>) {
1✔
519
        if self.is_closed {
1✔
520
            return;
×
521
        }
1✔
522
        self.pending_event = Some(TransportEvent::ListenerClosed {
1✔
523
            listener_id: self.listener_id,
1✔
524
            reason,
1✔
525
        });
1✔
526
        self.is_closed = true;
1✔
527

528
        // Wake the stream to deliver the last event.
529
        if let Some(waker) = self.close_listener_waker.take() {
1✔
530
            waker.wake();
×
531
        }
1✔
532
    }
1✔
533

534
    /// Poll for a next If Event.
535
    fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
1,037✔
536
        let Some(if_watcher) = self.if_watcher.as_mut() else {
1,037✔
537
            return Poll::Pending;
995✔
538
        };
539

540
        let my_listen_addr_port = self.listen_addr.port();
42✔
541

542
        while let Poll::Ready(Some(event)) = if_watcher.poll_next_unpin(cx) {
56✔
543
            match event {
35✔
544
                Ok(IfEvent::Up(inet)) => {
35✔
545
                    let ip = inet.addr();
35✔
546
                    if self.listen_addr.is_ipv4() == ip.is_ipv4() {
35✔
547
                        let ma = ip_to_multiaddr(ip, my_listen_addr_port);
21✔
548
                        tracing::debug!(address=%ma, "New listen address");
21✔
549
                        self.port_reuse.register(ip, my_listen_addr_port);
21✔
550
                        return Poll::Ready(TransportEvent::NewAddress {
21✔
551
                            listener_id: self.listener_id,
21✔
552
                            listen_addr: ma,
21✔
553
                        });
21✔
554
                    }
14✔
555
                }
556
                Ok(IfEvent::Down(inet)) => {
×
557
                    let ip = inet.addr();
×
558
                    if self.listen_addr.is_ipv4() == ip.is_ipv4() {
×
559
                        let ma = ip_to_multiaddr(ip, my_listen_addr_port);
×
560
                        tracing::debug!(address=%ma, "Expired listen address");
×
561
                        self.port_reuse.unregister(ip, my_listen_addr_port);
×
562
                        return Poll::Ready(TransportEvent::AddressExpired {
×
563
                            listener_id: self.listener_id,
×
564
                            listen_addr: ma,
×
565
                        });
×
566
                    }
×
567
                }
568
                Err(error) => {
×
569
                    self.pause = Some(Delay::new(self.sleep_on_error));
×
570
                    return Poll::Ready(TransportEvent::ListenerError {
×
571
                        listener_id: self.listener_id,
×
572
                        error,
×
573
                    });
×
574
                }
575
            }
576
        }
577

578
        Poll::Pending
21✔
579
    }
1,037✔
580
}
581

582
impl<T> Drop for ListenStream<T>
583
where
584
    T: Provider,
585
{
586
    fn drop(&mut self) {
803✔
587
        self.disable_port_reuse();
803✔
588
    }
803✔
589
}
590

591
impl<T> Stream for ListenStream<T>
592
where
593
    T: Provider,
594
    T::Listener: Unpin,
595
    T::Stream: Unpin,
596
{
597
    type Item = TransportEvent<Ready<Result<T::Stream, io::Error>>, io::Error>;
598

599
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
1,037✔
600
        if let Some(mut pause) = self.pause.take() {
1,037✔
601
            match pause.poll_unpin(cx) {
×
602
                Poll::Ready(_) => {}
×
603
                Poll::Pending => {
604
                    self.pause = Some(pause);
×
605
                    return Poll::Pending;
×
606
                }
607
            }
608
        }
1,037✔
609

610
        if let Some(event) = self.pending_event.take() {
1,037✔
611
            return Poll::Ready(Some(event));
×
612
        }
1,037✔
613

614
        if self.is_closed {
1,037✔
615
            // Terminate the stream if the listener closed
616
            // and all remaining events have been reported.
617
            return Poll::Ready(None);
×
618
        }
1,037✔
619

620
        if let Poll::Ready(event) = self.poll_if_addr(cx) {
1,037✔
621
            return Poll::Ready(Some(event));
21✔
622
        }
1,016✔
623

624
        // Take the pending connection from the backlog.
625
        match T::poll_accept(&mut self.listener, cx) {
1,016✔
626
            Poll::Ready(Ok(Incoming {
627
                local_addr,
116✔
628
                remote_addr,
116✔
629
                stream,
116✔
630
            })) => {
631
                let local_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port());
116✔
632
                let remote_addr = ip_to_multiaddr(remote_addr.ip(), remote_addr.port());
116✔
633

634
                tracing::debug!(
116✔
635
                    remote_address=%remote_addr,
636
                    local_address=%local_addr,
637
                    "Incoming connection from remote at local"
×
638
                );
639

640
                return Poll::Ready(Some(TransportEvent::Incoming {
116✔
641
                    listener_id: self.listener_id,
116✔
642
                    upgrade: future::ok(stream),
116✔
643
                    local_addr,
116✔
644
                    send_back_addr: remote_addr,
116✔
645
                }));
116✔
646
            }
647
            Poll::Ready(Err(error)) => {
×
648
                // These errors are non-fatal for the listener stream.
649
                self.pause = Some(Delay::new(self.sleep_on_error));
×
650
                return Poll::Ready(Some(TransportEvent::ListenerError {
×
651
                    listener_id: self.listener_id,
×
652
                    error,
×
653
                }));
×
654
            }
655
            Poll::Pending => {}
900✔
656
        }
657

658
        self.close_listener_waker = Some(cx.waker().clone());
900✔
659
        Poll::Pending
900✔
660
    }
1,037✔
661
}
662

663
/// Extracts a `SocketAddr` from a given `Multiaddr`.
664
///
665
/// Fails if the given `Multiaddr` does not begin with an IP
666
/// protocol encapsulating a TCP port.
667
fn multiaddr_to_socketaddr(mut addr: Multiaddr) -> Result<SocketAddr, ()> {
1,306✔
668
    // "Pop" the IP address and TCP port from the end of the address,
669
    // ignoring a `/p2p/...` suffix as well as any prefix of possibly
670
    // outer protocols, if present.
671
    let mut port = None;
1,306✔
672
    while let Some(proto) = addr.pop() {
2,906✔
673
        match proto {
2,906✔
674
            Protocol::Ip4(ipv4) => match port {
1,277✔
675
                Some(port) => return Ok(SocketAddr::new(ipv4.into(), port)),
1,277✔
676
                None => return Err(()),
×
677
            },
678
            Protocol::Ip6(ipv6) => match port {
20✔
679
                Some(port) => return Ok(SocketAddr::new(ipv6.into(), port)),
20✔
680
                None => return Err(()),
×
681
            },
682
            Protocol::Tcp(portnum) => match port {
1,299✔
683
                Some(_) => return Err(()),
1✔
684
                None => port = Some(portnum),
1,298✔
685
            },
686
            Protocol::P2p(_) => {}
302✔
687
            _ => return Err(()),
8✔
688
        }
689
    }
690
    Err(())
×
691
}
1,306✔
692

693
// Create a [`Multiaddr`] from the given IP address and port number.
694
fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr {
1,484✔
695
    Multiaddr::empty().with(ip.into()).with(Protocol::Tcp(port))
1,484✔
696
}
1,484✔
697

698
#[cfg(all(test, feature = "tokio"))]
699
mod tests {
700
    use futures::{
701
        channel::{mpsc, oneshot},
702
        future::poll_fn,
703
    };
704
    use libp2p_core::{Endpoint, Transport as _};
705

706
    use super::*;
707

708
    // Checks `multiaddr_to_socketaddr` against addresses it must reject and
    // against valid IPv4/IPv6 TCP addresses.
    #[test]
    fn multiaddr_to_tcp_conversion() {
        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

        // Addresses that do not describe an `<ip>/tcp/<port>` pair: wrong
        // transport protocol, IP without a port, and a port without an IP.
        let rejected = [
            "/ip4/127.0.0.1/udp/1234",
            "/ip4/127.0.0.1",
            "/ip6/::1",
            "/tcp/1234",
        ];
        for input in rejected {
            assert!(
                multiaddr_to_socketaddr(input.parse::<Multiaddr>().unwrap()).is_err(),
                "{input} must not convert to a socket address"
            );
        }

        // An address without any component has nothing to convert.
        assert!(multiaddr_to_socketaddr(Multiaddr::empty()).is_err());

        // Well-formed addresses and the socket address each one maps to.
        let accepted = [
            (
                "/ip4/127.0.0.1/tcp/12345",
                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12345),
            ),
            (
                "/ip4/255.255.255.255/tcp/8080",
                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 8080),
            ),
            (
                "/ip6/::1/tcp/12345",
                SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 12345),
            ),
            (
                "/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080",
                SocketAddr::new(
                    IpAddr::V6(Ipv6Addr::new(
                        65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535,
                    )),
                    8080,
                ),
            ),
        ];
        for (input, expected) in accepted {
            assert_eq!(
                multiaddr_to_socketaddr(input.parse::<Multiaddr>().unwrap()),
                Ok(expected)
            );
        }
    }
1✔
756

757
    // A dialer connects to a listener, sends `[1, 2, 3]` and expects
    // `[4, 5, 6]` back; run once over IPv4 and once over IPv6 loopback.
    #[test]
    fn communicating_between_dialer_and_listener() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Listens on `addr`, reports the resulting listen address through
        // `ready_tx`, then serves exactly one incoming connection.
        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
            let mut transport = Transport::<T>::default().boxed();
            transport.listen_on(ListenerId::next(), addr).unwrap();

            loop {
                let event = transport.select_next_some().await;
                match event {
                    TransportEvent::NewAddress { listen_addr, .. } => {
                        ready_tx.send(listen_addr).await.unwrap();
                    }
                    TransportEvent::Incoming { upgrade, .. } => {
                        let mut stream = upgrade.await.unwrap();

                        let mut request = [0u8; 3];
                        stream.read_exact(&mut request).await.unwrap();
                        assert_eq!(request, [1, 2, 3]);

                        stream.write_all(&[4, 5, 6]).await.unwrap();
                        break;
                    }
                    e => panic!("Unexpected transport event: {e:?}"),
                }
            }
        }

        // Waits for the listener's address, dials it and performs the
        // request/response exchange.
        async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
            let addr = ready_rx.next().await.unwrap();
            let mut transport = Transport::<T>::default();

            let dial_opts = DialOpts {
                role: Endpoint::Dialer,
                port_use: PortUse::Reuse,
            };
            let pending_connection = transport.dial(addr.clone(), dial_opts).unwrap();
            let mut stream = pending_connection.await.unwrap();

            stream.write_all(&[0x1, 0x2, 0x3]).await.unwrap();

            let mut response = [0u8; 3];
            stream.read_exact(&mut response).await.unwrap();
            assert_eq!(response, [4, 5, 6]);
        }

        // Drives listener and dialer on a single-threaded runtime: the
        // listener is spawned locally, the dialer is run to completion, and
        // finally the listener task is awaited.
        fn test(addr: Multiaddr) {
            let (ready_tx, ready_rx) = mpsc::channel(1);

            let runtime = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap();
            let local_tasks = ::tokio::task::LocalSet::new();

            let listener_task = local_tasks.spawn_local(listener::<tokio::Tcp>(addr, ready_tx));
            local_tasks.block_on(&runtime, dialer::<tokio::Tcp>(ready_rx));
            local_tasks.block_on(&runtime, listener_task).unwrap();
        }

        for addr in ["/ip4/127.0.0.1/tcp/0", "/ip6/::1/tcp/0"] {
            test(addr.parse().unwrap());
        }
    }
1✔
824

825
    // Every address announced by a listener must carry a specified
    // (non-wildcard) IP and a non-zero TCP port, and must be dialable.
    #[test]
    fn wildcard_expansion() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Listens on `addr`, validates each announced address and forwards
        // it through `ready_tx`; finishes on the first incoming connection.
        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
            let mut transport = Transport::<T>::default().boxed();
            transport.listen_on(ListenerId::next(), addr).unwrap();

            loop {
                let event = transport.select_next_some().await;
                match event {
                    TransportEvent::NewAddress { listen_addr, .. } => {
                        let mut iter = listen_addr.iter();

                        // First component: a concrete IP address.
                        match iter.next().expect("ip address") {
                            Protocol::Ip4(ip) => assert!(!ip.is_unspecified()),
                            Protocol::Ip6(ip) => assert!(!ip.is_unspecified()),
                            other => panic!("Unexpected protocol: {other}"),
                        }

                        // Second component: a concrete TCP port.
                        match iter.next().expect("port") {
                            Protocol::Tcp(port) => assert_ne!(0, port),
                            _ => panic!("No TCP port in address: {listen_addr}"),
                        }

                        // A failed send is deliberately ignored.
                        ready_tx.send(listen_addr).await.ok();
                    }
                    TransportEvent::Incoming { .. } => break,
                    _ => {}
                }
            }
        }

        // Dials the first address announced by the listener.
        async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
            let dest_addr = ready_rx.next().await.unwrap();
            let mut transport = Transport::<T>::default();

            let dial_opts = DialOpts {
                role: Endpoint::Dialer,
                port_use: PortUse::New,
            };
            let pending_connection = transport.dial(dest_addr, dial_opts).unwrap();
            pending_connection.await.unwrap();
        }

        // Runs listener and dialer on a single-threaded runtime.
        fn test(addr: Multiaddr) {
            let (ready_tx, ready_rx) = mpsc::channel(1);

            let runtime = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap();
            let local_tasks = ::tokio::task::LocalSet::new();

            let listener_task = local_tasks.spawn_local(listener::<tokio::Tcp>(addr, ready_tx));
            local_tasks.block_on(&runtime, dialer::<tokio::Tcp>(ready_rx));
            local_tasks.block_on(&runtime, listener_task).unwrap();
        }

        for addr in ["/ip4/0.0.0.0/tcp/0", "/ip6/::1/tcp/0"] {
            test(addr.parse().unwrap());
        }
    }
1✔
891

892
    // A transport that is listening dials with `PortUse::Reuse`; the remote
    // listener must observe the dialer's listen port as the connection's
    // source port.
    #[test]
    fn port_reuse_dialing() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Listens on `addr`, announces its address through `ready_tx`, and on
        // the first incoming connection compares the remote port with the
        // one received through `port_reuse_rx` before exchanging data.
        async fn listener<T: Provider>(
            addr: Multiaddr,
            mut ready_tx: mpsc::Sender<Multiaddr>,
            port_reuse_rx: oneshot::Receiver<Protocol<'_>>,
        ) {
            let mut transport = Transport::<T>::new(Config::new()).boxed();
            transport.listen_on(ListenerId::next(), addr).unwrap();

            loop {
                match transport.select_next_some().await {
                    TransportEvent::NewAddress { listen_addr, .. } => {
                        ready_tx.send(listen_addr).await.ok();
                    }
                    TransportEvent::Incoming {
                        upgrade,
                        mut send_back_addr,
                        ..
                    } => {
                        // Port the dialer says it reuses ...
                        let remote_port_reuse = port_reuse_rx.await.unwrap();
                        // ... must be the last component of the address the
                        // connection came from.
                        let observed_port = send_back_addr.pop().unwrap();
                        assert_eq!(observed_port, remote_port_reuse);

                        let mut stream = upgrade.await.unwrap();
                        let mut request = [0u8; 3];
                        stream.read_exact(&mut request).await.unwrap();
                        assert_eq!(request, [1, 2, 3]);
                        stream.write_all(&[4, 5, 6]).await.unwrap();
                        return;
                    }
                    e => panic!("Unexpected event: {e:?}"),
                }
            }
        }

        // Starts its own listener on `addr`, publishes the port registered
        // for reuse through `port_reuse_tx`, then dials the remote listener
        // with `PortUse::Reuse` and exchanges data.
        async fn dialer<T: Provider>(
            addr: Multiaddr,
            mut ready_rx: mpsc::Receiver<Multiaddr>,
            port_reuse_tx: oneshot::Sender<Protocol<'_>>,
        ) {
            let dest_addr = ready_rx.next().await.unwrap();
            let mut tcp = Transport::<T>::new(Config::new());
            tcp.listen_on(ListenerId::next(), addr).unwrap();

            // The first event of the fresh listener has to be its address.
            match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
                TransportEvent::NewAddress { .. } => {}
                e => panic!("Unexpected transport event: {e:?}"),
            }

            // Transport and listener must agree on the local dial address
            // registered for port reuse.
            let (port_reuse_tcp, port_reuse_listener) = {
                let listener = tcp.listeners.iter().next().unwrap();
                let listen_ip = listener.listen_addr.ip();
                (
                    tcp.port_reuse.local_dial_addr(&listen_ip),
                    listener.port_reuse.local_dial_addr(&listen_ip),
                )
            };
            assert!(port_reuse_tcp.is_some());
            assert_eq!(port_reuse_tcp, port_reuse_listener);

            // Tell the remote listener which source port to expect; a failed
            // send is deliberately ignored.
            let reused_port = Protocol::Tcp(port_reuse_tcp.unwrap().port());
            port_reuse_tx.send(reused_port).ok();

            let dial_opts = DialOpts {
                role: Endpoint::Dialer,
                port_use: PortUse::Reuse,
            };
            let pending_connection = tcp.dial(dest_addr, dial_opts).unwrap();
            let mut stream = pending_connection.await.unwrap();

            stream.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
            let mut response = [0u8; 3];
            stream.read_exact(&mut response).await.unwrap();
            assert_eq!(response, [4, 5, 6]);
        }

        // Runs listener and dialer on a single-threaded runtime; both listen
        // on the same `addr`.
        fn test(addr: Multiaddr) {
            let (ready_tx, ready_rx) = mpsc::channel(1);
            let (port_reuse_tx, port_reuse_rx) = oneshot::channel();

            let runtime = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap();
            let local_tasks = ::tokio::task::LocalSet::new();

            let listener_task = local_tasks.spawn_local(listener::<tokio::Tcp>(
                addr.clone(),
                ready_tx,
                port_reuse_rx,
            ));
            local_tasks.block_on(
                &runtime,
                dialer::<tokio::Tcp>(addr, ready_rx, port_reuse_tx),
            );
            local_tasks.block_on(&runtime, listener_task).unwrap();
        }

        for addr in ["/ip4/127.0.0.1/tcp/0", "/ip6/::1/tcp/0"] {
            test(addr.parse().unwrap());
        }
    }
1✔
996

997
    // Listening a second time on an address that is already being listened
    // on must succeed and announce the very same address.
    #[test]
    fn port_reuse_listening() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Listens on `addr`, checks the port-reuse registration, then listens
        // again on the address that was announced.
        async fn listen_twice<T: Provider>(addr: Multiaddr) {
            let mut tcp = Transport::<T>::new(Config::new());
            tcp.listen_on(ListenerId::next(), addr).unwrap();

            let addr1 = match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
                TransportEvent::NewAddress { listen_addr, .. } => listen_addr,
                e => panic!("Unexpected transport event: {e:?}"),
            };

            // Transport and first listener must agree on the local dial
            // address registered for port reuse.
            let (port_reuse_tcp, port_reuse_listener1) = {
                let listener1 = tcp.listeners.iter().next().unwrap();
                let listen_ip = listener1.listen_addr.ip();
                (
                    tcp.port_reuse.local_dial_addr(&listen_ip),
                    listener1.port_reuse.local_dial_addr(&listen_ip),
                )
            };
            assert!(port_reuse_tcp.is_some());
            assert_eq!(port_reuse_tcp, port_reuse_listener1);

            // Listen on the same address a second time.
            tcp.listen_on(ListenerId::next(), addr1.clone()).unwrap();
            match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
                TransportEvent::NewAddress {
                    listen_addr: addr2, ..
                } => assert_eq!(addr1, addr2),
                e => panic!("Unexpected transport event: {e:?}"),
            }
        }

        // Runs `listen_twice` to completion on a single-threaded runtime.
        fn test(addr: Multiaddr) {
            let runtime = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap();
            runtime.block_on(listen_twice::<tokio::Tcp>(addr));
        }

        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
    }
1✔
1043

1044
    // Listening on port 0 must announce an address with a concrete port.
    #[test]
    fn listen_port_0() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Listens on `addr` and returns the address carried by the first
        // transport event, which is expected to be a new listen address.
        async fn listen<T: Provider>(addr: Multiaddr) -> Multiaddr {
            let mut transport = Transport::<T>::default().boxed();
            transport.listen_on(ListenerId::next(), addr).unwrap();

            let first_event = transport.select_next_some().await;
            first_event.into_new_address().expect("listen address")
        }

        // Runs `listen` on a single-threaded runtime and checks the port.
        fn test(addr: Multiaddr) {
            let runtime = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap();

            let new_addr = runtime.block_on(listen::<tokio::Tcp>(addr));
            assert!(!new_addr.to_string().contains("tcp/0"));
        }

        for addr in ["/ip6/::1/tcp/0", "/ip4/127.0.0.1/tcp/0"] {
            test(addr.parse().unwrap());
        }
    }
1✔
1071

1072
    // An address containing two TCP components must be rejected by
    // `listen_on`.
    #[test]
    fn listen_invalid_addr() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/12345/tcp/12345".parse().unwrap();

        let mut tcp = tokio::Transport::default();
        assert!(tcp.listen_on(ListenerId::next(), addr).is_err());
    }
1✔
1085

1086
    // A listener that was just registered can be removed again:
    // `remove_listener` must return `true` for its id.
    #[test]
    fn test_remove_listener() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Registers a listener on an OS-assigned loopback port and removes it
        // right away, returning what `remove_listener` reported.
        async fn cycle_listeners<T: Provider>() -> bool {
            let mut tcp = Transport::<T>::default().boxed();
            let listener_id = ListenerId::next();
            tcp.listen_on(listener_id, "/ip4/127.0.0.1/tcp/0".parse().unwrap())
                .unwrap();
            tcp.remove_listener(listener_id)
        }

        // The enclosing module is only compiled with the `tokio` feature, so
        // no additional feature gate is needed here.
        let rt = ::tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .build()
            .unwrap();
        assert!(rt.block_on(cycle_listeners::<tokio::Tcp>()));
    }
1✔
1109

1110
    // Listening on the IPv4 wildcard and on the IPv6 wildcard with the same
    // port must both succeed.
    #[test]
    fn test_listens_ipv4_ipv6_separately() {
        fn test<T: Provider>() {
            // Bind a throw-away socket to let the OS pick a port; the socket
            // is dropped at the end of this block, before the port is used
            // for the two listeners below.
            let port = {
                let listener = TcpListener::bind("127.0.0.1:0").unwrap();
                listener.local_addr().unwrap().port()
            };

            let mut tcp = Transport::<T>::default().boxed();
            tcp.listen_on(
                ListenerId::next(),
                format!("/ip4/0.0.0.0/tcp/{port}").parse().unwrap(),
            )
            .unwrap();
            tcp.listen_on(
                ListenerId::next(),
                format!("/ip6/::/tcp/{port}").parse().unwrap(),
            )
            .unwrap();
        }

        // The enclosing module is only compiled with the `tokio` feature, so
        // no additional feature gate is needed here. `test` is called from
        // inside `block_on` so that it runs within the runtime's context.
        let rt = ::tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .build()
            .unwrap();
        rt.block_on(async {
            test::<tokio::Tcp>();
        });
    }
1✔
1141
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc