• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18610913338

18 Oct 2025 04:41AM UTC coverage: 78.379% (+2.5%) from 75.842%
18610913338

push

github

butlergroup
	modified:   .github/workflows/ci.yml

36944 of 47135 relevant lines covered (78.38%)

37728.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.75
/swarm/src/lib.rs
1
// Copyright 2019 Parity Technologies (UK) Ltd.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
//! High-level network manager.
22
//!
23
//! A [`Swarm`] contains the state of the network as a whole. The entire
24
//! behaviour of a libp2p network can be controlled through the `Swarm`.
25
//! The `Swarm` struct contains all active and pending connections to
26
//! remotes and manages the state of all the substreams that have been
27
//! opened, and all the upgrades that were built upon these substreams.
28
//!
29
//! # Initializing a Swarm
30
//!
31
//! Creating a `Swarm` requires three things:
32
//!
33
//!  1. A network identity of the local node in form of a [`PeerId`].
34
//!  2. An implementation of the [`Transport`] trait. This is the type that will be used in order to
35
//!     reach nodes on the network based on their address. See the `transport` module for more
36
//!     information.
37
//!  3. An implementation of the [`NetworkBehaviour`] trait. This is a state machine that defines
38
//!     how the swarm should behave once it is connected to a node.
39
//!
40
//! # Network Behaviour
41
//!
42
//! The [`NetworkBehaviour`] trait is implemented on types that indicate to
43
//! the swarm how it should behave. This includes which protocols are supported
44
//! and which nodes to try to connect to. It is the `NetworkBehaviour` that
45
//! controls what happens on the network. Multiple types that implement
46
//! `NetworkBehaviour` can be composed into a single behaviour.
47
//!
48
//! # Connection Handler
49
//!
50
//! The [`ConnectionHandler`] trait defines how each active connection to a
51
//! remote should behave: how to handle incoming substreams, which protocols
52
//! are supported, when to open a new outbound substream, etc.
53

54
#![cfg_attr(docsrs, feature(doc_cfg))]
55

56
mod connection;
57
mod executor;
58
mod stream;
59
mod stream_protocol;
60
#[cfg(test)]
61
mod test;
62
mod upgrade;
63

64
pub mod behaviour;
65
pub mod dial_opts;
66
pub mod dummy;
67
pub mod handler;
68
mod listen_opts;
69
mod translation;
70

71
/// Bundles all symbols required for the [`libp2p_swarm_derive::NetworkBehaviour`] macro.
72
#[doc(hidden)]
73
pub mod derive_prelude {
74
    pub use either::Either;
75
    pub use futures::prelude as futures;
76
    pub use libp2p_core::{
77
        transport::{ListenerId, PortUse},
78
        ConnectedPoint, Endpoint, Multiaddr,
79
    };
80
    pub use libp2p_identity::PeerId;
81

82
    pub use crate::{
83
        behaviour::{
84
            AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredListenAddr,
85
            ExternalAddrConfirmed, ExternalAddrExpired, FromSwarm, ListenFailure, ListenerClosed,
86
            ListenerError, NewExternalAddrCandidate, NewExternalAddrOfPeer, NewListenAddr,
87
            NewListener,
88
        },
89
        connection::ConnectionId,
90
        ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, DialError, NetworkBehaviour,
91
        THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
92
    };
93
}
94

95
use std::{
96
    collections::{HashMap, HashSet, VecDeque},
97
    error, fmt, io,
98
    num::{NonZeroU32, NonZeroU8, NonZeroUsize},
99
    pin::Pin,
100
    task::{Context, Poll},
101
    time::Duration,
102
};
103

104
pub use behaviour::{
105
    AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr,
106
    ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
107
    ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddrCandidate,
108
    NewExternalAddrOfPeer, NewListenAddr, NotifyHandler, PeerAddresses, ToSwarm,
109
};
110
pub use connection::{pool::ConnectionCounters, ConnectionError, ConnectionId, SupportedProtocols};
111
use connection::{
112
    pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent},
113
    IncomingInfo, PendingInboundConnectionError, PendingOutboundConnectionError,
114
};
115
use dial_opts::{DialOpts, PeerCondition};
116
pub use executor::Executor;
117
use futures::{prelude::*, stream::FusedStream};
118
pub use handler::{
119
    ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, OneShotHandler,
120
    OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol,
121
};
122
use libp2p_core::{
123
    connection::ConnectedPoint,
124
    muxing::StreamMuxerBox,
125
    transport::{self, ListenerId, TransportError, TransportEvent},
126
    Multiaddr, Transport,
127
};
128
use libp2p_identity::PeerId;
129
#[cfg(feature = "macros")]
130
pub use libp2p_swarm_derive::NetworkBehaviour;
131
pub use listen_opts::ListenOpts;
132
use smallvec::SmallVec;
133
pub use stream::Stream;
134
pub use stream_protocol::{InvalidProtocol, StreamProtocol};
135
use tracing::Instrument;
136
#[doc(hidden)]
137
pub use translation::_address_translation;
138

139
use crate::{behaviour::ExternalAddrConfirmed, handler::UpgradeInfoSend};
140

141
/// Event generated by the [`NetworkBehaviour`] that the swarm will report back.
142
type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::ToSwarm;
143

144
/// [`ConnectionHandler`] of the [`NetworkBehaviour`] for all the protocols the [`NetworkBehaviour`]
145
/// supports.
146
pub type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;
147

148
/// Custom event that can be received by the [`ConnectionHandler`] of the
149
/// [`NetworkBehaviour`].
150
pub type THandlerInEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::FromBehaviour;
151

152
/// Custom event that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
153
pub type THandlerOutEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::ToBehaviour;
154

155
/// Event generated by the `Swarm`.
156
#[derive(Debug)]
157
#[non_exhaustive]
158
pub enum SwarmEvent<TBehaviourOutEvent> {
159
    /// Event generated by the `NetworkBehaviour`.
160
    Behaviour(TBehaviourOutEvent),
161
    /// A connection to the given peer has been opened.
162
    ConnectionEstablished {
163
        /// Identity of the peer that we have connected to.
164
        peer_id: PeerId,
165
        /// Identifier of the connection.
166
        connection_id: ConnectionId,
167
        /// Endpoint of the connection that has been opened.
168
        endpoint: ConnectedPoint,
169
        /// Number of established connections to this peer, including the one that has just been
170
        /// opened.
171
        num_established: NonZeroU32,
172
        /// [`Some`] when the new connection is an outgoing connection.
173
        /// Addresses are dialed concurrently. Contains the addresses and errors
174
        /// of dial attempts that failed before the one successful dial.
175
        concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
176
        /// How long it took to establish this connection.
177
        established_in: std::time::Duration,
178
    },
179
    /// A connection with the given peer has been closed,
180
    /// possibly as a result of an error.
181
    ConnectionClosed {
182
        /// Identity of the peer that we were connected to.
183
        peer_id: PeerId,
184
        /// Identifier of the connection.
185
        connection_id: ConnectionId,
186
        /// Endpoint of the connection that has been closed.
187
        endpoint: ConnectedPoint,
188
        /// Number of other remaining connections to this same peer.
189
        num_established: u32,
190
        /// Reason for the disconnection, if it was not a successful
191
        /// active close.
192
        cause: Option<ConnectionError>,
193
    },
194
    /// A new connection arrived on a listener and is in the process of protocol negotiation.
195
    ///
196
    /// A corresponding [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) or
197
    /// [`IncomingConnectionError`](SwarmEvent::IncomingConnectionError) event will later be
198
    /// generated for this connection.
199
    IncomingConnection {
200
        /// Identifier of the connection.
201
        connection_id: ConnectionId,
202
        /// Local connection address.
203
        /// This address has been earlier reported with a
204
        /// [`NewListenAddr`](SwarmEvent::NewListenAddr) event.
205
        local_addr: Multiaddr,
206
        /// Address used to send back data to the remote.
207
        send_back_addr: Multiaddr,
208
    },
209
    /// An error happened on an inbound connection during its initial handshake.
210
    ///
211
    /// This can include, for example, an error during the handshake of the encryption layer, or
212
    /// the connection unexpectedly closed.
213
    IncomingConnectionError {
214
        /// Identifier of the connection.
215
        connection_id: ConnectionId,
216
        /// Local connection address.
217
        /// This address has been earlier reported with a
218
        /// [`NewListenAddr`](SwarmEvent::NewListenAddr) event.
219
        local_addr: Multiaddr,
220
        /// Address used to send back data to the remote.
221
        send_back_addr: Multiaddr,
222
        /// The error that happened.
223
        error: ListenError,
224
        /// If known, [`PeerId`] of the peer that tried to connect to us.
225
        peer_id: Option<PeerId>,
226
    },
227
    /// An error happened on an outbound connection.
228
    OutgoingConnectionError {
229
        /// Identifier of the connection.
230
        connection_id: ConnectionId,
231
        /// If known, [`PeerId`] of the peer we tried to reach.
232
        peer_id: Option<PeerId>,
233
        /// Error that has been encountered.
234
        error: DialError,
235
    },
236
    /// One of our listeners has reported a new local listening address.
237
    NewListenAddr {
238
        /// The listener that is listening on the new address.
239
        listener_id: ListenerId,
240
        /// The new address that is being listened on.
241
        address: Multiaddr,
242
    },
243
    /// One of our listeners has reported the expiration of a listening address.
244
    ExpiredListenAddr {
245
        /// The listener that is no longer listening on the address.
246
        listener_id: ListenerId,
247
        /// The expired address.
248
        address: Multiaddr,
249
    },
250
    /// One of the listeners gracefully closed.
251
    ListenerClosed {
252
        /// The listener that closed.
253
        listener_id: ListenerId,
254
        /// The addresses that the listener was listening on. These addresses are now considered
255
        /// expired, similar to if a [`ExpiredListenAddr`](SwarmEvent::ExpiredListenAddr) event
256
        /// has been generated for each of them.
257
        addresses: Vec<Multiaddr>,
258
        /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
259
        /// if the stream produced an error.
260
        reason: Result<(), io::Error>,
261
    },
262
    /// One of the listeners reported a non-fatal error.
263
    ListenerError {
264
        /// The listener that errored.
265
        listener_id: ListenerId,
266
        /// The listener error.
267
        error: io::Error,
268
    },
269
    /// A new dialing attempt has been initiated by the [`NetworkBehaviour`]
270
    /// implementation.
271
    ///
272
    /// A [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) event is
273
    /// reported if the dialing attempt succeeds, otherwise a
274
    /// [`OutgoingConnectionError`](SwarmEvent::OutgoingConnectionError) event
275
    /// is reported.
276
    Dialing {
277
        /// Identity of the peer that we are connecting to.
278
        peer_id: Option<PeerId>,
279

280
        /// Identifier of the connection.
281
        connection_id: ConnectionId,
282
    },
283
    /// We have discovered a new candidate for an external address for us.
284
    NewExternalAddrCandidate { address: Multiaddr },
285
    /// An external address of the local node was confirmed.
286
    ExternalAddrConfirmed { address: Multiaddr },
287
    /// An external address of the local node expired, i.e. is no-longer confirmed.
288
    ExternalAddrExpired { address: Multiaddr },
289
    /// We have discovered a new address of a peer.
290
    NewExternalAddrOfPeer { peer_id: PeerId, address: Multiaddr },
291
}
292

293
impl<TBehaviourOutEvent> SwarmEvent<TBehaviourOutEvent> {
294
    /// Extract the `TBehaviourOutEvent` from this [`SwarmEvent`] in case it is the `Behaviour`
295
    /// variant, otherwise fail.
296
    #[allow(clippy::result_large_err)]
297
    pub fn try_into_behaviour_event(self) -> Result<TBehaviourOutEvent, Self> {
788✔
298
        match self {
788✔
299
            SwarmEvent::Behaviour(inner) => Ok(inner),
593✔
300
            other => Err(other),
195✔
301
        }
302
    }
788✔
303
}
304

305
/// Contains the state of the network, plus the way it should behave.
306
///
307
/// Note: Needs to be polled via `<Swarm as Stream>` in order to make
308
/// progress.
309
pub struct Swarm<TBehaviour>
310
where
311
    TBehaviour: NetworkBehaviour,
