butlergroup / rust-libp2p / 18610913338

18 Oct 2025 04:41AM UTC coverage: 78.379% (+2.5%) from 75.842%
push

github

butlergroup
	modified:   .github/workflows/ci.yml

36944 of 47135 relevant lines covered (78.38%)

37728.24 hits per line

Source File

83.2
/swarm/src/handler.rs
1
// Copyright 2018 Parity Technologies (UK) Ltd.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
//! Once a connection to a remote peer is established, a [`ConnectionHandler`] negotiates
22
//! and handles one or more specific protocols on the connection.
23
//!
24
//! Protocols are negotiated and used on individual substreams of the connection. Thus a
25
//! [`ConnectionHandler`] defines the inbound and outbound upgrades to apply when creating a new
26
//! inbound or outbound substream, respectively, and is notified by a [`Swarm`](crate::Swarm) when
27
//! these upgrades have been successfully applied, including the final output of the upgrade. A
28
//! [`ConnectionHandler`] can then continue communicating with the peer over the substream using the
29
//! negotiated protocol(s).
30
//!
31
//! Two [`ConnectionHandler`]s can be composed with [`ConnectionHandler::select()`]
32
//! in order to build a new handler supporting the combined set of protocols,
33
//! with methods being dispatched to the appropriate handler according to the
34
//! used protocol(s) determined by the associated types of the handlers.
35
//!
36
//! > **Note**: A [`ConnectionHandler`] handles one or more protocols in the context of a single
37
//! > connection with a remote. In order to handle a protocol that requires knowledge of
38
//! > the network as a whole, see the
39
//! > [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) trait.
40

41
pub mod either;
42
mod map_in;
43
mod map_out;
44
pub mod multi;
45
mod one_shot;
46
mod pending;
47
mod select;
48

49
use core::slice;
50
use std::{
51
    collections::{HashMap, HashSet},
52
    error, fmt, io,
53
    task::{Context, Poll},
54
    time::Duration,
55
};
56

57
use libp2p_core::Multiaddr;
58
pub use map_in::MapInEvent;
59
pub use map_out::MapOutEvent;
60
pub use one_shot::{OneShotHandler, OneShotHandlerConfig};
61
pub use pending::PendingConnectionHandler;
62
pub use select::ConnectionHandlerSelect;
63
use smallvec::SmallVec;
64

65
pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend};
66
use crate::{connection::AsStrHashEq, StreamProtocol};
67

68
/// A handler for a set of protocols used on a connection with a remote.
69
///
70
/// This trait should be implemented for a type that maintains the state for
71
/// the execution of a specific protocol with a remote.
72
///
73
/// # Handling a protocol
74
///
75
/// Communication with a remote over a set of protocols is initiated in one of two ways:
76
///
77
///   1. Dialing by initiating a new outbound substream. In order to do so,
78
///      [`ConnectionHandler::poll()`] must return an
79
///      [`ConnectionHandlerEvent::OutboundSubstreamRequest`], providing an instance of
80
///      [`libp2p_core::upgrade::OutboundUpgrade`] that is used to negotiate the protocol(s). Upon
81
///      success, [`ConnectionHandler::on_connection_event`] is called with
82
///      [`ConnectionEvent::FullyNegotiatedOutbound`] translating the final output of the upgrade.
83
///
84
///   2. Listening by accepting a new inbound substream. When a new inbound substream is created on
85
///      a connection, [`ConnectionHandler::listen_protocol`] is called to obtain an instance of
86
///      [`libp2p_core::upgrade::InboundUpgrade`] that is used to negotiate the protocol(s). Upon
87
///      success, [`ConnectionHandler::on_connection_event`] is called with
88
///      [`ConnectionEvent::FullyNegotiatedInbound`] translating the final output of the upgrade.
89
///
90
///
91
/// # Connection Keep-Alive
92
///
93
/// A [`ConnectionHandler`] can influence the lifetime of the underlying connection
94
/// through [`ConnectionHandler::connection_keep_alive`]. That is, the protocol
95
/// implemented by the handler can include conditions for terminating the connection.
96
/// The lifetime of successfully negotiated substreams is fully controlled by the handler.
97
///
98
/// Implementers of this trait should keep in mind that the connection can be closed at any time.
99
/// When a connection is closed gracefully, the substreams used by the handler may still
100
/// continue reading data until the remote closes its side of the connection.
101
pub trait ConnectionHandler: Send + 'static {
102
    /// A type representing the message(s) a
103
    /// [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) can send to a [`ConnectionHandler`]
104
    /// via [`ToSwarm::NotifyHandler`](crate::behaviour::ToSwarm::NotifyHandler)
105
    type FromBehaviour: fmt::Debug + Send + 'static;
106
    /// A type representing message(s) a [`ConnectionHandler`] can send to a
107
    /// [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour) via
108
    /// [`ConnectionHandlerEvent::NotifyBehaviour`].
109
    type ToBehaviour: fmt::Debug + Send + 'static;
110
    /// The inbound upgrade for the protocol(s) used by the handler.
111
    type InboundProtocol: InboundUpgradeSend;
112
    /// The outbound upgrade for the protocol(s) used by the handler.
113
    type OutboundProtocol: OutboundUpgradeSend;
114
    /// The type of additional information returned from `listen_protocol`.
115
    type InboundOpenInfo: Send + 'static;
116
    /// The type of additional information passed to an `OutboundSubstreamRequest`.
117
    type OutboundOpenInfo: Send + 'static;
118

119
    /// The [`InboundUpgrade`](libp2p_core::upgrade::InboundUpgrade) to apply on inbound
120
    /// substreams to negotiate the desired protocols.
121
    ///
122
    /// > **Note**: The returned `InboundUpgrade` should always accept all the generally
123
    /// > supported protocols, even if in a specific context a particular one is
124
    /// > not supported (e.g. when only allowing one substream at a time for a protocol).
125
    /// > This allows a remote to put the list of supported protocols in a cache.
126
    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo>;
127

128
    /// Returns whether the connection should be kept alive.
129
    ///
130
    /// ## Keep alive algorithm
131
    ///
132
    /// A connection is always kept alive:
133
    ///
134
    /// - Whilst a [`ConnectionHandler`] returns [`Poll::Ready`].
135
    /// - We are negotiating inbound or outbound streams.
136
    /// - There are active [`Stream`](crate::Stream)s on the connection.
137
    ///
138
    /// The combination of the above means that _most_ protocols will not need to override this
139
    /// method. This method is only invoked when all of the above are `false`, i.e. when the
140
    /// connection is entirely idle.
141
    ///
142
    /// ## Exceptions
143
    ///
144
    /// - Protocols like [circuit-relay v2](https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md)
145
    ///   need to keep a connection alive beyond these circumstances and can thus override this
146
    ///   method.
147
    /// - Protocols like [ping](https://github.com/libp2p/specs/blob/master/ping/ping.md) **don't**
148
    ///   want to keep a connection alive despite active streams.
149
    ///
150
    /// In that case, protocol authors can use
151
    /// [`Stream::ignore_for_keep_alive`](crate::Stream::ignore_for_keep_alive) to opt a
152
    /// particular stream out of the keep-alive algorithm.
153
    fn connection_keep_alive(&self) -> bool {
94,347✔
154
        false
94,347✔
155
    }
94,347✔
156

157
    /// Should behave like `Stream::poll_next()`.
158
    fn poll(
159
        &mut self,
160
        cx: &mut Context<'_>,
161
    ) -> Poll<
162
        ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
163
    >;
164

165
    /// Gracefully close the [`ConnectionHandler`].
166
    ///
167
    /// The contract for this function is equivalent to a [`Stream`](futures::Stream).
168
    /// When a connection is being shut down, we will first poll this function to completion.
169
    /// Following that, the physical connection will be shut down.
170
    ///
171
    /// This is also called when the shutdown was initiated due to an error on the connection.
172
    /// We therefore cannot guarantee that performing IO within here will succeed.
173
    ///
174
    /// To signal completion, [`Poll::Ready(None)`] should be returned.
175
    ///
176
    /// Implementations MUST have a [`fuse`](futures::StreamExt::fuse)-like behaviour.
177
    /// That is, [`Poll::Ready(None)`] MUST be returned on repeated calls to
178
    /// [`ConnectionHandler::poll_close`].
179
    fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
2,314✔
180
        Poll::Ready(None)
2,314✔
181
    }
2,314✔
182

183
    /// Adds a closure that turns the input event into something else.
184
    fn map_in_event<TNewIn, TMap>(self, map: TMap) -> MapInEvent<Self, TNewIn, TMap>
×
185
    where
×
186
        Self: Sized,
×
187
        TMap: Fn(&TNewIn) -> Option<&Self::FromBehaviour>,
×
188
    {
189
        MapInEvent::new(self, map)
×
190
    }
×
191

192
    /// Adds a closure that turns the output event into something else.
193
    fn map_out_event<TMap, TNewOut>(self, map: TMap) -> MapOutEvent<Self, TMap>
×
194
    where
×
195
        Self: Sized,
×
196
        TMap: FnMut(Self::ToBehaviour) -> TNewOut,
×
197
    {
198
        MapOutEvent::new(self, map)
×
199
    }
×
200

201
    /// Creates a new [`ConnectionHandler`] that selects either this handler or
202
    /// `other` by delegating method calls appropriately.
203
    fn select<TProto2>(self, other: TProto2) -> ConnectionHandlerSelect<Self, TProto2>
6,680✔
204
    where
6,680✔
205
        Self: Sized,
6,680✔
206
    {
207
        ConnectionHandlerSelect::new(self, other)
6,680✔
208
    }
6,680✔
209

210
    /// Informs the handler about an event from the [`NetworkBehaviour`](super::NetworkBehaviour).
211
    fn on_behaviour_event(&mut self, _event: Self::FromBehaviour);
212

213
    /// Informs the handler about an event on the connection, see [`ConnectionEvent`].
    fn on_connection_event(
214
        &mut self,
215
        event: ConnectionEvent<
216
            Self::InboundProtocol,
217
            Self::OutboundProtocol,
218
            Self::InboundOpenInfo,
219
            Self::OutboundOpenInfo,
220
        >,
221
    );
222
}
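
The keep-alive rules documented on `connection_keep_alive` can be sketched in isolation. The following is a minimal, std-only illustration and not the swarm's actual implementation: `ConnectionActivity` and `keep_connection_alive` are hypothetical names, and the closure stands in for a call to `ConnectionHandler::connection_keep_alive`.

```rust
/// Hypothetical snapshot of a connection's activity; not a libp2p type.
#[derive(Debug, Clone, Copy, Default)]
pub struct ConnectionActivity {
    /// The handler returned `Poll::Ready` when it was last polled.
    pub handler_made_progress: bool,
    /// Number of inbound or outbound streams still being negotiated.
    pub negotiating_streams: usize,
    /// Number of active streams that count towards keep-alive.
    pub active_streams: usize,
}

/// Decide whether the connection stays open.
///
/// `handler_keep_alive` stands in for `ConnectionHandler::connection_keep_alive`
/// and is only consulted when the connection is otherwise idle.
pub fn keep_connection_alive(
    activity: ConnectionActivity,
    handler_keep_alive: impl FnOnce() -> bool,
) -> bool {
    if activity.handler_made_progress
        || activity.negotiating_streams > 0
        || activity.active_streams > 0
    {
        // Any of the three documented conditions keeps the connection alive.
        return true;
    }
    // Entirely idle: the handler has the final say.
    handler_keep_alive()
}
```

In this sketch a stream that was opted out of the keep-alive algorithm would simply not be counted in `active_streams`, which is why a ping-like protocol does not hold the connection open.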
223

224
/// Enumeration with the list of the possible stream events
225
/// to pass to [`on_connection_event`](ConnectionHandler::on_connection_event).
226
#[non_exhaustive]
227
pub enum ConnectionEvent<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI = (), OOI = ()> {
228
    /// Informs the handler about the output of a successful upgrade on a new inbound substream.
229
    FullyNegotiatedInbound(FullyNegotiatedInbound<IP, IOI>),
230
    /// Informs the handler about the output of a successful upgrade on a new outbound stream.
231
    FullyNegotiatedOutbound(FullyNegotiatedOutbound<OP, OOI>),
232
    /// Informs the handler about a change in the address of the remote.
233
    AddressChange(AddressChange<'a>),
234
    /// Informs the handler that upgrading an outbound substream to the given protocol has failed.
235
    DialUpgradeError(DialUpgradeError<OOI, OP>),
236
    /// Informs the handler that upgrading an inbound substream to the given protocol has failed.
237
    ListenUpgradeError(ListenUpgradeError<IOI, IP>),
238
    /// The local [`ConnectionHandler`] added or removed support for one or more protocols.
239
    LocalProtocolsChange(ProtocolsChange<'a>),
240
    /// The remote [`ConnectionHandler`] now supports a different set of protocols.
241
    RemoteProtocolsChange(ProtocolsChange<'a>),
242
}
243

244
impl<IP, OP, IOI, OOI> fmt::Debug for ConnectionEvent<'_, IP, OP, IOI, OOI>
245
where
246
    IP: InboundUpgradeSend + fmt::Debug,
247
    IP::Output: fmt::Debug,
248
    IP::Error: fmt::Debug,
249
    OP: OutboundUpgradeSend + fmt::Debug,
250
    OP::Output: fmt::Debug,
251
    OP::Error: fmt::Debug,
252
    IOI: fmt::Debug,
253
    OOI: fmt::Debug,
254
{
255
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
256
        match self {
×
257
            ConnectionEvent::FullyNegotiatedInbound(v) => {
×
258
                f.debug_tuple("FullyNegotiatedInbound").field(v).finish()
×
259
            }
260
            ConnectionEvent::FullyNegotiatedOutbound(v) => {
×
261
                f.debug_tuple("FullyNegotiatedOutbound").field(v).finish()
×
262
            }
263
            ConnectionEvent::AddressChange(v) => f.debug_tuple("AddressChange").field(v).finish(),
×
264
            ConnectionEvent::DialUpgradeError(v) => {
×
265
                f.debug_tuple("DialUpgradeError").field(v).finish()
×
266
            }
267
            ConnectionEvent::ListenUpgradeError(v) => {
×
268
                f.debug_tuple("ListenUpgradeError").field(v).finish()
×
269
            }
270
            ConnectionEvent::LocalProtocolsChange(v) => {
×
271
                f.debug_tuple("LocalProtocolsChange").field(v).finish()
×
272
            }
273
            ConnectionEvent::RemoteProtocolsChange(v) => {
×
274
                f.debug_tuple("RemoteProtocolsChange").field(v).finish()
×
275
            }
276
        }
277
    }
×
278
}
279

280
impl<IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI>
281
    ConnectionEvent<'_, IP, OP, IOI, OOI>
282
{
283
    /// Whether the event concerns an outbound stream.
284
    pub fn is_outbound(&self) -> bool {
18✔
285
        match self {
18✔
286
            ConnectionEvent::DialUpgradeError(_) | ConnectionEvent::FullyNegotiatedOutbound(_) => {
287
                true
6✔
288
            }
289
            ConnectionEvent::FullyNegotiatedInbound(_)
290
            | ConnectionEvent::AddressChange(_)
291
            | ConnectionEvent::LocalProtocolsChange(_)
292
            | ConnectionEvent::RemoteProtocolsChange(_)
293
            | ConnectionEvent::ListenUpgradeError(_) => false,
12✔
294
        }
295
    }
18✔
296

297
    /// Whether the event concerns an inbound stream.
298
    pub fn is_inbound(&self) -> bool {
18✔
299
        match self {
18✔
300
            ConnectionEvent::FullyNegotiatedInbound(_) | ConnectionEvent::ListenUpgradeError(_) => {
301
                true
6✔
302
            }
303
            ConnectionEvent::FullyNegotiatedOutbound(_)
304
            | ConnectionEvent::AddressChange(_)
305
            | ConnectionEvent::LocalProtocolsChange(_)
306
            | ConnectionEvent::RemoteProtocolsChange(_)
307
            | ConnectionEvent::DialUpgradeError(_) => false,
12✔
308
        }
309
    }
18✔
310
}
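
Note that `is_inbound` and `is_outbound` are not complements of each other: address and protocol changes concern neither direction. A minimal sketch of that three-way classification, using a payload-free stand-in enum (`EventKind`, `Direction`, and `direction` are hypothetical names, not part of the crate):

```rust
/// Simplified, hypothetical stand-in for `ConnectionEvent` without payloads.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventKind {
    FullyNegotiatedInbound,
    FullyNegotiatedOutbound,
    AddressChange,
    DialUpgradeError,
    ListenUpgradeError,
    LocalProtocolsChange,
    RemoteProtocolsChange,
}

/// Which side of the connection an event concerns.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    Inbound,
    Outbound,
    Neither,
}

/// Classify an event the same way `is_inbound` / `is_outbound` do.
pub fn direction(kind: EventKind) -> Direction {
    match kind {
        EventKind::FullyNegotiatedInbound | EventKind::ListenUpgradeError => Direction::Inbound,
        EventKind::FullyNegotiatedOutbound | EventKind::DialUpgradeError => Direction::Outbound,
        EventKind::AddressChange
        | EventKind::LocalProtocolsChange
        | EventKind::RemoteProtocolsChange => Direction::Neither,
    }
}
```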
311

312
/// [`ConnectionEvent`] variant that informs the handler about
313
/// the output of a successful upgrade on a new inbound substream.
314
///
315
/// Note that it is up to the [`ConnectionHandler`] implementation to manage the lifetime of the
316
/// negotiated inbound substreams. E.g. the implementation has to enforce a limit on the number
317
/// of simultaneously open negotiated inbound substreams. In other words it is up to the
318
/// [`ConnectionHandler`] implementation to stop a malicious remote node from opening and keeping
319
/// alive an excessive number of inbound substreams.
320
#[derive(Debug)]
321
pub struct FullyNegotiatedInbound<IP: InboundUpgradeSend, IOI = ()> {
322
    pub protocol: IP::Output,
323
    pub info: IOI,
324
}
325

326
/// [`ConnectionEvent`] variant that informs the handler about a successful upgrade on a new
327
/// outbound stream.
328
///
329
/// The `info` field is the information that was previously passed to
330
/// [`ConnectionHandlerEvent::OutboundSubstreamRequest`].
331
#[derive(Debug)]
332
pub struct FullyNegotiatedOutbound<OP: OutboundUpgradeSend, OOI = ()> {
333
    pub protocol: OP::Output,
334
    pub info: OOI,
335
}
336

337
/// [`ConnectionEvent`] variant that informs the handler about a change in the address of the
338
/// remote.
339
#[derive(Debug)]
340
pub struct AddressChange<'a> {
341
    pub new_address: &'a Multiaddr,
342
}
343

344
/// [`ConnectionEvent`] variant that informs the handler about a change in the protocols supported
345
/// on the connection.
346
#[derive(Debug, Clone)]
347
pub enum ProtocolsChange<'a> {
348
    Added(ProtocolsAdded<'a>),
349
    Removed(ProtocolsRemoved<'a>),
350
}
351

352
impl<'a> ProtocolsChange<'a> {
353
    /// Compute the protocol change for the initial set of protocols.
354
    pub(crate) fn from_initial_protocols<'b, T: AsRef<str> + 'b>(
22,241✔
355
        new_protocols: impl IntoIterator<Item = &'b T>,
22,241✔
356
        buffer: &'a mut Vec<StreamProtocol>,
22,241✔
357
    ) -> Self {
22,241✔
358
        buffer.clear();
22,241✔
359
        buffer.extend(
22,241✔
360
            new_protocols
22,241✔
361
                .into_iter()
22,241✔
362
                .filter_map(|i| StreamProtocol::try_from_owned(i.as_ref().to_owned()).ok()),
22,390✔
363
        );
364

365
        ProtocolsChange::Added(ProtocolsAdded {
22,241✔
366
            protocols: buffer.iter(),
22,241✔
367
        })
22,241✔
368
    }
22,241✔
369

370
    /// Compute the [`ProtocolsChange`] that results from adding `to_add` to `existing_protocols`.
371
    ///
372
    /// Returns `None` if the change is a no-op, i.e. `to_add` is a subset of `existing_protocols`.
373
    pub(crate) fn add(
212✔
374
        existing_protocols: &HashSet<StreamProtocol>,
212✔
375
        to_add: HashSet<StreamProtocol>,
212✔
376
        buffer: &'a mut Vec<StreamProtocol>,
212✔
377
    ) -> Option<Self> {
212✔
378
        buffer.clear();
212✔
379
        buffer.extend(
212✔
380
            to_add
212✔
381
                .into_iter()
212✔
382
                .filter(|i| !existing_protocols.contains(i)),
509✔
383
        );
384

385
        if buffer.is_empty() {
212✔
386
            return None;
×
387
        }
212✔
388

389
        Some(Self::Added(ProtocolsAdded {
212✔
390
            protocols: buffer.iter(),
212✔
391
        }))
212✔
392
    }
212✔
393

394
    /// Compute the [`ProtocolsChange`] that results from removing `to_remove` from
395
    /// `existing_protocols`. Removes the protocols from `existing_protocols`.
396
    ///
397
    /// Returns `None` if the change is a no-op, i.e. none of the protocols in `to_remove` are in
398
    /// `existing_protocols`.
399
    pub(crate) fn remove(
7✔
400
        existing_protocols: &mut HashSet<StreamProtocol>,
7✔
401
        to_remove: HashSet<StreamProtocol>,
7✔
402
        buffer: &'a mut Vec<StreamProtocol>,
7✔
403
    ) -> Option<Self> {
7✔
404
        buffer.clear();
7✔
405
        buffer.extend(
7✔
406
            to_remove
7✔
407
                .into_iter()
7✔
408
                .filter_map(|i| existing_protocols.take(&i)),
13✔
409
        );
410

411
        if buffer.is_empty() {
7✔
412
            return None;
3✔
413
        }
4✔
414

415
        Some(Self::Removed(ProtocolsRemoved {
4✔
416
            protocols: buffer.iter(),
4✔
417
        }))
4✔
418
    }
7✔
419

420
    /// Compute the [`ProtocolsChange`]s required to go from `existing_protocols` to
421
    /// `new_protocols`.
422
    pub(crate) fn from_full_sets<T: AsRef<str>>(
231,107✔
423
        existing_protocols: &mut HashMap<AsStrHashEq<T>, bool>,
231,107✔
424
        new_protocols: impl IntoIterator<Item = T>,
231,107✔
425
        buffer: &'a mut Vec<StreamProtocol>,
231,107✔
426
    ) -> SmallVec<[Self; 2]> {
231,107✔
427
        buffer.clear();
231,107✔
428

429
        // Initially, set the boolean for all protocols to `false`, meaning "not visited".
430
        for v in existing_protocols.values_mut() {
233,271✔
431
            *v = false;
220,830✔
432
        }
220,830✔
433

434
        let mut new_protocol_count = 0; // We can only iterate `new_protocols` once, so keep track of its length separately.
231,107✔
435
        for new_protocol in new_protocols {
451,939✔
436
            existing_protocols
220,832✔
437
                .entry(AsStrHashEq(new_protocol))
220,832✔
438
                .and_modify(|v| *v = true) // Mark protocol as visited (i.e. we still support it)
220,832✔
439
                .or_insert_with_key(|k| {
220,832✔
440
                    // Encountered a previously unsupported protocol, remember it in `buffer`.
441
                    buffer.extend(StreamProtocol::try_from_owned(k.0.as_ref().to_owned()).ok());
10✔
442
                    true
10✔
443
                });
10✔
444
            new_protocol_count += 1;
220,832✔
445
        }
446

447
        if new_protocol_count == existing_protocols.len() && buffer.is_empty() {
231,107✔
448
            return SmallVec::new();
231,095✔
449
        }
12✔
450

451
        let num_new_protocols = buffer.len();
12✔
452
        // Drain all protocols that we haven't visited.
453
        // For existing protocols that are not in `new_protocols`, the boolean will be false,
454
        // meaning we need to remove it.
455
        existing_protocols.retain(|p, &mut is_supported| {
32✔
456
            if !is_supported {
32✔
457
                buffer.extend(StreamProtocol::try_from_owned(p.0.as_ref().to_owned()).ok());
8✔
458
            }
26✔
459

460
            is_supported
32✔
461
        });
32✔
462

463
        let (added, removed) = buffer.split_at(num_new_protocols);
12✔
464
        let mut changes = SmallVec::new();
12✔
465
        if !added.is_empty() {
12✔
466
            changes.push(ProtocolsChange::Added(ProtocolsAdded {
8✔
467
                protocols: added.iter(),
8✔
468
            }));
8✔
469
        }
10✔
470
        if !removed.is_empty() {
12✔
471
            changes.push(ProtocolsChange::Removed(ProtocolsRemoved {
6✔
472
                protocols: removed.iter(),
6✔
473
            }));
6✔
474
        }
9✔
475
        changes
12✔
476
    }
231,107✔
477
}
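
The mark-and-sweep idea behind `from_full_sets` can be shown on its own. The sketch below is a simplified, std-only approximation: it uses plain `String` keys instead of `AsStrHashEq<T>` and `StreamProtocol`, returns owned vectors instead of borrowing from a shared buffer, and the names `Diff` and `diff_full_sets` are hypothetical.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Protocols added and removed between two snapshots (hypothetical helper type).
#[derive(Debug, Default, PartialEq, Eq)]
pub struct Diff {
    pub added: Vec<String>,
    pub removed: Vec<String>,
}

/// Update `existing` in place to match `new_protocols` and report the difference.
pub fn diff_full_sets(
    existing: &mut HashMap<String, bool>,
    new_protocols: impl IntoIterator<Item = String>,
) -> Diff {
    // Step 1: mark every known protocol as "not visited".
    for visited in existing.values_mut() {
        *visited = false;
    }

    let mut diff = Diff::default();

    // Step 2: walk the new set once; unknown protocols are additions.
    for protocol in new_protocols {
        match existing.entry(protocol) {
            Entry::Occupied(mut entry) => {
                *entry.get_mut() = true;
            }
            Entry::Vacant(entry) => {
                diff.added.push(entry.key().clone());
                entry.insert(true);
            }
        }
    }

    // Step 3: anything still unvisited is no longer supported; drop it.
    existing.retain(|protocol, visited| {
        if !*visited {
            diff.removed.push(protocol.clone());
        }
        *visited
    });

    // Sort for deterministic output; `HashMap` iteration order is unspecified.
    diff.added.sort();
    diff.removed.sort();
    diff
}
```

Reusing the boolean stored in the map as the "visited" flag avoids building a second set for the new protocols, which matters because the hit counts above suggest this function runs very frequently and almost always finds no change.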
478

479
/// An [`Iterator`] over all protocols that have been added.
480
#[derive(Debug, Clone)]
481
pub struct ProtocolsAdded<'a> {
482
    pub(crate) protocols: slice::Iter<'a, StreamProtocol>,
483
}
484

485
/// An [`Iterator`] over all protocols that have been removed.
486
#[derive(Debug, Clone)]
487
pub struct ProtocolsRemoved<'a> {
488
    pub(crate) protocols: slice::Iter<'a, StreamProtocol>,
489
}
490

491
impl<'a> Iterator for ProtocolsAdded<'a> {
492
    type Item = &'a StreamProtocol;
493
    fn next(&mut self) -> Option<Self::Item> {
952✔
494
        self.protocols.next()
952✔
495
    }
952✔
496
}
497

498
impl<'a> Iterator for ProtocolsRemoved<'a> {
499
    type Item = &'a StreamProtocol;
500
    fn next(&mut self) -> Option<Self::Item> {
27✔
501
        self.protocols.next()
27✔
502
    }
27✔
503
}
504

505
/// [`ConnectionEvent`] variant that informs the handler
506
/// that upgrading an outbound substream to the given protocol has failed.
507
#[derive(Debug)]
508
pub struct DialUpgradeError<OOI, OP: OutboundUpgradeSend> {
509
    pub info: OOI,
510
    pub error: StreamUpgradeError<OP::Error>,
511
}
512

513
/// [`ConnectionEvent`] variant that informs the handler
514
/// that upgrading an inbound substream to the given protocol has failed.
515
#[derive(Debug)]
516
pub struct ListenUpgradeError<IOI, IP: InboundUpgradeSend> {
517
    pub info: IOI,
518
    pub error: IP::Error,
519
}
520

521
/// Configuration of inbound or outbound substream protocol(s)
522
/// for a [`ConnectionHandler`].
523
///
524
/// The inbound substream protocol(s) are defined by [`ConnectionHandler::listen_protocol`]
525
/// and the outbound substream protocol(s) by [`ConnectionHandlerEvent::OutboundSubstreamRequest`].
526
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
527
pub struct SubstreamProtocol<TUpgrade, TInfo = ()> {
528
    upgrade: TUpgrade,
529
    info: TInfo,
530
    timeout: Duration,
531
}
532

533
impl<TUpgrade, TInfo> SubstreamProtocol<TUpgrade, TInfo> {
534
    /// Create a new `SubstreamProtocol` from the given upgrade.
535
    ///
536
    /// The default timeout for applying the given upgrade on a substream is
537
    /// 10 seconds.
538
    pub fn new(upgrade: TUpgrade, info: TInfo) -> Self {
395,156✔
539
        SubstreamProtocol {
395,156✔
540
            upgrade,
395,156✔
541
            info,
395,156✔
542
            timeout: Duration::from_secs(10),
395,156✔
543
        }
395,156✔
544
    }
395,156✔
545

546
    /// Maps a function over the protocol upgrade.
547
    pub fn map_upgrade<U, F>(self, f: F) -> SubstreamProtocol<U, TInfo>
1,886✔
548
    where
1,886✔
549
        F: FnOnce(TUpgrade) -> U,
1,886✔
550
    {
551
        SubstreamProtocol {
1,886✔
552
            upgrade: f(self.upgrade),
1,886✔
553
            info: self.info,
1,886✔
554
            timeout: self.timeout,
1,886✔
555
        }
1,886✔
556
    }
1,886✔
557

558
    /// Maps a function over the protocol info.
559
    pub fn map_info<U, F>(self, f: F) -> SubstreamProtocol<TUpgrade, U>
1,886✔
560
    where
1,886✔
561
        F: FnOnce(TInfo) -> U,
1,886✔
562
    {
563
        SubstreamProtocol {
1,886✔
564
            upgrade: self.upgrade,
1,886✔
565
            info: f(self.info),
1,886✔
566
            timeout: self.timeout,
1,886✔
567
        }
1,886✔
568
    }
1,886✔
569

570
    /// Sets a new timeout for the protocol upgrade.
571
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
38,686✔
572
        self.timeout = timeout;
38,686✔
573
        self
38,686✔
574
    }
38,686✔
575

576
    /// Borrows the contained protocol upgrade.
577
    pub fn upgrade(&self) -> &TUpgrade {
289,126✔
578
        &self.upgrade
289,126✔
579
    }
289,126✔
580

581
    /// Borrows the contained protocol info.
582
    pub fn info(&self) -> &TInfo {
×
583
        &self.info
×
584
    }
×
585

586
    /// Borrows the timeout for the protocol upgrade.
587
    pub fn timeout(&self) -> &Duration {
104,242✔
588
        &self.timeout
104,242✔
589
    }
104,242✔
590

591
    /// Converts the substream protocol configuration into the contained upgrade and info.
592
    pub fn into_upgrade(self) -> (TUpgrade, TInfo) {
104,242✔
593
        (self.upgrade, self.info)
104,242✔
594
    }
104,242✔
595
}
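
The consuming-builder style of `SubstreamProtocol` (a constructor with a default timeout, followed by chained `with_*` and `map_*` calls) can be illustrated with a std-only mirror. `ProtocolConfig` and `into_parts` below are hypothetical names chosen for this sketch; only the 10-second default and the shape of the methods are taken from the code above.

```rust
use std::time::Duration;

/// Hypothetical, simplified mirror of `SubstreamProtocol`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProtocolConfig<U, I = ()> {
    upgrade: U,
    info: I,
    timeout: Duration,
}

impl<U, I> ProtocolConfig<U, I> {
    /// Create a configuration with the default upgrade timeout of 10 seconds.
    pub fn new(upgrade: U, info: I) -> Self {
        Self {
            upgrade,
            info,
            timeout: Duration::from_secs(10),
        }
    }

    /// Replace the timeout, consuming and returning the configuration.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Transform the upgrade while keeping info and timeout unchanged.
    pub fn map_upgrade<V>(self, f: impl FnOnce(U) -> V) -> ProtocolConfig<V, I> {
        ProtocolConfig {
            upgrade: f(self.upgrade),
            info: self.info,
            timeout: self.timeout,
        }
    }

    /// The currently configured timeout.
    pub fn timeout(&self) -> Duration {
        self.timeout
    }

    /// Split the configuration into upgrade and info.
    pub fn into_parts(self) -> (U, I) {
        (self.upgrade, self.info)
    }
}
```

A call chain such as `ProtocolConfig::new("/echo/1.0.0", 7u32).with_timeout(Duration::from_secs(3))` keeps the upgrade and info and only changes the timeout, and `map_upgrade` can change the upgrade's type without touching the other two fields.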
596

597
/// Event produced by a handler.
598
#[derive(Debug, Clone, PartialEq, Eq)]
599
#[non_exhaustive]
600
pub enum ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom> {
601
    /// Request a new outbound substream to be opened with the remote.
602
    OutboundSubstreamRequest {
603
        /// The protocol(s) to apply on the substream.
604
        protocol: SubstreamProtocol<TConnectionUpgrade, TOutboundOpenInfo>,
605
    },
606
    /// We learned something about the protocols supported by the remote.
607
    ReportRemoteProtocols(ProtocolSupport),
608

609
    /// Event that is sent to a [`NetworkBehaviour`](crate::behaviour::NetworkBehaviour).
610
    NotifyBehaviour(TCustom),
611
}
612

613
/// A change in the set of protocols that the remote is known to support.
#[derive(Debug, Clone, PartialEq, Eq)]
614
pub enum ProtocolSupport {
615
    /// The remote now supports these additional protocols.
616
    Added(HashSet<StreamProtocol>),
617
    /// The remote no longer supports these protocols.
618
    Removed(HashSet<StreamProtocol>),
619
}
620

621
/// Mapping helpers for the events produced by a handler.
622
impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
623
    ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
624
{
625
    /// If this is an `OutboundSubstreamRequest`, maps the `info` member from a
626
    /// `TOutboundOpenInfo` to something else.
627
    pub fn map_outbound_open_info<F, I>(
208✔
628
        self,
208✔
629
        map: F,
208✔
630
    ) -> ConnectionHandlerEvent<TConnectionUpgrade, I, TCustom>
208✔
631
    where
208✔
632
        F: FnOnce(TOutboundOpenInfo) -> I,
208✔
633
    {
634
        match self {
208✔
635
            ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
59✔
636
                ConnectionHandlerEvent::OutboundSubstreamRequest {
59✔
637
                    protocol: protocol.map_info(map),
59✔
638
                }
59✔
639
            }
640
            ConnectionHandlerEvent::NotifyBehaviour(val) => {
149✔
641
                ConnectionHandlerEvent::NotifyBehaviour(val)
149✔
642
            }
643
            ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
×
644
                ConnectionHandlerEvent::ReportRemoteProtocols(support)
×
645
            }
646
        }
647
    }
208✔
648

649
    /// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`)
650
    /// to something else.
651
    pub fn map_protocol<F, I>(self, map: F) -> ConnectionHandlerEvent<I, TOutboundOpenInfo, TCustom>
208✔
652
    where
208✔
653
        F: FnOnce(TConnectionUpgrade) -> I,
208✔
654
    {
655
        match self {
208✔
656
            ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
59✔
657
                ConnectionHandlerEvent::OutboundSubstreamRequest {
59✔
658
                    protocol: protocol.map_upgrade(map),
59✔
659
                }
59✔
660
            }
661
            ConnectionHandlerEvent::NotifyBehaviour(val) => {
149✔
662
                ConnectionHandlerEvent::NotifyBehaviour(val)
149✔
663
            }
664
            ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
×
665
                ConnectionHandlerEvent::ReportRemoteProtocols(support)
×
666
            }
667
        }
668
    }
208✔
669

670
    /// If this is a `NotifyBehaviour` event, maps the content to something else.
671
    pub fn map_custom<F, I>(
208✔
672
        self,
208✔
673
        map: F,
208✔
674
    ) -> ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, I>
208✔
675
    where
208✔
676
        F: FnOnce(TCustom) -> I,
208✔
677
    {
678
        match self {
208✔
679
            ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
59✔
680
                ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }
59✔
681
            }
682
            ConnectionHandlerEvent::NotifyBehaviour(val) => {
149✔
683
                ConnectionHandlerEvent::NotifyBehaviour(map(val))
149✔
684
            }
685
            ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
×
686
                ConnectionHandlerEvent::ReportRemoteProtocols(support)
×
687
            }
688
        }
689
    }
208✔
690
}
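
Each of the three `map_*` helpers transforms exactly one type parameter and passes every other variant through unchanged. A reduced, std-only sketch of that pattern follows; `HandlerEvent` is a hypothetical two-variant stand-in that omits `ReportRemoteProtocols` and the `SubstreamProtocol` wrapper.

```rust
/// Hypothetical, simplified stand-in for `ConnectionHandlerEvent`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HandlerEvent<P, T> {
    /// Request a new outbound substream using protocol `P`.
    OutboundSubstreamRequest { protocol: P },
    /// Event forwarded to the behaviour.
    NotifyBehaviour(T),
}

impl<P, T> HandlerEvent<P, T> {
    /// Map the behaviour payload; substream requests pass through untouched.
    pub fn map_custom<U>(self, map: impl FnOnce(T) -> U) -> HandlerEvent<P, U> {
        match self {
            HandlerEvent::OutboundSubstreamRequest { protocol } => {
                HandlerEvent::OutboundSubstreamRequest { protocol }
            }
            HandlerEvent::NotifyBehaviour(value) => HandlerEvent::NotifyBehaviour(map(value)),
        }
    }

    /// Map the protocol; behaviour notifications pass through untouched.
    pub fn map_protocol<Q>(self, map: impl FnOnce(P) -> Q) -> HandlerEvent<Q, T> {
        match self {
            HandlerEvent::OutboundSubstreamRequest { protocol } => {
                HandlerEvent::OutboundSubstreamRequest {
                    protocol: map(protocol),
                }
            }
            HandlerEvent::NotifyBehaviour(value) => HandlerEvent::NotifyBehaviour(value),
        }
    }
}
```

Because the closure is `FnOnce` and is invoked in at most one arm, it may move captured state, which is convenient when a wrapping handler needs to tag an event with data it owns.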
691

692
/// Error that can happen on an outbound substream opening attempt.
693
#[derive(Debug)]
694
pub enum StreamUpgradeError<TUpgrErr> {
695
    /// The opening attempt timed out before the negotiation was fully completed.
696
    Timeout,
697
    /// The upgrade produced an error.
698
    Apply(TUpgrErr),
699
    /// No protocol could be agreed upon.
700
    NegotiationFailed,
701
    /// An IO or otherwise unrecoverable error happened.
702
    Io(io::Error),
703
}
704

impl<TUpgrErr> StreamUpgradeError<TUpgrErr> {
    /// Map the inner upgrade error type, i.e. the payload of [`StreamUpgradeError::Apply`].
    pub fn map_upgrade_err<F, E>(self, f: F) -> StreamUpgradeError<E>
    where
        F: FnOnce(TUpgrErr) -> E,
    {
        match self {
            StreamUpgradeError::Timeout => StreamUpgradeError::Timeout,
            StreamUpgradeError::Apply(e) => StreamUpgradeError::Apply(f(e)),
            StreamUpgradeError::NegotiationFailed => StreamUpgradeError::NegotiationFailed,
            StreamUpgradeError::Io(e) => StreamUpgradeError::Io(e),
        }
    }
}
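// A minimal sketch of how `map_upgrade_err` might be used to convert the
// `Apply` payload into an application-level error while leaving the other
// variants untouched. The enum below is a standalone mirror of the one above so
// the sketch compiles without the crate; `HandshakeFailed` and
// `into_handshake_error` are hypothetical names introduced for illustration.

```rust
use std::io;

// Standalone mirror of `StreamUpgradeError` (assumption: same variants as above).
#[derive(Debug)]
pub enum StreamUpgradeError<TUpgrErr> {
    Timeout,
    Apply(TUpgrErr),
    NegotiationFailed,
    Io(io::Error),
}

impl<TUpgrErr> StreamUpgradeError<TUpgrErr> {
    // Only the `Apply` payload is passed through `f`; every other variant is
    // rebuilt unchanged with the new type parameter.
    pub fn map_upgrade_err<F, E>(self, f: F) -> StreamUpgradeError<E>
    where
        F: FnOnce(TUpgrErr) -> E,
    {
        match self {
            StreamUpgradeError::Timeout => StreamUpgradeError::Timeout,
            StreamUpgradeError::Apply(e) => StreamUpgradeError::Apply(f(e)),
            StreamUpgradeError::NegotiationFailed => StreamUpgradeError::NegotiationFailed,
            StreamUpgradeError::Io(e) => StreamUpgradeError::Io(e),
        }
    }
}

// Hypothetical application-level error type.
#[derive(Debug, PartialEq)]
pub struct HandshakeFailed(pub String);

// Hypothetical helper: wraps a textual upgrade failure in `HandshakeFailed`.
pub fn into_handshake_error(
    err: StreamUpgradeError<&'static str>,
) -> StreamUpgradeError<HandshakeFailed> {
    err.map_upgrade_err(|reason| HandshakeFailed(reason.to_string()))
}
```

// Because the closure is `FnOnce`, it runs at most once, and only when the
// error is an `Apply`.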

impl<TUpgrErr> fmt::Display for StreamUpgradeError<TUpgrErr>
where
    TUpgrErr: error::Error + 'static,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StreamUpgradeError::Timeout => {
                write!(f, "Timeout error while opening a substream")
            }
            StreamUpgradeError::Apply(err) => {
                write!(f, "Apply: ")?;
                crate::print_error_chain(f, err)
            }
            StreamUpgradeError::NegotiationFailed => {
                write!(f, "no protocols could be agreed upon")
            }
            StreamUpgradeError::Io(e) => {
                write!(f, "IO error: ")?;
                crate::print_error_chain(f, e)
            }
        }
    }
}

impl<TUpgrErr> error::Error for StreamUpgradeError<TUpgrErr>
where
    TUpgrErr: error::Error + 'static,
{
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        None
    }
}

#[cfg(test)]
mod test {
    use super::*;

    fn protocol_set_of(s: &'static str) -> HashSet<StreamProtocol> {
        s.split_whitespace()
            .map(|p| StreamProtocol::try_from_owned(format!("/{p}")).unwrap())
            .collect()
    }

    fn test_remove(
        existing: &mut HashSet<StreamProtocol>,
        to_remove: HashSet<StreamProtocol>,
    ) -> HashSet<StreamProtocol> {
        ProtocolsChange::remove(existing, to_remove, &mut Vec::new())
            .into_iter()
            .flat_map(|c| match c {
                ProtocolsChange::Added(_) => panic!("unexpected added"),
                ProtocolsChange::Removed(r) => r.cloned(),
            })
            .collect::<HashSet<_>>()
    }

    #[test]
    fn test_protocol_remove_subset() {
        let mut existing = protocol_set_of("a b c");
        let to_remove = protocol_set_of("a b");

        let change = test_remove(&mut existing, to_remove);

        assert_eq!(existing, protocol_set_of("c"));
        assert_eq!(change, protocol_set_of("a b"));
    }

    #[test]
    fn test_protocol_remove_all() {
        let mut existing = protocol_set_of("a b c");
        let to_remove = protocol_set_of("a b c");

        let change = test_remove(&mut existing, to_remove);

        assert_eq!(existing, protocol_set_of(""));
        assert_eq!(change, protocol_set_of("a b c"));
    }

    #[test]
    fn test_protocol_remove_superset() {
        let mut existing = protocol_set_of("a b c");
        let to_remove = protocol_set_of("a b c d");

        let change = test_remove(&mut existing, to_remove);

        assert_eq!(existing, protocol_set_of(""));
        assert_eq!(change, protocol_set_of("a b c"));
    }

    #[test]
    fn test_protocol_remove_none() {
        let mut existing = protocol_set_of("a b c");
        let to_remove = protocol_set_of("d");

        let change = test_remove(&mut existing, to_remove);

        assert_eq!(existing, protocol_set_of("a b c"));
        assert_eq!(change, protocol_set_of(""));
    }

    #[test]
    fn test_protocol_remove_none_from_empty() {
        let mut existing = protocol_set_of("");
        let to_remove = protocol_set_of("d");

        let change = test_remove(&mut existing, to_remove);

        assert_eq!(existing, protocol_set_of(""));
        assert_eq!(change, protocol_set_of(""));
    }

    fn test_from_full_sets(
        existing: HashSet<StreamProtocol>,
        new: HashSet<StreamProtocol>,
    ) -> [HashSet<StreamProtocol>; 2] {
        let mut buffer = Vec::new();
        let mut existing = existing
            .iter()
            .map(|p| (AsStrHashEq(p.as_ref()), true))
            .collect::<HashMap<_, _>>();

        let changes = ProtocolsChange::from_full_sets(
            &mut existing,
            new.iter().map(AsRef::as_ref),
            &mut buffer,
        );

        let mut added_changes = HashSet::new();
        let mut removed_changes = HashSet::new();

        for change in changes {
            match change {
                ProtocolsChange::Added(a) => {
                    added_changes.extend(a.cloned());
                }
                ProtocolsChange::Removed(r) => {
                    removed_changes.extend(r.cloned());
                }
            }
        }

        [removed_changes, added_changes]
    }

    #[test]
    fn test_from_full_sets_subset() {
        let existing = protocol_set_of("a b c");
        let new = protocol_set_of("a b");

        let [removed_changes, added_changes] = test_from_full_sets(existing, new);

        assert_eq!(added_changes, protocol_set_of(""));
        assert_eq!(removed_changes, protocol_set_of("c"));
    }

    #[test]
    fn test_from_full_sets_superset() {
        let existing = protocol_set_of("a b");
        let new = protocol_set_of("a b c");

        let [removed_changes, added_changes] = test_from_full_sets(existing, new);

        assert_eq!(added_changes, protocol_set_of("c"));
        assert_eq!(removed_changes, protocol_set_of(""));
    }

    #[test]
    fn test_from_full_sets_intersection() {
        let existing = protocol_set_of("a b c");
        let new = protocol_set_of("b c d");

        let [removed_changes, added_changes] = test_from_full_sets(existing, new);

        assert_eq!(added_changes, protocol_set_of("d"));
        assert_eq!(removed_changes, protocol_set_of("a"));
    }

    #[test]
    fn test_from_full_sets_disjoint() {
        let existing = protocol_set_of("a b c");
        let new = protocol_set_of("d e f");

        let [removed_changes, added_changes] = test_from_full_sets(existing, new);

        assert_eq!(added_changes, protocol_set_of("d e f"));
        assert_eq!(removed_changes, protocol_set_of("a b c"));
    }

    #[test]
    fn test_from_full_sets_empty() {
        let existing = protocol_set_of("");
        let new = protocol_set_of("");

        let [removed_changes, added_changes] = test_from_full_sets(existing, new);

        assert_eq!(added_changes, protocol_set_of(""));
        assert_eq!(removed_changes, protocol_set_of(""));
    }
}
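// The `from_full_sets` tests above assert plain set-difference semantics:
// protocols only in the old set are reported as removed, protocols only in the
// new set as added. The sketch below restates that expectation with
// `std::collections::HashSet` alone. It is not the crate's implementation;
// `diff_full_sets` and `set_of` are hypothetical helpers, and plain `String`s
// stand in for `StreamProtocol`.

```rust
use std::collections::HashSet;

// Hypothetical helper: returns `(removed, added)` when `existing` is replaced by `new`.
pub fn diff_full_sets(
    existing: &HashSet<String>,
    new: &HashSet<String>,
) -> (HashSet<String>, HashSet<String>) {
    // Present before, absent now.
    let removed = existing.difference(new).cloned().collect();
    // Absent before, present now.
    let added = new.difference(existing).cloned().collect();
    (removed, added)
}

// Hypothetical helper mirroring `protocol_set_of`: builds "/name" entries from
// a whitespace-separated list.
pub fn set_of(s: &str) -> HashSet<String> {
    s.split_whitespace().map(|p| format!("/{p}")).collect()
}
```

// For the intersection case, `diff_full_sets(&set_of("a b c"), &set_of("b c d"))`
// yields `/a` as removed and `/d` as added, matching `test_from_full_sets_intersection`.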