312
{
313
    /// [`Transport`] for dialing remote peers and listening for incoming connections.
314
    transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
315

316
    /// The nodes currently active.
317
    pool: Pool<THandler<TBehaviour>>,
318

319
    /// The local peer ID.
320
    local_peer_id: PeerId,
321

322
    /// Handles which nodes to connect to and how to handle the events sent back by the protocol
323
    /// handlers.
324
    behaviour: TBehaviour,
325

326
    /// List of protocols that the behaviour says it supports.
327
    supported_protocols: SmallVec<[Vec<u8>; 16]>,
328

329
    /// External addresses of the local node that have been confirmed.
    ///
    /// Entries are inserted by `add_external_address`, removed by `remove_external_address`
    /// and exposed through `external_addresses`.
    confirmed_external_addr: HashSet<Multiaddr>,
330

331
    /// Multiaddresses that our listeners are listening on, keyed by the listener's ID.
332
    listened_addrs: HashMap<ListenerId, SmallVec<[Multiaddr; 1]>>,
333

334
    /// Pending event to be delivered to connection handlers
335
    /// (or dropped if the peer disconnected) before the `behaviour`
336
    /// can be polled again.
337
    pending_handler_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,
338

339
    /// Queue of [`SwarmEvent`]s that have been generated (e.g. while handling pool events)
    /// but not yet handed out.
    // NOTE(review): presumably drained when the swarm is polled; confirm against the poll loop.
    pending_swarm_events: VecDeque<SwarmEvent<TBehaviour::ToSwarm>>,
340
}
341

342
impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}
343

344
impl<TBehaviour> Swarm<TBehaviour>
345
where
346
    TBehaviour: NetworkBehaviour,
347
{
348
    /// Creates a new [`Swarm`] from the given [`Transport`], [`NetworkBehaviour`], [`PeerId`] and
349
    /// [`Config`].
350
    pub fn new(
8,637✔
351
        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
8,637✔
352
        behaviour: TBehaviour,
8,637✔
353
        local_peer_id: PeerId,
8,637✔
354
        config: Config,
8,637✔
355
    ) -> Self {
8,637✔
356
        tracing::info!(%local_peer_id);
8,637✔
357

358
        Swarm {
8,637✔
359
            local_peer_id,
8,637✔
360
            transport,
8,637✔
361
            pool: Pool::new(local_peer_id, config.pool_config),
8,637✔
362
            behaviour,
8,637✔
363
            supported_protocols: Default::default(),
8,637✔
364
            confirmed_external_addr: Default::default(),
8,637✔
365
            listened_addrs: HashMap::new(),
8,637✔
366
            pending_handler_event: None,
8,637✔
367
            pending_swarm_events: VecDeque::default(),
8,637✔
368
        }
8,637✔
369
    }
8,637✔
370

371
    /// Returns information about the connections underlying the [`Swarm`].
372
    pub fn network_info(&self) -> NetworkInfo {
3✔
373
        let num_peers = self.pool.num_peers();
3✔
374
        let connection_counters = self.pool.counters().clone();
3✔
375
        NetworkInfo {
3✔
376
            num_peers,
3✔
377
            connection_counters,
3✔
378
        }
3✔
379
    }
3✔
380

381
    /// Starts listening on the given address.
382
    /// Returns an error if the address is not supported.
383
    ///
384
    /// Listeners report their new listening addresses as [`SwarmEvent::NewListenAddr`].
385
    /// Depending on the underlying transport, one listener may have multiple listening addresses.
386
    pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
8,407✔
387
        let opts = ListenOpts::new(addr);
8,407✔
388
        let id = opts.listener_id();
8,407✔
389
        self.add_listener(opts)?;
8,407✔
390
        Ok(id)
8,407✔
391
    }
8,407✔
392

393
    /// Removes the listener with the given ID by delegating to the underlying transport.
394
    ///
395
    /// Returns `true` if there was a listener with this ID, `false`
396
    /// otherwise.
397
    pub fn remove_listener(&mut self, listener_id: ListenerId) -> bool {
1✔
398
        self.transport.remove_listener(listener_id)
1✔
399
    }
1✔
400

401
    /// Dial a known or unknown peer.
402
    ///
403
    /// See also [`DialOpts`].
404
    ///
405
    /// ```
406
    /// # use libp2p_swarm::Swarm;
407
    /// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition};
408
    /// # use libp2p_core::{Multiaddr, Transport};
409
    /// # use libp2p_core::transport::dummy::DummyTransport;
410
    /// # use libp2p_swarm::dummy;
411
    /// # use libp2p_identity::PeerId;
412
    /// #
413
    /// # #[tokio::main]
414
    /// # async fn main() {
415
    /// let mut swarm = build_swarm();
416
    ///
417
    /// // Dial a known peer.
418
    /// swarm.dial(PeerId::random());
419
    ///
420
    /// // Dial an unknown peer.
421
    /// swarm.dial("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap());
422
    /// # }
423
    ///
424
    /// # fn build_swarm() -> Swarm<dummy::Behaviour> {
425
    /// #     Swarm::new(DummyTransport::new().boxed(), dummy::Behaviour, PeerId::random(), libp2p_swarm::Config::with_tokio_executor())
426
    /// # }
427
    /// ```
428
    pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
17,034✔
429
        let dial_opts = opts.into();
17,034✔
430

431
        let peer_id = dial_opts.get_peer_id();
17,034✔
432
        let condition = dial_opts.peer_condition();
17,034✔
433
        let connection_id = dial_opts.connection_id();
17,034✔
434

435
        let should_dial = match (condition, peer_id) {
17,034✔
436
            (_, None) => true,
671✔
437
            (PeerCondition::Always, _) => true,
2,990✔
438
            (PeerCondition::Disconnected, Some(peer_id)) => !self.pool.is_connected(peer_id),
×
439
            (PeerCondition::NotDialing, Some(peer_id)) => !self.pool.is_dialing(peer_id),
×
440
            (PeerCondition::DisconnectedAndNotDialing, Some(peer_id)) => {
13,373✔
441
                !self.pool.is_dialing(peer_id) && !self.pool.is_connected(peer_id)
13,373✔
442
            }
443
        };
444

445
        if !should_dial {
17,034✔
446
            let e = DialError::DialPeerConditionFalse(condition);
1,152✔
447

448
            self.behaviour
1,152✔
449
                .on_swarm_event(FromSwarm::DialFailure(DialFailure {
1,152✔
450
                    peer_id,
1,152✔
451
                    error: &e,
1,152✔
452
                    connection_id,
1,152✔
453
                }));
1,152✔
454

455
            return Err(e);
1,152✔
456
        }
15,882✔
457

458
        let addresses = {
15,858✔
459
            let mut addresses_from_opts = dial_opts.get_addresses();
15,882✔
460

461
            match self.behaviour.handle_pending_outbound_connection(
15,882✔
462
                connection_id,
15,882✔
463
                peer_id,
15,882✔
464
                addresses_from_opts.as_slice(),
15,882✔
465
                dial_opts.role_override(),
15,882✔
466
            ) {
15,882✔
467
                Ok(addresses) => {
15,867✔
468
                    if dial_opts.extend_addresses_through_behaviour() {
15,867✔
469
                        addresses_from_opts.extend(addresses)
12,185✔
470
                    } else {
471
                        let num_addresses = addresses.len();
3,682✔
472

473
                        if num_addresses > 0 {
3,682✔
474
                            tracing::debug!(
29✔
475
                                connection=%connection_id,
476
                                discarded_addresses_count=%num_addresses,
477
                                "discarding addresses from `NetworkBehaviour` because `DialOpts::extend_addresses_through_behaviour is `false` for connection"
×
478
                            )
479
                        }
3,653✔
480
                    }
481
                }
482
                Err(cause) => {
15✔
483
                    let error = DialError::Denied { cause };
15✔
484

485
                    self.behaviour
15✔
486
                        .on_swarm_event(FromSwarm::DialFailure(DialFailure {
15✔
487
                            peer_id,
15✔
488
                            error: &error,
15✔
489
                            connection_id,
15✔
490
                        }));
15✔
491

492
                    return Err(error);
15✔
493
                }
494
            }
495

496
            let mut unique_addresses = HashSet::new();
15,867✔
497
            addresses_from_opts.retain(|addr| {
16,871✔
498
                !self.listened_addrs.values().flatten().any(|a| a == addr)
16,995✔
499
                    && unique_addresses.insert(addr.clone())
16,862✔
500
            });
16,862✔
501

502
            if addresses_from_opts.is_empty() {
15,867✔
503
                let error = DialError::NoAddresses;
9✔
504
                self.behaviour
9✔
505
                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
9✔
506
                        peer_id,
9✔
507
                        error: &error,
9✔
508
                        connection_id,
9✔
509
                    }));
9✔
510
                return Err(error);
9✔
511
            };
15,858✔
512

513
            addresses_from_opts
15,858✔
514
        };
515

516
        let dials = addresses
15,858✔
517
            .into_iter()
15,858✔
518
            .map(|a| match peer_id.map_or(Ok(a.clone()), |p| a.with_p2p(p)) {
15,932✔
519
                Ok(address) => {
15,932✔
520
                    let dial = self.transport.dial(
15,932✔
521
                        address.clone(),
15,932✔
522
                        transport::DialOpts {
15,932✔
523
                            role: dial_opts.role_override(),
15,932✔
524
                            port_use: dial_opts.port_use(),
15,932✔
525
                        },
15,932✔
526
                    );
527
                    let span = tracing::debug_span!(parent: tracing::Span::none(), "Transport::dial", %address);
15,932✔
528
                    span.follows_from(tracing::Span::current());
15,932✔
529
                    match dial {
15,932✔
530
                        Ok(fut) => fut
15,878✔
531
                            .map(|r| (address, r.map_err(TransportError::Other)))
15,878✔
532
                            .instrument(span)
15,878✔
533
                            .boxed(),
15,878✔
534
                        Err(err) => futures::future::ready((address, Err(err))).boxed(),
54✔
535
                    }
536
                }
537
                Err(address) => futures::future::ready((
×
538
                    address.clone(),
×
539
                    Err(TransportError::MultiaddrNotSupported(address)),
×
540
                ))
×
541
                .boxed(),
×
542
            })
15,932✔
543
            .collect();
15,858✔
544

545
        self.pool.add_outgoing(
15,858✔
546
            dials,
15,858✔
547
            peer_id,
15,858✔
548
            dial_opts.role_override(),
15,858✔
549
            dial_opts.port_use(),
15,858✔
550
            dial_opts.dial_concurrency_override(),
15,858✔
551
            connection_id,
15,858✔
552
        );
553

554
        Ok(())
15,858✔
555
    }
17,034✔
556

557
    /// Returns an iterator that produces the list of addresses we're listening on.
558
    pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
3✔
559
        self.listened_addrs.values().flatten()
3✔
560
    }
3✔
561

562
    /// Returns a reference to the [`PeerId`] of the local node, as stored in this swarm.
563
    pub fn local_peer_id(&self) -> &PeerId {
14,951✔
564
        &self.local_peer_id
14,951✔
565
    }
14,951✔
566

567
    /// Returns an iterator over all **confirmed** external addresses of the local node.
    ///
    /// The addresses are kept in a `HashSet`, so the iteration order is unspecified.
568
    pub fn external_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
2,925✔
569
        self.confirmed_external_addr.iter()
2,925✔
570
    }
2,925✔
571

572
    fn add_listener(&mut self, opts: ListenOpts) -> Result<(), TransportError<io::Error>> {
8,408✔
573
        let addr = opts.address();
8,408✔
574
        let listener_id = opts.listener_id();
8,408✔
575

576
        if let Err(e) = self.transport.listen_on(listener_id, addr.clone()) {
8,408✔
577
            self.behaviour
×
578
                .on_swarm_event(FromSwarm::ListenerError(behaviour::ListenerError {
×
579
                    listener_id,
×
580
                    err: &e,
×
581
                }));
×
582

583
            return Err(e);
×
584
        }
8,408✔
585

586
        self.behaviour
8,408✔
587
            .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
8,408✔
588
                listener_id,
8,408✔
589
            }));
8,408✔
590

591
        Ok(())
8,408✔
592
    }
8,408✔
593

594
    /// Add a **confirmed** external address for the local node.
595
    ///
596
    /// This function should only be called with addresses that are guaranteed to be reachable.
597
    /// The address is broadcast to all [`NetworkBehaviour`]s via
598
    /// [`FromSwarm::ExternalAddrConfirmed`].
599
    pub fn add_external_address(&mut self, a: Multiaddr) {
7,536✔
600
        self.behaviour
7,536✔
601
            .on_swarm_event(FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed {
7,536✔
602
                addr: &a,
7,536✔
603
            }));
7,536✔
604
        self.confirmed_external_addr.insert(a);
7,536✔
605
    }
7,536✔
606

607
    /// Remove an external address for the local node.
608
    ///
609
    /// The address is broadcast to all [`NetworkBehaviour`]s via
610
    /// [`FromSwarm::ExternalAddrExpired`].
611
    pub fn remove_external_address(&mut self, addr: &Multiaddr) {
×
612
        self.behaviour
×
613
            .on_swarm_event(FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }));
×
614
        self.confirmed_external_addr.remove(addr);
×
615
    }
×
616

617
    /// Add a new external address of a remote peer.
618
    ///
619
    /// The address is broadcast to all [`NetworkBehaviour`]s via
620
    /// [`FromSwarm::NewExternalAddrOfPeer`].
621
    pub fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
2✔
622
        self.behaviour
2✔
623
            .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
2✔
624
                peer_id,
2✔
625
                addr: &addr,
2✔
626
            }))
2✔
627
    }
2✔
628

629
    /// Disconnects a peer by its peer ID, closing all connections to said peer.
630
    ///
631
    /// Returns `Ok(())` if there was one or more established connections to the peer.
632
    ///
633
    /// Closing a connection via [`Swarm::disconnect_peer_id`] will poll
634
    /// [`ConnectionHandler::poll_close`] to completion. Use this function if you want to close
635
    /// a connection _despite_ it still being in use by one or more handlers.
636
    #[allow(clippy::result_unit_err)]
637
    pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
5✔
638
        let was_connected = self.pool.is_connected(peer_id);
5✔
639
        self.pool.disconnect(peer_id);
5✔
640

641
        if was_connected {
5✔
642
            Ok(())
4✔
643
        } else {
644
            Err(())
1✔
645
        }
646
    }
5✔
647

648
    /// Attempt to gracefully close a connection.
649
    ///
650
    /// Closing a connection is asynchronous but this function will return immediately.
651
    /// A [`SwarmEvent::ConnectionClosed`] event will be emitted
652
    /// once the connection is actually closed.
653
    ///
654
    /// # Returns
655
    ///
656
    /// - `true` if the connection was established and is now being closed.
657
    /// - `false` if the connection was not found or is no longer established.
658
    pub fn close_connection(&mut self, connection_id: ConnectionId) -> bool {
×
659
        if let Some(established) = self.pool.get_established(connection_id) {
×
660
            established.start_close();
×
661
            return true;
×
662
        }
×
663

664
        false
×
665
    }
×
666

667
    /// Checks whether there is an established connection to a peer.
    ///
    /// The answer comes straight from the connection pool.
668
    pub fn is_connected(&self, peer_id: &PeerId) -> bool {
15✔
669
        self.pool.is_connected(*peer_id)
15✔
670
    }
15✔
671

672
    /// Returns an iterator over the IDs of the peers that the connection pool currently
    /// reports as connected.
673
    pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
×
674
        self.pool.iter_connected()
×
675
    }
×
676

677
    /// Returns a shared reference to the [`NetworkBehaviour`] owned by this swarm.
678
    pub fn behaviour(&self) -> &TBehaviour {
5,401✔
679
        &self.behaviour
5,401✔
680
    }
5,401✔
681

682
    /// Returns a mutable reference to the [`NetworkBehaviour`] owned by this swarm.
683
    pub fn behaviour_mut(&mut self) -> &mut TBehaviour {
9,440✔
684
        &mut self.behaviour
9,440✔
685
    }
9,440✔
686

687
    fn handle_pool_event(&mut self, event: PoolEvent<THandlerOutEvent<TBehaviour>>) {
85,662✔
688
        match event {
85,662✔
689
            PoolEvent::ConnectionEstablished {
690
                peer_id,
29,275✔
691
                id,
29,275✔
692
                endpoint,
29,275✔
693
                connection,
29,275✔
694
                concurrent_dial_errors,
29,275✔
695
                established_in,
29,275✔
696
            } => {
697
                let handler = match endpoint.clone() {
29,275✔
698
                    ConnectedPoint::Dialer {
699
                        address,
14,469✔
700
                        role_override,
14,469✔
701
                        port_use,
14,469✔
702
                    } => {
703
                        match self.behaviour.handle_established_outbound_connection(
14,469✔
704
                            id,
14,469✔
705
                            peer_id,
14,469✔
706
                            &address,
14,469✔
707
                            role_override,
14,469✔
708
                            port_use,
14,469✔
709
                        ) {
14,469✔
710
                            Ok(handler) => handler,
14,466✔
711
                            Err(cause) => {
3✔
712
                                let dial_error = DialError::Denied { cause };
3✔
713
                                self.behaviour.on_swarm_event(FromSwarm::DialFailure(
3✔
714
                                    DialFailure {
3✔
715
                                        connection_id: id,
3✔
716
                                        error: &dial_error,
3✔
717
                                        peer_id: Some(peer_id),
3✔
718
                                    },
3✔
719
                                ));
3✔
720

721
                                self.pending_swarm_events.push_back(
3✔
722
                                    SwarmEvent::OutgoingConnectionError {
3✔
723
                                        peer_id: Some(peer_id),
3✔
724
                                        connection_id: id,
3✔
725
                                        error: dial_error,
3✔
726
                                    },
3✔
727
                                );
728
                                return;
3✔
729
                            }
730
                        }
731
                    }
732
                    ConnectedPoint::Listener {
733
                        local_addr,
14,806✔
734
                        send_back_addr,
14,806✔
735
                    } => {
736
                        match self.behaviour.handle_established_inbound_connection(
14,806✔
737
                            id,
14,806✔
738
                            peer_id,
14,806✔
739
                            &local_addr,
14,806✔
740
                            &send_back_addr,
14,806✔
741
                        ) {
14,806✔
742
                            Ok(handler) => handler,
14,494✔
743
                            Err(cause) => {
312✔
744
                                let listen_error = ListenError::Denied { cause };
312✔
745
                                self.behaviour.on_swarm_event(FromSwarm::ListenFailure(
312✔
746
                                    ListenFailure {
312✔
747
                                        local_addr: &local_addr,
312✔
748
                                        send_back_addr: &send_back_addr,
312✔
749
                                        error: &listen_error,
312✔
750
                                        connection_id: id,
312✔
751
                                        peer_id: Some(peer_id),
312✔
752
                                    },
312✔
753
                                ));
312✔
754

755
                                self.pending_swarm_events.push_back(
312✔
756
                                    SwarmEvent::IncomingConnectionError {
312✔
757
                                        connection_id: id,
312✔
758
                                        send_back_addr,
312✔
759
                                        local_addr,
312✔
760
                                        error: listen_error,
312✔
761
                                        peer_id: Some(peer_id),
312✔
762
                                    },
312✔
763
                                );
764
                                return;
312✔
765
                            }
766
                        }
767
                    }
768
                };
769

770
                let supported_protocols = handler
28,960✔
771
                    .listen_protocol()
28,960✔
772
                    .upgrade()
28,960✔
773
                    .protocol_info()
28,960✔
774
                    .map(|p| p.as_ref().as_bytes().to_vec())
29,109✔
775
                    .collect();
28,960✔
776
                let other_established_connection_ids = self
28,960✔
777
                    .pool
28,960✔
778
                    .iter_established_connections_of_peer(&peer_id)
28,960✔
779
                    .collect::<Vec<_>>();
28,960✔
780
                let num_established = NonZeroU32::new(
28,960✔
781
                    u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
28,960✔
782
                )
783
                .expect("n + 1 is always non-zero; qed");
28,960✔
784

785
                self.pool
28,960✔
786
                    .spawn_connection(id, peer_id, &endpoint, connection, handler);
28,960✔
787

788
                tracing::debug!(
28,960✔
789
                    peer=%peer_id,
790
                    ?endpoint,
791
                    total_peers=%num_established,
792
                    "Connection established"
4✔
793
                );
794
                let failed_addresses = concurrent_dial_errors
28,960✔
795
                    .as_ref()
28,960✔
796
                    .map(|es| {
28,960✔
797
                        es.iter()
14,466✔
798
                            .map(|(a, _)| a)
14,466✔
799
                            .cloned()
14,466✔
800
                            .collect::<Vec<Multiaddr>>()
14,466✔
801
                    })
14,466✔
802
                    .unwrap_or_default();
28,960✔
803
                self.behaviour
28,960✔
804
                    .on_swarm_event(FromSwarm::ConnectionEstablished(
28,960✔
805
                        behaviour::ConnectionEstablished {
28,960✔
806
                            peer_id,
28,960✔
807
                            connection_id: id,
28,960✔
808
                            endpoint: &endpoint,
28,960✔
809
                            failed_addresses: &failed_addresses,
28,960✔
810
                            other_established: other_established_connection_ids.len(),
28,960✔
811
                        },
28,960✔
812
                    ));
28,960✔
813
                self.supported_protocols = supported_protocols;
28,960✔
814
                self.pending_swarm_events
28,960✔
815
                    .push_back(SwarmEvent::ConnectionEstablished {
28,960✔
816
                        peer_id,
28,960✔
817
                        connection_id: id,
28,960✔
818
                        num_established,
28,960✔
819
                        endpoint,
28,960✔
820
                        concurrent_dial_errors,
28,960✔
821
                        established_in,
28,960✔
822
                    });
28,960✔
823
            }
824
            PoolEvent::PendingOutboundConnectionError {
825
                id: connection_id,
64✔
826
                error,
64✔
827
                peer,
64✔
828
            } => {
829
                let error = error.into();
64✔
830

831
                self.behaviour
64✔
832
                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
64✔
833
                        peer_id: peer,
64✔
834
                        error: &error,
64✔
835
                        connection_id,
64✔
836
                    }));
64✔
837

838
                if let Some(peer) = peer {
64✔
839
                    tracing::debug!(%peer, "Connection attempt to peer failed with {:?}.", error,);
64✔
840
                } else {
841
                    tracing::debug!("Connection attempt to unknown peer failed with {:?}", error);
×
842
                }
843

844
                self.pending_swarm_events
64✔
845
                    .push_back(SwarmEvent::OutgoingConnectionError {
64✔
846
                        peer_id: peer,
64✔
847
                        connection_id,
64✔
848
                        error,
64✔
849
                    });
64✔
850
            }
851
            PoolEvent::PendingInboundConnectionError {
852
                id,
1✔
853
                send_back_addr,
1✔
854
                local_addr,
1✔
855
                error,
1✔
856
            } => {
857
                let error = error.into();
1✔
858

859
                tracing::debug!("Incoming connection failed: {:?}", error);
1✔
860
                self.behaviour
1✔
861
                    .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
1✔
862
                        local_addr: &local_addr,
1✔
863
                        send_back_addr: &send_back_addr,
1✔
864
                        error: &error,
1✔
865
                        connection_id: id,
1✔
866
                        peer_id: None,
1✔
867
                    }));
1✔
868
                self.pending_swarm_events
1✔
869
                    .push_back(SwarmEvent::IncomingConnectionError {
1✔
870
                        connection_id: id,
1✔
871
                        local_addr,
1✔
872
                        send_back_addr,
1✔
873
                        error,
1✔
874
                        peer_id: None,
1✔
875
                    });
1✔
876
            }
877
            PoolEvent::ConnectionClosed {
878
                id,
465✔
879
                connected,
465✔
880
                error,
465✔
881
                remaining_established_connection_ids,
465✔
882
                ..
883
            } => {
884
                if let Some(error) = error.as_ref() {
465✔
885
                    tracing::debug!(
435✔
886
                        total_peers=%remaining_established_connection_ids.len(),
×
887
                        "Connection closed with error {:?}: {:?}",
×
888
                        error,
889
                        connected,
890
                    );
891
                } else {
892
                    tracing::debug!(
30✔
893
                        total_peers=%remaining_established_connection_ids.len(),
×
894
                        "Connection closed: {:?}",
×
895
                        connected
896
                    );
897
                }
898
                let peer_id = connected.peer_id;
465✔
899
                let endpoint = connected.endpoint;
465✔
900
                let num_established =
465✔
901
                    u32::try_from(remaining_established_connection_ids.len()).unwrap();
465✔
902

903
                self.behaviour
465✔
904
                    .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
465✔
905
                        peer_id,
465✔
906
                        connection_id: id,
465✔
907
                        endpoint: &endpoint,
465✔
908
                        cause: error.as_ref(),
465✔
909
                        remaining_established: num_established as usize,
465✔
910
                    }));
465✔
911
                self.pending_swarm_events
465✔
912
                    .push_back(SwarmEvent::ConnectionClosed {
465✔
913
                        peer_id,
465✔
914
                        connection_id: id,
465✔
915
                        endpoint,
465✔
916
                        cause: error,
465✔
917
                        num_established,
465✔
918
                    });
465✔
919
            }
920
            PoolEvent::ConnectionEvent { peer_id, id, event } => {
55,857✔
921
                self.behaviour
55,857✔
922
                    .on_connection_handler_event(peer_id, id, event);
55,857✔
923
            }
55,857✔
924
            PoolEvent::AddressChange {
925
                peer_id,
×
926
                id,
×
927
                new_endpoint,
×
928
                old_endpoint,
×
929
            } => {
×
930
                self.behaviour
×
931
                    .on_swarm_event(FromSwarm::AddressChange(AddressChange {
×
932
                        peer_id,
×
933
                        connection_id: id,
×
934
                        old: &old_endpoint,
×
935
                        new: &new_endpoint,
×
936
                    }));
×
937
            }
×
938
        }
939
    }
85,662✔
940

941
    fn handle_transport_event(
24,087✔
942
        &mut self,
24,087✔
943
        event: TransportEvent<
24,087✔
944
            <transport::Boxed<(PeerId, StreamMuxerBox)> as Transport>::ListenerUpgrade,
24,087✔
945
            io::Error,
24,087✔
946
        >,
24,087✔
947
    ) {
24,087✔
948
        match event {
24,087✔
949
            TransportEvent::Incoming {
950
                listener_id: _,
951
                upgrade,
15,676✔
952
                local_addr,
15,676✔
953
                send_back_addr,
15,676✔
954
            } => {
955
                let connection_id = ConnectionId::next();
15,676✔
956

957
                match self.behaviour.handle_pending_inbound_connection(
15,676✔
958
                    connection_id,
15,676✔
959
                    &local_addr,
15,676✔
960
                    &send_back_addr,
15,676✔
961
                ) {
15,676✔
962
                    Ok(()) => {}
15,676✔
963
                    Err(cause) => {
×
964
                        let listen_error = ListenError::Denied { cause };
×
965

966
                        self.behaviour
×
967
                            .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
×
968
                                local_addr: &local_addr,
×
969
                                send_back_addr: &send_back_addr,
×
970
                                error: &listen_error,
×
971
                                connection_id,
×
972
                                peer_id: None,
×
973
                            }));
×
974

975
                        self.pending_swarm_events
×
976
                            .push_back(SwarmEvent::IncomingConnectionError {
×
977
                                connection_id,
×
978
                                local_addr,
×
979
                                send_back_addr,
×
980
                                error: listen_error,
×
981
                                peer_id: None,
×
982
                            });
×
983
                        return;
×
984
                    }
985
                }
986

987
                self.pool.add_incoming(
15,676✔
988
                    upgrade,
15,676✔
989
                    IncomingInfo {
15,676✔
990
                        local_addr: &local_addr,
15,676✔
991
                        send_back_addr: &send_back_addr,
15,676✔
992
                    },
15,676✔
993
                    connection_id,
15,676✔
994
                );
995

996
                self.pending_swarm_events
15,676✔
997
                    .push_back(SwarmEvent::IncomingConnection {
15,676✔
998
                        connection_id,
15,676✔
999
                        local_addr,
15,676✔
1000
                        send_back_addr,
15,676✔
1001
                    })
15,676✔
1002
            }
1003
            TransportEvent::NewAddress {
1004
                listener_id,
8,408✔
1005
                listen_addr,
8,408✔
1006
            } => {
1007
                tracing::debug!(
8,408✔
1008
                    listener=?listener_id,
1009
                    address=%listen_addr,
1010
                    "New listener address"
4✔
1011
                );
1012
                let addrs = self.listened_addrs.entry(listener_id).or_default();
8,408✔
1013
                if !addrs.contains(&listen_addr) {
8,408✔
1014
                    addrs.push(listen_addr.clone())
8,407✔
1015
                }
1✔
1016
                self.behaviour
8,408✔
1017
                    .on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
8,408✔
1018
                        listener_id,
8,408✔
1019
                        addr: &listen_addr,
8,408✔
1020
                    }));
8,408✔
1021
                self.pending_swarm_events
8,408✔
1022
                    .push_back(SwarmEvent::NewListenAddr {
8,408✔
1023
                        listener_id,
8,408✔
1024
                        address: listen_addr,
8,408✔
1025
                    })
8,408✔
1026
            }
1027
            TransportEvent::AddressExpired {
1028
                listener_id,
×
1029
                listen_addr,
×
1030
            } => {
1031
                tracing::debug!(
×
1032
                    listener=?listener_id,
1033
                    address=%listen_addr,
1034
                    "Expired listener address"
×
1035
                );
1036
                if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) {
×
1037
                    addrs.retain(|a| a != &listen_addr);
×
1038
                }
×
1039
                self.behaviour
×
1040
                    .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
×
1041
                        listener_id,
×
1042
                        addr: &listen_addr,
×
1043
                    }));
×
1044
                self.pending_swarm_events
×
1045
                    .push_back(SwarmEvent::ExpiredListenAddr {
×
1046
                        listener_id,
×
1047
                        address: listen_addr,
×
1048
                    })
×
1049
            }
1050
            TransportEvent::ListenerClosed {
1051
                listener_id,
3✔
1052
                reason,
3✔
1053
            } => {
1054
                tracing::debug!(
3✔
1055
                    listener=?listener_id,
1056
                    ?reason,
1057
                    "Listener closed"
×
1058
                );
1059
                let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default();
3✔
1060
                for addr in addrs.iter() {
3✔
1061
                    self.behaviour.on_swarm_event(FromSwarm::ExpiredListenAddr(
2✔
1062
                        ExpiredListenAddr { listener_id, addr },
2✔
1063
                    ));
2✔
1064
                }
2✔
1065
                self.behaviour
3✔
1066
                    .on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
3✔
1067
                        listener_id,
3✔
1068
                        reason: reason.as_ref().copied(),
3✔
1069
                    }));
3✔
1070
                self.pending_swarm_events
3✔
1071
                    .push_back(SwarmEvent::ListenerClosed {
3✔
1072
                        listener_id,
3✔
1073
                        addresses: addrs.to_vec(),
3✔
1074
                        reason,
3✔
1075
                    })
3✔
1076
            }
1077
            TransportEvent::ListenerError { listener_id, error } => {
×
1078
                self.behaviour
×
1079
                    .on_swarm_event(FromSwarm::ListenerError(ListenerError {
×
1080
                        listener_id,
×
1081
                        err: &error,
×
1082
                    }));
×
1083
                self.pending_swarm_events
×
1084
                    .push_back(SwarmEvent::ListenerError { listener_id, error })
×
1085
            }
1086
        }
1087
    }
24,087✔
1088

1089
    fn handle_behaviour_event(
89,390✔
1090
        &mut self,
89,390✔
1091
        event: ToSwarm<TBehaviour::ToSwarm, THandlerInEvent<TBehaviour>>,
89,390✔
1092
    ) {
89,390✔
1093
        match event {
89,390✔
1094
            ToSwarm::GenerateEvent(event) => {
51,460✔
1095
                self.pending_swarm_events
51,460✔
1096
                    .push_back(SwarmEvent::Behaviour(event));
51,460✔
1097
            }
51,460✔
1098
            ToSwarm::Dial { opts } => {
13,400✔
1099
                let peer_id = opts.get_peer_id();
13,400✔
1100
                let connection_id = opts.connection_id();
13,400✔
1101
                if let Ok(()) = self.dial(opts) {
13,400✔
1102
                    self.pending_swarm_events.push_back(SwarmEvent::Dialing {
12,239✔
1103
                        peer_id,
12,239✔
1104
                        connection_id,
12,239✔
1105
                    });
12,239✔
1106
                }
12,311✔
1107
            }
1108
            ToSwarm::ListenOn { opts } => {
1✔
1109
                // Error is dispatched internally, safe to ignore.
1✔
1110
                let _ = self.add_listener(opts);
1✔
1111
            }
1✔
1112
            ToSwarm::RemoveListener { id } => {
1✔
1113
                self.remove_listener(id);
1✔
1114
            }
1✔
1115
            ToSwarm::NotifyHandler {
1116
                peer_id,
24,312✔
1117
                handler,
24,312✔
1118
                event,
24,312✔
1119
            } => {
1120
                assert!(self.pending_handler_event.is_none());
24,312✔
1121
                let handler = match handler {
24,312✔
1122
                    NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
17,036✔
1123
                    NotifyHandler::Any => {
1124
                        let ids = self
7,276✔
1125
                            .pool
7,276✔
1126
                            .iter_established_connections_of_peer(&peer_id)
7,276✔
1127
                            .collect();
7,276✔
1128
                        PendingNotifyHandler::Any(ids)
7,276✔
1129
                    }
1130
                };
1131

1132
                self.pending_handler_event = Some((peer_id, handler, event));
24,312✔
1133
            }
1134
            ToSwarm::NewExternalAddrCandidate(addr) => {
75✔
1135
                if !self.confirmed_external_addr.contains(&addr) {
75✔
1136
                    self.behaviour
35✔
1137
                        .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
35✔
1138
                            NewExternalAddrCandidate { addr: &addr },
35✔
1139
                        ));
35✔
1140
                    self.pending_swarm_events
35✔
1141
                        .push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
35✔
1142
                }
43✔
1143
            }
1144
            ToSwarm::ExternalAddrConfirmed(addr) => {
32✔
1145
                self.add_external_address(addr.clone());
32✔
1146
                self.pending_swarm_events
32✔
1147
                    .push_back(SwarmEvent::ExternalAddrConfirmed { address: addr });
32✔
1148
            }
32✔
1149
            ToSwarm::ExternalAddrExpired(addr) => {
×
1150
                self.remove_external_address(&addr);
×
1151
                self.pending_swarm_events
×
1152
                    .push_back(SwarmEvent::ExternalAddrExpired { address: addr });
×
1153
            }
×
1154
            ToSwarm::CloseConnection {
1155
                peer_id,
11✔
1156
                connection,
11✔
1157
            } => match connection {
11✔
1158
                CloseConnection::One(connection_id) => {
1✔
1159
                    if let Some(conn) = self.pool.get_established(connection_id) {
1✔
1160
                        conn.start_close();
1✔
1161
                    }
1✔
1162
                }
1163
                CloseConnection::All => {
10✔
1164
                    self.pool.disconnect(peer_id);
10✔
1165
                }
10✔
1166
            },
1167
            ToSwarm::NewExternalAddrOfPeer { peer_id, address } => {
98✔
1168
                self.behaviour
98✔
1169
                    .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
98✔
1170
                        peer_id,
98✔
1171
                        addr: &address,
98✔
1172
                    }));
98✔
1173
                self.pending_swarm_events
98✔
1174
                    .push_back(SwarmEvent::NewExternalAddrOfPeer { peer_id, address });
98✔
1175
            }
98✔
1176
        }
1177
    }
89,390✔
1178

1179
    /// Internal function used by everything event-related.
1180
    ///
1181
    /// Polls the `Swarm` for the next event.
1182
    #[tracing::instrument(level = "debug", name = "Swarm::poll", skip(self, cx))]
1183
    fn poll_next_event(
2,725,291✔
1184
        mut self: Pin<&mut Self>,
2,725,291✔
1185
        cx: &mut Context<'_>,
2,725,291✔
1186
    ) -> Poll<SwarmEvent<TBehaviour::ToSwarm>> {
2,725,291✔
1187
        // We use a `this` variable because the compiler can't mutably borrow multiple times
1188
        // across a `Deref`.
1189
        let this = &mut *self;
2,725,291✔
1190

1191
        // This loop polls the components below in a prioritized order.
1192
        //
1193
        // 1. [`NetworkBehaviour`]
1194
        // 2. Connection [`Pool`]
1195
        // 3. [`ListenersStream`]
1196
        //
1197
        // (1) is polled before (2) to prioritize local work over work coming from a remote.
1198
        //
1199
        // (2) is polled before (3) to prioritize existing connections
1200
        // over upgrading new incoming connections.
1201
        loop {
1202
            if let Some(swarm_event) = this.pending_swarm_events.pop_front() {
2,948,742✔
1203
                return Poll::Ready(swarm_event);
117,756✔
1204
            }
2,830,986✔
1205

1206
            match this.pending_handler_event.take() {
2,830,986✔
1207
                // Try to deliver the pending event emitted by the [`NetworkBehaviour`] in the
1208
                // previous iteration to the connection handler(s).
1209
                Some((peer_id, handler, event)) => match handler {
24,312✔
1210
                    PendingNotifyHandler::One(conn_id) => {
17,036✔
1211
                        match this.pool.get_established(conn_id) {
17,036✔
1212
                            Some(conn) => match notify_one(conn, event, cx) {
17,036✔
1213
                                None => continue,
17,036✔
1214
                                Some(event) => {
×
1215
                                    this.pending_handler_event = Some((peer_id, handler, event));
×
1216
                                }
×
1217
                            },
1218
                            None => continue,
×
1219
                        }
1220
                    }
1221
                    PendingNotifyHandler::Any(ids) => {
7,276✔
1222
                        match notify_any::<_, TBehaviour>(ids, &mut this.pool, event, cx) {
7,276✔
1223
                            None => continue,
7,276✔
1224
                            Some((event, ids)) => {
×
1225
                                let handler = PendingNotifyHandler::Any(ids);
×
1226
                                this.pending_handler_event = Some((peer_id, handler, event));
×
1227
                            }
×
1228
                        }
1229
                    }
1230
                },
1231
                // No pending event. Allow the [`NetworkBehaviour`] to make progress.
1232
                None => match this.behaviour.poll(cx) {
2,806,674✔
1233
                    Poll::Pending => {}
2,717,284✔
1234
                    Poll::Ready(behaviour_event) => {
89,390✔
1235
                        this.handle_behaviour_event(behaviour_event);
89,390✔
1236

1237
                        continue;
89,390✔
1238
                    }
1239
                },
1240
            }
1241

1242
            // Poll the known peers.
1243
            match this.pool.poll(cx) {
2,717,284✔
1244
                Poll::Pending => {}
2,631,622✔
1245
                Poll::Ready(pool_event) => {
85,662✔
1246
                    this.handle_pool_event(pool_event);
85,662✔
1247
                    continue;
85,662✔
1248
                }
1249
            }
1250

1251
            // Poll the listener(s) for new connections.
1252
            match Pin::new(&mut this.transport).poll(cx) {
2,631,622✔
1253
                Poll::Pending => {}
2,607,535✔
1254
                Poll::Ready(transport_event) => {
24,087✔
1255
                    this.handle_transport_event(transport_event);
24,087✔
1256
                    continue;
24,087✔
1257
                }
1258
            }
1259

1260
            return Poll::Pending;
2,607,535✔
1261
        }
1262
    }
2,725,291✔
1263
}
1264

1265
/// Connection to notify of a pending event.
1266
///
1267
/// The connection IDs out of which to notify one of an event are captured at
1268
/// the time the behaviour emits the event, in order not to forward the event to
1269
/// a new connection which the behaviour may not have been aware of at the time
1270
/// it issued the request for sending it.
1271
enum PendingNotifyHandler {
1272
    One(ConnectionId),
1273
    Any(SmallVec<[ConnectionId; 10]>),
1274
}
1275

1276
/// Notify a single connection of an event.
1277
///
1278
/// Returns `Some` with the given event if the connection is not currently
1279
/// ready to receive another event, in which case the current task is
1280
/// scheduled to be woken up.
1281
///
1282
/// Returns `None` if the connection is closing or the event has been
1283
/// successfully sent, in either case the event is consumed.
1284
fn notify_one<THandlerInEvent>(
17,036✔
1285
    conn: &mut EstablishedConnection<THandlerInEvent>,
17,036✔
1286
    event: THandlerInEvent,
17,036✔
1287
    cx: &mut Context<'_>,
17,036✔
1288
) -> Option<THandlerInEvent> {
17,036✔
1289
    match conn.poll_ready_notify_handler(cx) {
17,036✔
1290
        Poll::Pending => Some(event),
×
1291
        Poll::Ready(Err(())) => None, // connection is closing
×
1292
        Poll::Ready(Ok(())) => {
1293
            // Can now only fail if connection is closing.
1294
            let _ = conn.notify_handler(event);
17,036✔
1295
            None
17,036✔
1296
        }
1297
    }
1298
}
17,036✔
1299

1300
/// Notify any one of a given list of connections of a peer of an event.
1301
///
1302
/// Returns `Some` with the given event and a new list of connections if
1303
/// none of the given connections was able to receive the event but at
1304
/// least one of them is not closing, in which case the current task
1305
/// is scheduled to be woken up. The returned connections are those which
1306
/// may still become ready to receive another event.
1307
///
1308
/// Returns `None` if either all connections are closing or the event
1309
/// was successfully sent to a handler, in either case the event is consumed.
1310
fn notify_any<THandler, TBehaviour>(
7,276✔
1311
    ids: SmallVec<[ConnectionId; 10]>,
7,276✔
1312
    pool: &mut Pool<THandler>,
7,276✔
1313
    event: THandlerInEvent<TBehaviour>,
7,276✔
1314
    cx: &mut Context<'_>,
7,276✔
1315
) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
7,276✔
1316
where
7,276✔
1317
    TBehaviour: NetworkBehaviour,
7,276✔
1318
    THandler: ConnectionHandler<
7,276✔
1319
        FromBehaviour = THandlerInEvent<TBehaviour>,
7,276✔
1320
        ToBehaviour = THandlerOutEvent<TBehaviour>,
7,276✔
1321
    >,
7,276✔
1322
{
1323
    let mut pending = SmallVec::new();
7,276✔
1324
    let mut event = Some(event); // (1)
7,276✔
1325
    for id in ids.into_iter() {
7,276✔
1326
        if let Some(conn) = pool.get_established(id) {
7,276✔
1327
            match conn.poll_ready_notify_handler(cx) {
7,276✔
1328
                Poll::Pending => pending.push(id),
×
1329
                Poll::Ready(Err(())) => {} // connection is closing
×
1330
                Poll::Ready(Ok(())) => {
1331
                    let e = event.take().expect("by (1),(2)");
7,276✔
1332
                    if let Err(e) = conn.notify_handler(e) {
7,276✔
1333
                        event = Some(e) // (2)
×
1334
                    } else {
1335
                        break;
7,276✔
1336
                    }
1337
                }
1338
            }
1339
        }
×
1340
    }
1341

1342
    event.and_then(|e| {
7,276✔
1343
        if !pending.is_empty() {
×
1344
            Some((e, pending))
×
1345
        } else {
1346
            None
×
1347
        }
1348
    })
×
1349
}
7,276✔
1350

1351
/// Stream of events returned by [`Swarm`].
1352
///
1353
/// Includes events from the [`NetworkBehaviour`] as well as events about
1354
/// connection and listener status. See [`SwarmEvent`] for details.
1355
///
1356
/// Note: This stream is infinite and it is guaranteed that
1357
/// [`futures::Stream::poll_next`] will never return `Poll::Ready(None)`.
1358
impl<TBehaviour> futures::Stream for Swarm<TBehaviour>
1359
where
1360
    TBehaviour: NetworkBehaviour,
1361
{
1362
    type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>>;
1363

1364
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2,725,017✔
1365
        self.as_mut().poll_next_event(cx).map(Some)
2,725,017✔
1366
    }
2,725,017✔
1367
}
1368

1369
/// The stream of swarm events never terminates, so we can implement fused for it.
1370
impl<TBehaviour> FusedStream for Swarm<TBehaviour>
1371
where
1372
    TBehaviour: NetworkBehaviour,
1373
{
1374
    fn is_terminated(&self) -> bool {
36,160✔
1375
        false
36,160✔
1376
    }
36,160✔
1377
}
1378

1379
pub struct Config {
1380
    pool_config: PoolConfig,
1381
}
1382

1383
impl Config {
1384
    /// Creates a new [`Config`] from the given executor. The [`Swarm`] is obtained via
1385
    /// [`Swarm::new`].
1386
    pub fn with_executor(executor: impl Executor + Send + 'static) -> Self {
15,847✔
1387
        Self {
15,847✔
1388
            pool_config: PoolConfig::new(Some(Box::new(executor))),
15,847✔
1389
        }
15,847✔
1390
    }
15,847✔
1391

1392
    #[doc(hidden)]
1393
    /// Used on connection benchmarks.
1394
    pub fn without_executor() -> Self {
×
1395
        Self {
×
1396
            pool_config: PoolConfig::new(None),
×
1397
        }
×
1398
    }
×
1399

1400
    /// Sets executor to the `wasm` executor.
1401
    /// Background tasks will be executed by the browser on the next micro-tick.
1402
    ///
1403
    /// Spawning a task is similar too:
1404
    /// ```typescript
1405
    /// function spawn(task: () => Promise<void>) {
1406
    ///     task()
1407
    /// }
1408
    /// ```
1409
    #[cfg(feature = "wasm-bindgen")]
1410
    pub fn with_wasm_executor() -> Self {
1411
        Self::with_executor(crate::executor::WasmBindgenExecutor)
1412
    }
1413

1414
    /// Builds a new [`Config`] from the given `tokio` executor.
1415
    #[cfg(all(
1416
        feature = "tokio",
1417
        not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1418
    ))]
1419
    pub fn with_tokio_executor() -> Self {
15,847✔
1420
        Self::with_executor(crate::executor::TokioExecutor)
15,847✔
1421
    }
15,847✔
1422

1423
    /// Configures the number of events from the [`NetworkBehaviour`] in
1424
    /// destination to the [`ConnectionHandler`] that can be buffered before
1425
    /// the [`Swarm`] has to wait. An individual buffer with this number of
1426
    /// events exists for each individual connection.
1427
    ///
1428
    /// The ideal value depends on the executor used, the CPU speed, and the
1429
    /// volume of events. If this value is too low, then the [`Swarm`] will
1430
    /// be sleeping more often than necessary. Increasing this value increases
1431
    /// the overall memory usage.
1432
    pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
×
1433
        self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
×
1434
        self
×
1435
    }
×
1436

1437
    /// Configures the size of the buffer for events sent by a [`ConnectionHandler`] to the
1438
    /// [`NetworkBehaviour`].
1439
    ///
1440
    /// Each connection has its own buffer.
1441
    ///
1442
    /// The ideal value depends on the executor used, the CPU speed and the volume of events.
1443
    /// If this value is too low, then the [`ConnectionHandler`]s will be sleeping more often
1444
    /// than necessary. Increasing this value increases the overall memory
1445
    /// usage, and more importantly the latency between the moment when an
1446
    /// event is emitted and the moment when it is received by the
1447
    /// [`NetworkBehaviour`].
1448
    pub fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
×
1449
        self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
×
1450
        self
×
1451
    }
×
1452

1453
    /// Number of addresses concurrently dialed for a single outbound connection attempt.
1454
    pub fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
10✔
1455
        self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
10✔
1456
        self
10✔
1457
    }
10✔
1458

1459
    /// Configures an override for the substream upgrade protocol to use.
1460
    ///
1461
    /// The subtream upgrade protocol is the multistream-select protocol
1462
    /// used for protocol negotiation on substreams. Since a listener
1463
    /// supports all existing versions, the choice of upgrade protocol
1464
    /// only effects the "dialer", i.e. the peer opening a substream.
1465
    ///
1466
    /// > **Note**: If configured, specific upgrade protocols for
1467
    /// > individual [`SubstreamProtocol`]s emitted by the `NetworkBehaviour`
1468
    /// > are ignored.
1469
    pub fn with_substream_upgrade_protocol_override(
×
1470
        mut self,
×
1471
        v: libp2p_core::upgrade::Version,
×
1472
    ) -> Self {
×
1473
        self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
×
1474
        self
×
1475
    }
×
1476

1477
    /// The maximum number of inbound streams concurrently negotiating on a
1478
    /// connection. New inbound streams exceeding the limit are dropped and thus
1479
    /// reset.
1480
    ///
1481
    /// Note: This only enforces a limit on the number of concurrently
1482
    /// negotiating inbound streams. The total number of inbound streams on a
1483
    /// connection is the sum of negotiating and negotiated streams. A limit on
1484
    /// the total number of streams can be enforced at the
1485
    /// [`StreamMuxerBox`] level.
1486
    pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
×
1487
        self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
×
1488
        self
×
1489
    }
×
1490

1491
    /// How long to keep a connection alive once it is idling.
1492
    ///
1493
    /// Defaults to 10s.
1494
    ///
1495
    /// Typically, you shouldn't _need_ to modify this default as connections will be kept alive
1496
    /// whilst they are "in use" (see below). Depending on the application's usecase, it may be
1497
    /// desirable to keep connections alive despite them not being in use.
1498
    ///
1499
    /// A connection is considered idle if:
1500
    /// - There are no active inbound streams.
1501
    /// - There are no active outbounds streams.
1502
    /// - There are no pending outbound streams (i.e. all streams requested via
1503
    ///   [`ConnectionHandlerEvent::OutboundSubstreamRequest`] have completed).
1504
    /// - Every [`ConnectionHandler`] returns `false` from
1505
    ///   [`ConnectionHandler::connection_keep_alive`].
1506
    ///
1507
    /// Once all these conditions are true, the idle connection timeout starts ticking.
1508
    pub fn with_idle_connection_timeout(mut self, timeout: Duration) -> Self {
×
1509
        self.pool_config.idle_connection_timeout = timeout;
×
1510
        self
×
1511
    }
×
1512
}
1513

1514
/// Possible errors when trying to establish or upgrade an outbound connection.
1515
#[derive(Debug)]
1516
pub enum DialError {
1517
    /// The peer identity obtained on the connection matches the local peer.
1518
    LocalPeerId { address: Multiaddr },
1519
    /// No addresses have been provided by [`NetworkBehaviour::handle_pending_outbound_connection`]
1520
    /// and [`DialOpts`].
1521
    NoAddresses,
1522
    /// The provided [`dial_opts::PeerCondition`] evaluated to false and thus
1523
    /// the dial was aborted.
1524
    DialPeerConditionFalse(dial_opts::PeerCondition),
1525
    /// Pending connection attempt has been aborted.
1526
    Aborted,
1527
    /// The peer identity obtained on the connection did not match the one that was expected.
1528
    WrongPeerId {
1529
        obtained: PeerId,
1530
        address: Multiaddr,
1531
    },
1532
    /// One of the [`NetworkBehaviour`]s rejected the outbound connection
1533
    /// via [`NetworkBehaviour::handle_pending_outbound_connection`] or
1534
    /// [`NetworkBehaviour::handle_established_outbound_connection`].
1535
    Denied { cause: ConnectionDenied },
1536
    /// An error occurred while negotiating the transport protocol(s) on a connection.
1537
    Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
1538
}
1539

1540
impl From<PendingOutboundConnectionError> for DialError {
1541
    fn from(error: PendingOutboundConnectionError) -> Self {
146✔
1542
        match error {
146✔
1543
            PendingOutboundConnectionError::Aborted => DialError::Aborted,
1✔
1544
            PendingOutboundConnectionError::WrongPeerId { obtained, address } => {
10✔
1545
                DialError::WrongPeerId { obtained, address }
10✔
1546
            }
1547
            PendingOutboundConnectionError::LocalPeerId { address } => {
1✔
1548
                DialError::LocalPeerId { address }
1✔
1549
            }
1550
            PendingOutboundConnectionError::Transport(e) => DialError::Transport(e),
134✔
1551
        }
1552
    }
146✔
1553
}
1554

1555
impl fmt::Display for DialError {
1556
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4✔
1557
        match self {
×
1558
            DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
3✔
1559
            DialError::LocalPeerId { address } => write!(
×
1560
                f,
×
1561
                "Dial error: tried to dial local peer id at {address:?}."
×
1562
            ),
1563
            DialError::DialPeerConditionFalse(PeerCondition::Disconnected) => write!(f, "Dial error: dial condition was configured to only happen when disconnected (`PeerCondition::Disconnected`), but node is already connected, thus cancelling new dial."),
×
1564
            DialError::DialPeerConditionFalse(PeerCondition::NotDialing) => write!(f, "Dial error: dial condition was configured to only happen if there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but a dial is in progress, thus cancelling new dial."),
×
1565
            DialError::DialPeerConditionFalse(PeerCondition::DisconnectedAndNotDialing) => write!(f, "Dial error: dial condition was configured to only happen when both disconnected (`PeerCondition::Disconnected`) and there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but node is already connected or dial is in progress, thus cancelling new dial."),
×
1566
            DialError::DialPeerConditionFalse(PeerCondition::Always) => unreachable!("Dial peer condition is by definition true."),
×
1567
            DialError::Aborted => write!(
×
1568
                f,
×
1569
                "Dial error: Pending connection attempt has been aborted."
×
1570
            ),
1571
            DialError::WrongPeerId { obtained, address } => write!(
×
1572
                f,
×
1573
                "Dial error: Unexpected peer ID {obtained} at {address:?}."
×
1574
            ),
1575
            DialError::Transport(errors) => {
1✔
1576
                write!(f, "Failed to negotiate transport protocol(s): [")?;
1✔
1577

1578
                for (addr, error) in errors {
2✔
1579
                    write!(f, "({addr}")?;
1✔
1580
                    print_error_chain(f, error)?;
1✔
1581
                    write!(f, ")")?;
1✔
1582
                }
1583
                write!(f, "]")?;
1✔
1584

1585
                Ok(())
1✔
1586
            }
1587
            DialError::Denied { .. } => {
1588
                write!(f, "Dial error")
×
1589
            }
1590
        }
1591
    }
4✔
1592
}
1593

1594
/// Writes `e` and every error reachable through [`error::Error::source`] to
/// `f`, each one prefixed with `": "`.
///
/// Returns the first formatting error encountered, if any.
fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::Result {
    write!(f, ": {e}")?;

    // Walk the remaining chain iteratively, outermost cause first.
    let mut next = e.source();
    while let Some(cause) = next {
        write!(f, ": {cause}")?;
        next = cause.source();
    }

    Ok(())
}
2✔
1603

1604
impl error::Error for DialError {
1605
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
×
1606
        match self {
×
1607
            DialError::LocalPeerId { .. } => None,
×
1608
            DialError::NoAddresses => None,
×
1609
            DialError::DialPeerConditionFalse(_) => None,
×
1610
            DialError::Aborted => None,
×
1611
            DialError::WrongPeerId { .. } => None,
×
1612
            DialError::Transport(_) => None,
×
1613
            DialError::Denied { cause } => Some(cause),
×
1614
        }
1615
    }
×
1616
}
1617

1618
/// Possible errors when upgrading an inbound connection.
1619
#[derive(Debug)]
1620
pub enum ListenError {
1621
    /// Pending connection attempt has been aborted.
1622
    Aborted,
1623
    /// The peer identity obtained on the connection did not match the one that was expected.
1624
    WrongPeerId {
1625
        obtained: PeerId,
1626
        endpoint: ConnectedPoint,
1627
    },
1628
    /// The connection was dropped because it resolved to our own [`PeerId`].
1629
    LocalPeerId {
1630
        address: Multiaddr,
1631
    },
1632
    Denied {
1633
        cause: ConnectionDenied,
1634
    },
1635
    /// An error occurred while negotiating the transport protocol(s) on a connection.
1636
    Transport(TransportError<io::Error>),
1637
}
1638

1639
impl From<PendingInboundConnectionError> for ListenError {
1640
    fn from(error: PendingInboundConnectionError) -> Self {
1✔
1641
        match error {
1✔
1642
            PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
×
1643
            PendingInboundConnectionError::Aborted => ListenError::Aborted,
×
1644
            PendingInboundConnectionError::LocalPeerId { address } => {
1✔
1645
                ListenError::LocalPeerId { address }
1✔
1646
            }
1647
        }
1648
    }
1✔
1649
}
1650

1651
impl fmt::Display for ListenError {
1652
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
1653
        match self {
×
1654
            ListenError::Aborted => write!(
×
1655
                f,
×
1656
                "Listen error: Pending connection attempt has been aborted."
×
1657
            ),
1658
            ListenError::WrongPeerId { obtained, endpoint } => write!(
×
1659
                f,
×
1660
                "Listen error: Unexpected peer ID {obtained} at {endpoint:?}."
×
1661
            ),
1662
            ListenError::Transport(_) => {
1663
                write!(f, "Listen error: Failed to negotiate transport protocol(s)")
×
1664
            }
1665
            ListenError::Denied { cause } => {
×
1666
                write!(f, "Listen error: Denied: {cause}")
×
1667
            }
1668
            ListenError::LocalPeerId { address } => {
×
1669
                write!(f, "Listen error: Local peer ID at {address:?}.")
×
1670
            }
1671
        }
1672
    }
×
1673
}
1674

1675
impl error::Error for ListenError {
1676
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
×
1677
        match self {
×
1678
            ListenError::WrongPeerId { .. } => None,
×
1679
            ListenError::Transport(err) => Some(err),
×
1680
            ListenError::Aborted => None,
×
1681
            ListenError::Denied { cause } => Some(cause),
×
1682
            ListenError::LocalPeerId { .. } => None,
×
1683
        }
1684
    }
×
1685
}
1686

1687
/// A connection was denied.
///
/// To figure out which [`NetworkBehaviour`] denied the connection, use
/// [`ConnectionDenied::downcast`].
#[derive(Debug)]
pub struct ConnectionDenied {
    /// The type-erased reason for the denial.
    inner: Box<dyn error::Error + Send + Sync + 'static>,
}

impl ConnectionDenied {
    /// Wraps `cause` (anything convertible into a boxed error) as the reason
    /// for denying a connection.
    pub fn new(cause: impl Into<Box<dyn error::Error + Send + Sync + 'static>>) -> Self {
        let inner = cause.into();
        Self { inner }
    }

    /// Attempt to downcast to a particular reason for why the connection was denied.
    ///
    /// Returns the concrete cause on success. If the cause is not an `E`, the
    /// unchanged [`ConnectionDenied`] is handed back in `Err`, so another
    /// type can be tried.
    pub fn downcast<E>(self) -> Result<E, Self>
    where
        E: error::Error + Send + Sync + 'static,
    {
        match self.inner.downcast::<E>() {
            Ok(cause) => Ok(*cause),
            Err(inner) => Err(Self { inner }),
        }
    }

    /// Attempt to borrow the reason for the denial as a particular error type.
    ///
    /// Returns `None` if the cause is not an `E`.
    pub fn downcast_ref<E>(&self) -> Option<&E>
    where
        E: error::Error + Send + Sync + 'static,
    {
        let cause = &self.inner;
        cause.downcast_ref::<E>()
    }
}
1724

1725
impl fmt::Display for ConnectionDenied {
1726
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
1727
        write!(f, "connection denied")
×
1728
    }
×
1729
}
1730

1731
impl error::Error for ConnectionDenied {
1732
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
×
1733
        Some(self.inner.as_ref())
×
1734
    }
×
1735
}
1736

1737
/// Information about the connections obtained by [`Swarm::network_info()`].
1738
#[derive(Clone, Debug)]
1739
pub struct NetworkInfo {
1740
    /// The total number of connected peers.
1741
    num_peers: usize,
1742
    /// Counters of ongoing network connections.
1743
    connection_counters: ConnectionCounters,
1744
}
1745

1746
impl NetworkInfo {
1747
    /// The number of connected peers, i.e. peers with whom at least
1748
    /// one established connection exists.
1749
    pub fn num_peers(&self) -> usize {
3✔
1750
        self.num_peers
3✔
1751
    }
3✔
1752

1753
    /// Gets counters for ongoing network connections.
1754
    pub fn connection_counters(&self) -> &ConnectionCounters {
3✔
1755
        &self.connection_counters
3✔
1756
    }
3✔
1757
}
1758

1759
#[cfg(test)]
1760
mod tests {
1761
    use libp2p_core::{
1762
        multiaddr,
1763
        multiaddr::multiaddr,
1764
        transport,
1765
        transport::{memory::MemoryTransportError, TransportEvent},
1766
        upgrade,
1767
    };
1768
    use libp2p_identity as identity;
1769
    use libp2p_plaintext as plaintext;
1770
    use libp2p_yamux as yamux;
1771
    use quickcheck::*;
1772

1773
    use super::*;
1774
    use crate::test::{CallTraceBehaviour, MockBehaviour};
1775

1776
    /// Phase of the connect/disconnect tests below.
    ///
    /// Tests start in `Connecting`, switch to `Disconnecting` once all
    /// connections are up, and (in the reconnect tests) go back to
    /// `Connecting`: Connecting => Disconnecting => Connecting.
    enum State {
        Connecting,
        Disconnecting,
    }
1782

1783
    fn new_test_swarm(
23✔
1784
        config: Config,
23✔
1785
    ) -> Swarm<CallTraceBehaviour<MockBehaviour<dummy::ConnectionHandler, ()>>> {
23✔
1786
        let id_keys = identity::Keypair::generate_ed25519();
23✔
1787
        let local_public_key = id_keys.public();
23✔
1788
        let transport = transport::MemoryTransport::default()
23✔
1789
            .upgrade(upgrade::Version::V1)
23✔
1790
            .authenticate(plaintext::Config::new(&id_keys))
23✔
1791
            .multiplex(yamux::Config::default())
23✔
1792
            .boxed();
23✔
1793
        let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler));
23✔
1794

1795
        Swarm::new(transport, behaviour, local_public_key.into(), config)
23✔
1796
    }
23✔
1797

1798
    fn swarms_connected<TBehaviour>(
113✔
1799
        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
113✔
1800
        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
113✔
1801
        num_connections: usize,
113✔
1802
    ) -> bool
113✔
1803
    where
113✔
1804
        TBehaviour: NetworkBehaviour,
113✔
1805
        THandlerOutEvent<TBehaviour>: Clone,
113✔
1806
    {
1807
        swarm1
113✔
1808
            .behaviour()
113✔
1809
            .num_connections_to_peer(*swarm2.local_peer_id())
113✔
1810
            == num_connections
113✔
1811
            && swarm2
5✔
1812
                .behaviour()
5✔
1813
                .num_connections_to_peer(*swarm1.local_peer_id())
5✔
1814
                == num_connections
5✔
1815
            && swarm1.is_connected(swarm2.local_peer_id())
5✔
1816
            && swarm2.is_connected(swarm1.local_peer_id())
5✔
1817
    }
113✔
1818

1819
    fn swarms_disconnected<TBehaviour>(
22✔
1820
        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
22✔
1821
        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
22✔
1822
    ) -> bool
22✔
1823
    where
22✔
1824
        TBehaviour: NetworkBehaviour,
22✔
1825
        THandlerOutEvent<TBehaviour>: Clone,
22✔
1826
    {
1827
        swarm1
22✔
1828
            .behaviour()
22✔
1829
            .num_connections_to_peer(*swarm2.local_peer_id())
22✔
1830
            == 0
22✔
1831
            && swarm2
2✔
1832
                .behaviour()
2✔
1833
                .num_connections_to_peer(*swarm1.local_peer_id())
2✔
1834
                == 0
2✔
1835
            && !swarm1.is_connected(swarm2.local_peer_id())
2✔
1836
            && !swarm2.is_connected(swarm1.local_peer_id())
2✔
1837
    }
22✔
1838

1839
    /// Establishes multiple connections between two peers,
1840
    /// after which one peer disconnects the other using [`Swarm::disconnect_peer_id`].
1841
    ///
1842
    /// The test expects both behaviours to be notified via calls to
1843
    /// [`NetworkBehaviour::on_swarm_event`] with pairs of [`FromSwarm::ConnectionEstablished`]
1844
    /// / [`FromSwarm::ConnectionClosed`]
1845
    #[tokio::test]
1846
    async fn test_swarm_disconnect() {
1✔
1847
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1✔
1848
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1✔
1849

1850
        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1✔
1851
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1✔
1852

1853
        swarm1.listen_on(addr1.clone()).unwrap();
1✔
1854
        swarm2.listen_on(addr2.clone()).unwrap();
1✔
1855

1856
        let swarm1_id = *swarm1.local_peer_id();
1✔
1857

1858
        let mut reconnected = false;
1✔
1859
        let num_connections = 10;
1✔
1860

1861
        for _ in 0..num_connections {
10✔
1862
            swarm1.dial(addr2.clone()).unwrap();
10✔
1863
        }
10✔
1864
        let mut state = State::Connecting;
1✔
1865

1866
        future::poll_fn(move |cx| loop {
1✔
1867
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
56✔
1868
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
56✔
1869
            match state {
56✔
1870
                State::Connecting => {
1✔
1871
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
45✔
1872
                        if reconnected {
2✔
1873
                            return Poll::Ready(());
1✔
1874
                        }
1✔
1875
                        swarm2
1✔
1876
                            .disconnect_peer_id(swarm1_id)
1✔
1877
                            .expect("Error disconnecting");
1✔
1878
                        state = State::Disconnecting;
1✔
1879
                    }
43✔
1880
                }
1✔
1881
                State::Disconnecting => {
1✔
1882
                    if swarms_disconnected(&swarm1, &swarm2) {
11✔
1883
                        if reconnected {
1✔
1884
                            return Poll::Ready(());
1✔
1885
                        }
1✔
1886
                        reconnected = true;
1✔
1887
                        for _ in 0..num_connections {
10✔
1888
                            swarm2.dial(addr1.clone()).unwrap();
10✔
1889
                        }
10✔
1890
                        state = State::Connecting;
1✔
1891
                    }
10✔
1892
                }
1✔
1893
            }
1✔
1894

1✔
1895
            if poll1.is_pending() && poll2.is_pending() {
55✔
1896
                return Poll::Pending;
5✔
1897
            }
50✔
1898
        })
6✔
1899
        .await
1✔
1900
    }
1✔
1901

1902
    /// Establishes multiple connections between two peers,
1903
    /// after which one peer disconnects the other
1904
    /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
1905
    ///
1906
    /// The test expects both behaviours to be notified via calls to
1907
    /// [`NetworkBehaviour::on_swarm_event`] with pairs of [`FromSwarm::ConnectionEstablished`]
1908
    /// / [`FromSwarm::ConnectionClosed`]
1909
    #[tokio::test]
1910
    async fn test_behaviour_disconnect_all() {
1✔
1911
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1✔
1912
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1✔
1913

1914
        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1✔
1915
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1✔
1916

1917
        swarm1.listen_on(addr1.clone()).unwrap();
1✔
1918
        swarm2.listen_on(addr2.clone()).unwrap();
1✔
1919

1920
        let swarm1_id = *swarm1.local_peer_id();
1✔
1921

1922
        let mut reconnected = false;
1✔
1923
        let num_connections = 10;
1✔
1924

1925
        for _ in 0..num_connections {
10✔
1926
            swarm1.dial(addr2.clone()).unwrap();
10✔
1927
        }
10✔
1928
        let mut state = State::Connecting;
1✔
1929

1930
        future::poll_fn(move |cx| loop {
1✔
1931
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
56✔
1932
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
56✔
1933
            match state {
56✔
1934
                State::Connecting => {
1✔
1935
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
45✔
1936
                        if reconnected {
2✔
1937
                            return Poll::Ready(());
1✔
1938
                        }
1✔
1939
                        swarm2
1✔
1940
                            .behaviour
1✔
1941
                            .inner()
1✔
1942
                            .next_action
1✔
1943
                            .replace(ToSwarm::CloseConnection {
1✔
1944
                                peer_id: swarm1_id,
1✔
1945
                                connection: CloseConnection::All,
1✔
1946
                            });
1✔
1947
                        state = State::Disconnecting;
1✔
1948
                        continue;
1✔
1949
                    }
43✔
1950
                }
1✔
1951
                State::Disconnecting => {
1✔
1952
                    if swarms_disconnected(&swarm1, &swarm2) {
11✔
1953
                        reconnected = true;
1✔
1954
                        for _ in 0..num_connections {
10✔
1955
                            swarm2.dial(addr1.clone()).unwrap();
10✔
1956
                        }
10✔
1957
                        state = State::Connecting;
1✔
1958
                        continue;
1✔
1959
                    }
10✔
1960
                }
1✔
1961
            }
1✔
1962

1✔
1963
            if poll1.is_pending() && poll2.is_pending() {
53✔
1964
                return Poll::Pending;
5✔
1965
            }
48✔
1966
        })
6✔
1967
        .await
1✔
1968
    }
1✔
1969

1970
    /// Establishes multiple connections between two peers,
1971
    /// after which one peer closes a single connection
1972
    /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
1973
    ///
1974
    /// The test expects both behaviours to be notified via calls to
1975
    /// [`NetworkBehaviour::on_swarm_event`] with pairs of [`FromSwarm::ConnectionEstablished`]
1976
    /// / [`FromSwarm::ConnectionClosed`]
1977
    #[tokio::test]
1978
    async fn test_behaviour_disconnect_one() {
1✔
1979
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1✔
1980
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1✔
1981

1982
        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1✔
1983
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1✔
1984

1985
        swarm1.listen_on(addr1).unwrap();
1✔
1986
        swarm2.listen_on(addr2.clone()).unwrap();
1✔
1987

1988
        let swarm1_id = *swarm1.local_peer_id();
1✔
1989

1990
        let num_connections = 10;
1✔
1991

1992
        for _ in 0..num_connections {
10✔
1993
            swarm1.dial(addr2.clone()).unwrap();
10✔
1994
        }
10✔
1995
        let mut state = State::Connecting;
1✔
1996
        let mut disconnected_conn_id = None;
1✔
1997

1998
        future::poll_fn(move |cx| loop {
1✔
1999
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
25✔
2000
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
25✔
2001
            match state {
25✔
2002
                State::Connecting => {
1✔
2003
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
23✔
2004
                        disconnected_conn_id = {
1✔
2005
                            let conn_id =
1✔
2006
                                swarm2.behaviour.on_connection_established[num_connections / 2].1;
1✔
2007
                            swarm2.behaviour.inner().next_action.replace(
1✔
2008
                                ToSwarm::CloseConnection {
1✔
2009
                                    peer_id: swarm1_id,
1✔
2010
                                    connection: CloseConnection::One(conn_id),
1✔
2011
                                },
1✔
2012
                            );
1✔
2013
                            Some(conn_id)
1✔
2014
                        };
1✔
2015
                        state = State::Disconnecting;
1✔
2016
                    }
22✔
2017
                }
1✔
2018
                State::Disconnecting => {
1✔
2019
                    for s in &[&swarm1, &swarm2] {
4✔
2020
                        assert!(s
4✔
2021
                            .behaviour
4✔
2022
                            .on_connection_closed
4✔
2023
                            .iter()
4✔
2024
                            .all(|(.., remaining_conns)| *remaining_conns > 0));
4✔
2025
                        assert_eq!(s.behaviour.on_connection_established.len(), num_connections);
4✔
2026
                        s.behaviour.assert_connected(num_connections, 1);
4✔
2027
                    }
1✔
2028
                    if [&swarm1, &swarm2]
2✔
2029
                        .iter()
2✔
2030
                        .all(|s| s.behaviour.on_connection_closed.len() == 1)
3✔
2031
                    {
1✔
2032
                        let conn_id = swarm2.behaviour.on_connection_closed[0].1;
1✔
2033
                        assert_eq!(Some(conn_id), disconnected_conn_id);
1✔
2034
                        return Poll::Ready(());
1✔
2035
                    }
1✔
2036
                }
1✔
2037
            }
1✔
2038

1✔
2039
            if poll1.is_pending() && poll2.is_pending() {
24✔
2040
                return Poll::Pending;
3✔
2041
            }
21✔
2042
        })
4✔
2043
        .await
1✔
2044
    }
1✔
2045

2046
    /// Property test: for an arbitrary dial concurrency factor in `1..11`, a
    /// swarm dialing a random peer at a set of in-memory listeners causes each
    /// listener to observe an incoming connection, after which the swarm
    /// reports an [`SwarmEvent::OutgoingConnectionError`].
    #[test]
    fn concurrent_dialing() {
        #[derive(Clone, Debug)]
        struct DialConcurrencyFactor(NonZeroU8);

        impl Arbitrary for DialConcurrencyFactor {
            fn arbitrary(g: &mut Gen) -> Self {
                Self(NonZeroU8::new(g.gen_range(1..11)).unwrap())
            }
        }

        fn prop(concurrency_factor: DialConcurrencyFactor) {
            tokio::runtime::Runtime::new().unwrap().block_on(async {
                let mut swarm = new_test_swarm(
                    Config::with_tokio_executor()
                        .with_dial_concurrency_factor(concurrency_factor.0),
                );

                // Listen on `concurrency_factor + 2` addresses.
                //
                // `+ 2` ensures that there are more addresses than the
                // concurrency factor.
                let num_listen_addrs = concurrency_factor.0.get() + 2;
                let mut listen_addresses = Vec::new();
                let mut transports = Vec::new();
                for _ in 0..num_listen_addrs {
                    let mut transport = transport::MemoryTransport::default().boxed();
                    transport
                        .listen_on(ListenerId::next(), "/memory/0".parse().unwrap())
                        .unwrap();

                    match transport.select_next_some().await {
                        TransportEvent::NewAddress { listen_addr, .. } => {
                            listen_addresses.push(listen_addr);
                        }
                        _ => panic!("Expected `NewAddress` event."),
                    }

                    transports.push(transport);
                }

                // Have swarm dial each listener and wait for each listener to receive the incoming
                // connections.
                swarm
                    .dial(
                        DialOpts::peer_id(PeerId::random())
                            .addresses(listen_addresses)
                            .build(),
                    )
                    .unwrap();
                for mut transport in transports {
                    match futures::future::select(transport.select_next_some(), swarm.next()).await
                    {
                        future::Either::Left((TransportEvent::Incoming { .. }, _)) => {}
                        future::Either::Left(_) => {
                            panic!("Unexpected transport event.")
                        }
                        future::Either::Right((e, _)) => {
                            panic!("Expect swarm to not emit any event {e:?}")
                        }
                    }
                }

                match swarm.next().await.unwrap() {
                    SwarmEvent::OutgoingConnectionError { .. } => {}
                    e => panic!("Unexpected swarm event {e:?}"),
                }
            })
        }

        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
    }
1✔
2117

2118
    #[tokio::test]
2119
    async fn invalid_peer_id() {
1✔
2120
        // Checks whether dialing an address containing the wrong peer id raises an error
2121
        // for the expected peer id instead of the obtained peer id.
2122

2123
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1✔
2124
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1✔
2125

2126
        swarm1.listen_on("/memory/0".parse().unwrap()).unwrap();
1✔
2127

2128
        let address = future::poll_fn(|cx| match swarm1.poll_next_unpin(cx) {
1✔
2129
            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
1✔
2130
            Poll::Pending => Poll::Pending,
2131
            _ => panic!("Was expecting the listen address to be reported"),
2132
        })
1✔
2133
        .await;
1✔
2134

2135
        let other_id = PeerId::random();
1✔
2136
        let other_addr = address.with(multiaddr::Protocol::P2p(other_id));
1✔
2137

2138
        swarm2.dial(other_addr.clone()).unwrap();
1✔
2139

2140
        let (peer_id, error) = future::poll_fn(|cx| {
3✔
2141
            if let Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) =
2142
                swarm1.poll_next_unpin(cx)
3✔
2143
            {}
2✔
2144

2145
            match swarm2.poll_next_unpin(cx) {
3✔
2146
                Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2147
                    peer_id, error, ..
1✔
2148
                })) => Poll::Ready((peer_id, error)),
1✔
2149
                Poll::Ready(x) => panic!("unexpected {x:?}"),
2150
                Poll::Pending => Poll::Pending,
2✔
2151
            }
2152
        })
3✔
2153
        .await;
1✔
2154
        assert_eq!(peer_id.unwrap(), other_id);
1✔
2155
        match error {
1✔
2156
            DialError::WrongPeerId { obtained, address } => {
1✔
2157
                assert_eq!(obtained, *swarm1.local_peer_id());
1✔
2158
                assert_eq!(address, other_addr);
1✔
2159
            }
1✔
2160
            x => panic!("wrong error {x:?}"),
1✔
2161
        }
1✔
2162
    }
1✔
2163

2164
    #[tokio::test]
2165
    async fn dial_self() {
1✔
2166
        // Check whether dialing ourselves correctly fails.
2167
        //
2168
        // Dialing the same address we're listening should result in three events:
2169
        //
2170
        // - The incoming connection notification (before we know the incoming peer ID).
2171
        // - The connection error for the dialing endpoint (once we've determined that it's our own
2172
        //   ID).
2173
        // - The connection error for the listening endpoint (once we've determined that it's our
2174
        //   own ID).
2175
        //
2176
        // The last two can happen in any order.
2177

2178
        let mut swarm = new_test_swarm(Config::with_tokio_executor());
1✔
2179
        swarm.listen_on("/memory/0".parse().unwrap()).unwrap();
1✔
2180

2181
        let local_address = future::poll_fn(|cx| match swarm.poll_next_unpin(cx) {
1✔
2182
            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
1✔
2183
            Poll::Pending => Poll::Pending,
2184
            _ => panic!("Was expecting the listen address to be reported"),
2185
        })
1✔
2186
        .await;
1✔
2187

2188
        // This is a hack to actually execute the dial
2189
        // to ourselves which would otherwise be filtered.
2190
        swarm.listened_addrs.clear();
1✔
2191
        swarm.dial(local_address.clone()).unwrap();
1✔
2192

2193
        let mut got_dial_err = false;
1✔
2194
        let mut got_inc_err = false;
1✔
2195
        future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
3✔
2196
            loop {
1✔
2197
                match swarm.poll_next_unpin(cx) {
5✔
2198
                    Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
1✔
2199
                        peer_id,
1✔
2200
                        error: DialError::LocalPeerId { .. },
1✔
2201
                        ..
1✔
2202
                    })) => {
1✔
2203
                        assert_eq!(&peer_id.unwrap(), swarm.local_peer_id());
1✔
2204
                        assert!(!got_dial_err);
1✔
2205
                        got_dial_err = true;
1✔
2206
                        if got_inc_err {
1✔
2207
                            return Poll::Ready(Ok(()));
1✔
2208
                        }
1✔
2209
                    }
1✔
2210
                    Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
1✔
2211
                        local_addr, ..
1✔
2212
                    })) => {
1✔
2213
                        assert!(!got_inc_err);
1✔
2214
                        assert_eq!(local_addr, local_address);
1✔
2215
                        got_inc_err = true;
1✔
2216
                        if got_dial_err {
1✔
2217
                            return Poll::Ready(Ok(()));
1✔
2218
                        }
1✔
2219
                    }
1✔
2220
                    Poll::Ready(Some(SwarmEvent::IncomingConnection { local_addr, .. })) => {
1✔
2221
                        assert_eq!(local_addr, local_address);
1✔
2222
                    }
1✔
2223
                    Poll::Ready(ev) => {
1✔
2224
                        panic!("Unexpected event: {ev:?}")
1✔
2225
                    }
1✔
2226
                    Poll::Pending => break Poll::Pending,
2✔
2227
                }
1✔
2228
            }
1✔
2229
        })
3✔
2230
        .await
1✔
2231
        .unwrap();
1✔
2232
    }
1✔
2233

2234
    #[tokio::test]
2235
    async fn dial_self_by_id() {
1✔
2236
        // Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first
2237
        // place.
2238
        let swarm = new_test_swarm(Config::with_tokio_executor());
1✔
2239
        let peer_id = *swarm.local_peer_id();
1✔
2240
        assert!(!swarm.is_connected(&peer_id));
1✔
2241
    }
1✔
2242

2243
    #[tokio::test]
2244
    async fn multiple_addresses_err() {
1✔
2245
        // Tries dialing multiple addresses, and makes sure there's one dialing error per address.
2246

2247
        let target = PeerId::random();
1✔
2248

2249
        let mut swarm = new_test_swarm(Config::with_tokio_executor());
1✔
2250

2251
        let addresses = HashSet::from([
1✔
2252
            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
1✔
2253
            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
1✔
2254
            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
1✔
2255
            multiaddr![Udp(rand::random::<u16>())],
1✔
2256
            multiaddr![Udp(rand::random::<u16>())],
1✔
2257
            multiaddr![Udp(rand::random::<u16>())],
1✔
2258
            multiaddr![Udp(rand::random::<u16>())],
1✔
2259
            multiaddr![Udp(rand::random::<u16>())],
1✔
2260
        ]);
1✔
2261

2262
        swarm
1✔
2263
            .dial(
1✔
2264
                DialOpts::peer_id(target)
1✔
2265
                    .addresses(addresses.iter().cloned().collect())
1✔
2266
                    .build(),
1✔
2267
            )
2268
            .unwrap();
1✔
2269

2270
        match swarm.next().await.unwrap() {
1✔
2271
            SwarmEvent::OutgoingConnectionError {
1✔
2272
                peer_id,
1✔
2273
                // multiaddr,
1✔
2274
                error: DialError::Transport(errors),
1✔
2275
                ..
1✔
2276
            } => {
1✔
2277
                assert_eq!(target, peer_id.unwrap());
1✔
2278

1✔
2279
                let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::<Vec<_>>();
1✔
2280
                let expected_addresses = addresses
1✔
2281
                    .into_iter()
1✔
2282
                    .map(|addr| addr.with(multiaddr::Protocol::P2p(target)))
8✔
2283
                    .collect::<Vec<_>>();
1✔
2284

1✔
2285
                assert_eq!(expected_addresses, failed_addresses);
1✔
2286
            }
1✔
2287
            e => panic!("Unexpected event: {e:?}"),
1✔
2288
        }
1✔
2289
    }
1✔
2290

2291
    #[tokio::test]
2292
    async fn aborting_pending_connection_surfaces_error() {
1✔
2293
        let _ = tracing_subscriber::fmt()
1✔
2294
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1✔
2295
            .try_init();
1✔
2296

2297
        let mut dialer = new_test_swarm(Config::with_tokio_executor());
1✔
2298
        let mut listener = new_test_swarm(Config::with_tokio_executor());
1✔
2299

2300
        let listener_peer_id = *listener.local_peer_id();
1✔
2301
        listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
1✔
2302
        let listener_address = match listener.next().await.unwrap() {
1✔
2303
            SwarmEvent::NewListenAddr { address, .. } => address,
1✔
2304
            e => panic!("Unexpected network event: {e:?}"),
2305
        };
2306

2307
        dialer
1✔
2308
            .dial(
1✔
2309
                DialOpts::peer_id(listener_peer_id)
1✔
2310
                    .addresses(vec![listener_address])
1✔
2311
                    .build(),
1✔
2312
            )
2313
            .unwrap();
1✔
2314

2315
        dialer
1✔
2316
            .disconnect_peer_id(listener_peer_id)
1✔
2317
            .expect_err("Expect peer to not yet be connected.");
1✔
2318

2319
        match dialer.next().await.unwrap() {
1✔
2320
            SwarmEvent::OutgoingConnectionError {
1✔
2321
                error: DialError::Aborted,
1✔
2322
                ..
1✔
2323
            } => {}
1✔
2324
            e => panic!("Unexpected swarm event {e:?}."),
1✔
2325
        }
1✔
2326
    }
1✔
2327

2328
    /// The `Display` output of a `DialError::Transport` must include, for every failed address,
    /// the chain of underlying error sources.
    #[test]
    fn dial_error_prints_sources() {
        // This constitutes a fairly typical error for chained transports.
        let address = "/ip4/127.0.0.1/tcp/80".parse().unwrap();
        let cause = TransportError::Other(io::Error::other(MemoryTransportError::Unreachable));
        let error = DialError::Transport(vec![(address, cause)]);

        let rendered = error.to_string();

        // Unfortunately, some of the errors in the chain are "empty", which leads to
        // consecutive colons without text in between; that is the best we can do.
        assert_eq!("Failed to negotiate transport protocol(s): [(/ip4/127.0.0.1/tcp/80: : No listener on the given port.)]", rendered);
    }
1✔
2342
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc