• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18610913338

18 Oct 2025 04:41AM UTC coverage: 78.379% (+2.5%) from 75.842%
18610913338

push

github

butlergroup
	modified:   .github/workflows/ci.yml

36944 of 47135 relevant lines covered (78.38%)

37728.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.01
/protocols/request-response/src/lib.rs
1
// Copyright 2020 Parity Technologies (UK) Ltd.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
//! Generic request/response protocols.
22
//!
23
//! ## General Usage
24
//!
25
//! The [`Behaviour`] struct is a [`NetworkBehaviour`] that implements a generic
26
//! request/response protocol or protocol family, whereby each request is
27
//! sent over a new substream on a connection. `Behaviour` is generic
28
//! over the actual messages being sent, which are defined in terms of a
29
//! [`Codec`]. Creating a request/response protocol thus amounts
30
//! to providing an implementation of this trait which can then be
31
//! given to [`Behaviour::with_codec`]. Further configuration options are
32
//! available via the [`Config`].
33
//!
34
//! Requests are sent using [`Behaviour::send_request`] and the
35
//! responses received as [`Message::Response`] via
36
//! [`Event::Message`].
37
//!
38
//! Responses are sent using [`Behaviour::send_response`] upon
39
//! receiving a [`Message::Request`] via
40
//! [`Event::Message`].
41
//!
42
//! ## Predefined codecs
43
//!
44
//! In case your message types implement [`serde::Serialize`] and [`serde::Deserialize`],
45
//! you can use two predefined behaviours:
46
//!
47
//! - [`cbor::Behaviour`] for CBOR-encoded messages
48
//! - [`json::Behaviour`] for JSON-encoded messages
49
//!
50
//! ## Protocol Families
51
//!
52
//! A single [`Behaviour`] instance can be used with an entire
53
//! protocol family that share the same request and response types.
54
//! For that purpose, [`Codec::Protocol`] is typically
55
//! instantiated with a sum type.
56
//!
57
//! ## Limited Protocol Support
58
//!
59
//! It is possible to only support inbound or outbound requests for
60
//! a particular protocol. This is achieved by instantiating `Behaviour`
61
//! with protocols using [`ProtocolSupport::Inbound`] or
62
//! [`ProtocolSupport::Outbound`]. Any subset of protocols of a protocol
63
//! family can be configured in this way. Such protocols will not be
64
//! advertised during inbound respectively outbound protocol negotiation
65
//! on the substreams.
66

67
#![cfg_attr(docsrs, feature(doc_cfg))]
68

69
#[cfg(feature = "cbor")]
70
pub mod cbor;
71
mod codec;
72
mod handler;
73
#[cfg(feature = "json")]
74
pub mod json;
75

76
use std::{
77
    collections::{HashMap, HashSet, VecDeque},
78
    fmt, io,
79
    sync::{atomic::AtomicU64, Arc},
80
    task::{Context, Poll},
81
    time::Duration,
82
};
83

84
pub use codec::Codec;
85
use futures::channel::oneshot;
86
use handler::Handler;
87
pub use handler::ProtocolSupport;
88
use libp2p_core::{transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
89
use libp2p_identity::PeerId;
90
use libp2p_swarm::{
91
    behaviour::{AddressChange, ConnectionClosed, DialFailure, FromSwarm},
92
    dial_opts::DialOpts,
93
    ConnectionDenied, ConnectionHandler, ConnectionId, DialError, NetworkBehaviour, NotifyHandler,
94
    PeerAddresses, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
95
};
96
use smallvec::SmallVec;
97

98
use crate::handler::OutboundMessage;
99

100
/// An inbound request or response.
101
#[derive(Debug)]
102
pub enum Message<TRequest, TResponse, TChannelResponse = TResponse> {
103
    /// A request message.
104
    Request {
105
        /// The ID of this request.
106
        request_id: InboundRequestId,
107
        /// The request message.
108
        request: TRequest,
109
        /// The channel waiting for the response.
110
        ///
111
        /// If this channel is dropped instead of being used to send a response
112
        /// via [`Behaviour::send_response`], a [`Event::InboundFailure`]
113
        /// with [`InboundFailure::ResponseOmission`] is emitted.
114
        channel: ResponseChannel<TChannelResponse>,
115
    },
116
    /// A response message.
117
    Response {
118
        /// The ID of the request that produced this response.
119
        ///
120
        /// See [`Behaviour::send_request`].
121
        request_id: OutboundRequestId,
122
        /// The response message.
123
        response: TResponse,
124
    },
125
}
126

127
/// The events emitted by a request-response [`Behaviour`].
128
#[derive(Debug)]
129
pub enum Event<TRequest, TResponse, TChannelResponse = TResponse> {
130
    /// An incoming message (request or response).
131
    Message {
132
        /// The peer who sent the message.
133
        peer: PeerId,
134
        /// The connection used.
135
        connection_id: ConnectionId,
136
        /// The incoming message.
137
        message: Message<TRequest, TResponse, TChannelResponse>,
138
    },
139
    /// An outbound request failed.
140
    OutboundFailure {
141
        /// The peer to whom the request was sent.
142
        peer: PeerId,
143
        /// The connection used.
144
        connection_id: ConnectionId,
145
        /// The (local) ID of the failed request.
146
        request_id: OutboundRequestId,
147
        /// The error that occurred.
148
        error: OutboundFailure,
149
    },
150
    /// An inbound request failed.
151
    InboundFailure {
152
        /// The peer from whom the request was received.
153
        peer: PeerId,
154
        /// The connection used.
155
        connection_id: ConnectionId,
156
        /// The ID of the failed inbound request.
157
        request_id: InboundRequestId,
158
        /// The error that occurred.
159
        error: InboundFailure,
160
    },
161
    /// A response to an inbound request has been sent.
162
    ///
163
    /// When this event is received, the response has been flushed on
164
    /// the underlying transport connection.
165
    ResponseSent {
166
        /// The peer to whom the response was sent.
167
        peer: PeerId,
168
        /// The connection used.
169
        connection_id: ConnectionId,
170
        /// The ID of the inbound request whose response was sent.
171
        request_id: InboundRequestId,
172
    },
173
}
174

175
/// Possible failures occurring in the context of sending
176
/// an outbound request and receiving the response.
177
#[derive(Debug)]
178
pub enum OutboundFailure {
179
    /// The request could not be sent because a dialing attempt failed.
180
    DialFailure,
181
    /// The request timed out before a response was received.
182
    ///
183
    /// It is not known whether the request may have been
184
    /// received (and processed) by the remote peer.
185
    Timeout,
186
    /// The connection closed before a response was received.
187
    ///
188
    /// It is not known whether the request may have been
189
    /// received (and processed) by the remote peer.
190
    ConnectionClosed,
191
    /// The remote supports none of the requested protocols.
192
    UnsupportedProtocols,
193
    /// An IO failure happened on an outbound stream.
194
    Io(io::Error),
195
}
196

197
impl fmt::Display for OutboundFailure {
198
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
199
        match self {
×
200
            OutboundFailure::DialFailure => write!(f, "Failed to dial the requested peer"),
×
201
            OutboundFailure::Timeout => write!(f, "Timeout while waiting for a response"),
×
202
            OutboundFailure::ConnectionClosed => {
203
                write!(f, "Connection was closed before a response was received")
×
204
            }
205
            OutboundFailure::UnsupportedProtocols => {
206
                write!(f, "The remote supports none of the requested protocols")
×
207
            }
208
            OutboundFailure::Io(e) => write!(f, "IO error on outbound stream: {e}"),
×
209
        }
210
    }
×
211
}
212

213
impl std::error::Error for OutboundFailure {}
214

215
/// Possible failures occurring in the context of receiving an
216
/// inbound request and sending a response.
217
#[derive(Debug)]
218
pub enum InboundFailure {
219
    /// The inbound request timed out, either while reading the
220
    /// incoming request or before a response is sent, e.g. if
221
    /// [`Behaviour::send_response`] is not called in a
222
    /// timely manner.
223
    Timeout,
224
    /// The connection closed before a response could be send.
225
    ConnectionClosed,
226
    /// The local peer supports none of the protocols requested
227
    /// by the remote.
228
    UnsupportedProtocols,
229
    /// The local peer failed to respond to an inbound request
230
    /// due to the [`ResponseChannel`] being dropped instead of
231
    /// being passed to [`Behaviour::send_response`].
232
    ResponseOmission,
233
    /// An IO failure happened on an inbound stream.
234
    Io(io::Error),
235
}
236

237
impl fmt::Display for InboundFailure {
238
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
239
        match self {
×
240
            InboundFailure::Timeout => {
241
                write!(f, "Timeout while receiving request or sending response")
×
242
            }
243
            InboundFailure::ConnectionClosed => {
244
                write!(f, "Connection was closed before a response could be sent")
×
245
            }
246
            InboundFailure::UnsupportedProtocols => write!(
×
247
                f,
×
248
                "The local peer supports none of the protocols requested by the remote"
×
249
            ),
250
            InboundFailure::ResponseOmission => write!(
×
251
                f,
×
252
                "The response channel was dropped without sending a response to the remote"
×
253
            ),
254
            InboundFailure::Io(e) => write!(f, "IO error on inbound stream: {e}"),
×
255
        }
256
    }
×
257
}
258

259
impl std::error::Error for InboundFailure {}
260

261
/// A channel for sending a response to an inbound request.
262
///
263
/// See [`Behaviour::send_response`].
264
#[derive(Debug)]
265
pub struct ResponseChannel<TResponse> {
266
    sender: oneshot::Sender<TResponse>,
267
}
268

269
impl<TResponse> ResponseChannel<TResponse> {
270
    /// Checks whether the response channel is still open, i.e.
271
    /// the `Behaviour` is still waiting for a
272
    /// a response to be sent via [`Behaviour::send_response`]
273
    /// and this response channel.
274
    ///
275
    /// If the response channel is no longer open then the inbound
276
    /// request timed out waiting for the response.
277
    pub fn is_open(&self) -> bool {
×
278
        !self.sender.is_canceled()
×
279
    }
×
280
}
281

282
/// The ID of an inbound request.
283
///
284
/// Note: [`InboundRequestId`]'s uniqueness is only guaranteed between
285
/// inbound requests of the same originating [`Behaviour`].
286
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
287
pub struct InboundRequestId(u64);
288

289
impl fmt::Display for InboundRequestId {
290
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
×
291
        write!(f, "{}", self.0)
×
292
    }
×
293
}
294

295
/// The ID of an outbound request.
296
///
297
/// Note: [`OutboundRequestId`]'s uniqueness is only guaranteed between
298
/// outbound requests of the same originating [`Behaviour`].
299
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
300
pub struct OutboundRequestId(u64);
301

302
impl fmt::Display for OutboundRequestId {
303
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
×
304
        write!(f, "{}", self.0)
×
305
    }
×
306
}
307

308
/// The configuration for a `Behaviour` protocol.
309
#[derive(Debug, Clone)]
310
pub struct Config {
311
    request_timeout: Duration,
312
    max_concurrent_streams: usize,
313
}
314

315
impl Default for Config {
316
    fn default() -> Self {
324✔
317
        Self {
324✔
318
            request_timeout: Duration::from_secs(10),
324✔
319
            max_concurrent_streams: 100,
324✔
320
        }
324✔
321
    }
324✔
322
}
323

324
impl Config {
325
    /// Sets the timeout for inbound and outbound requests.
326
    #[deprecated(note = "Use `Config::with_request_timeout` for one-liner constructions.")]
327
    pub fn set_request_timeout(&mut self, v: Duration) -> &mut Self {
×
328
        self.request_timeout = v;
×
329
        self
×
330
    }
×
331

332
    /// Sets the timeout for inbound and outbound requests.
333
    pub fn with_request_timeout(mut self, v: Duration) -> Self {
303✔
334
        self.request_timeout = v;
303✔
335
        self
303✔
336
    }
303✔
337

338
    /// Sets the upper bound for the number of concurrent inbound + outbound streams.
339
    pub fn with_max_concurrent_streams(mut self, num_streams: usize) -> Self {
3✔
340
        self.max_concurrent_streams = num_streams;
3✔
341
        self
3✔
342
    }
3✔
343
}
344

345
/// A request/response protocol for some message codec.
346
pub struct Behaviour<TCodec>
347
where
348
    TCodec: Codec + Clone + Send + 'static,
349
{
350
    /// The supported inbound protocols.
351
    inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
352
    /// The supported outbound protocols.
353
    outbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
354
    /// The next (local) request ID.
355
    next_outbound_request_id: OutboundRequestId,
356
    /// The next (inbound) request ID.
357
    next_inbound_request_id: Arc<AtomicU64>,
358
    /// The protocol configuration.
359
    config: Config,
360
    /// The protocol codec for reading and writing requests and responses.
361
    codec: TCodec,
362
    /// Pending events to return from `poll`.
363
    pending_events:
364
        VecDeque<ToSwarm<Event<TCodec::Request, TCodec::Response>, OutboundMessage<TCodec>>>,
365
    /// The currently connected peers, their pending outbound and inbound responses and their
366
    /// known, reachable addresses, if any.
367
    connected: HashMap<PeerId, SmallVec<[Connection; 2]>>,
368
    /// Externally managed addresses via `add_address` and `remove_address`.
369
    addresses: PeerAddresses,
370
    /// Requests that have not yet been sent and are waiting for a connection
371
    /// to be established.
372
    pending_outbound_requests: HashMap<PeerId, SmallVec<[OutboundMessage<TCodec>; 10]>>,
373
}
374

375
impl<TCodec> Behaviour<TCodec>
376
where
377
    TCodec: Codec + Default + Clone + Send + 'static,
378
{
379
    /// Creates a new `Behaviour` for the given protocols and configuration, using [`Default`] to
380
    /// construct the codec.
381
    pub fn new<I>(protocols: I, cfg: Config) -> Self
27✔
382
    where
27✔
383
        I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>,
27✔
384
    {
385
        Self::with_codec(TCodec::default(), protocols, cfg)
27✔
386
    }
27✔
387
}
388

389
impl<TCodec> Behaviour<TCodec>
390
where
391
    TCodec: Codec + Clone + Send + 'static,
392
{
393
    /// Creates a new `Behaviour` for the given
394
    /// protocols, codec and configuration.
395
    pub fn with_codec<I>(codec: TCodec, protocols: I, cfg: Config) -> Self
230✔
396
    where
230✔
397
        I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>,
230✔
398
    {
399
        let mut inbound_protocols = SmallVec::new();
230✔
400
        let mut outbound_protocols = SmallVec::new();
230✔
401
        for (p, s) in protocols {
460✔
402
            if s.inbound() {
230✔
403
                inbound_protocols.push(p.clone());
230✔
404
            }
230✔
405
            if s.outbound() {
230✔
406
                outbound_protocols.push(p.clone());
230✔
407
            }
230✔
408
        }
409
        Behaviour {
230✔
410
            inbound_protocols,
230✔
411
            outbound_protocols,
230✔
412
            next_outbound_request_id: OutboundRequestId(1),
230✔
413
            next_inbound_request_id: Arc::new(AtomicU64::new(1)),
230✔
414
            config: cfg,
230✔
415
            codec,
230✔
416
            pending_events: VecDeque::new(),
230✔
417
            connected: HashMap::new(),
230✔
418
            pending_outbound_requests: HashMap::new(),
230✔
419
            addresses: PeerAddresses::default(),
230✔
420
        }
230✔
421
    }
230✔
422

423
    /// Initiates sending a request.
424
    ///
425
    /// If the targeted peer is currently not connected, a dialing
426
    /// attempt is initiated and the request is sent as soon as a
427
    /// connection is established.
428
    ///
429
    /// > **Note**: In order for such a dialing attempt to succeed,
430
    /// > the `RequestResponse` protocol must either be embedded
431
    /// > in another `NetworkBehaviour` that provides peer and
432
    /// > address discovery, or known addresses of peers must be
433
    /// > managed via [`libp2p_swarm::Swarm::add_peer_address`].
434
    /// > Addresses are automatically removed when dial attempts
435
    /// > to them fail.
436
    /// > Alternatively, [`Behaviour::send_request_with_addresses`]
437
    /// > can be used.
438
    pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId {
329✔
439
        self.send_request_with_addresses(peer, request, Vec::new())
329✔
440
    }
329✔
441

442
    /// Like [`Behaviour::send_request`], but additionally using the provided addresses
443
    /// if a connection needs to be established.
444
    pub fn send_request_with_addresses(
330✔
445
        &mut self,
330✔
446
        peer: &PeerId,
330✔
447
        request: TCodec::Request,
330✔
448
        addresses: Vec<Multiaddr>,
330✔
449
    ) -> OutboundRequestId {
330✔
450
        let request_id = self.next_outbound_request_id();
330✔
451
        let request = OutboundMessage {
330✔
452
            request_id,
330✔
453
            request,
330✔
454
            protocols: self.outbound_protocols.clone(),
330✔
455
        };
330✔
456

457
        if let Some(request) = self.try_send_request(peer, request) {
330✔
458
            self.pending_events.push_back(ToSwarm::Dial {
202✔
459
                opts: DialOpts::peer_id(*peer)
202✔
460
                    .addresses(addresses)
202✔
461
                    .extend_addresses_through_behaviour()
202✔
462
                    .build(),
202✔
463
            });
202✔
464
            self.pending_outbound_requests
202✔
465
                .entry(*peer)
202✔
466
                .or_default()
202✔
467
                .push(request);
202✔
468
        }
220✔
469

470
        request_id
330✔
471
    }
330✔
472

473
    /// Initiates sending a response to an inbound request.
474
    ///
475
    /// If the [`ResponseChannel`] is already closed due to a timeout or the
476
    /// connection being closed, the response is returned as an `Err` for
477
    /// further handling. Once the response has been successfully sent on the
478
    /// corresponding connection, [`Event::ResponseSent`] is
479
    /// emitted. In all other cases [`Event::InboundFailure`]
480
    /// will be or has been emitted.
481
    ///
482
    /// The provided `ResponseChannel` is obtained from an inbound
483
    /// [`Message::Request`].
484
    pub fn send_response(
271✔
485
        &mut self,
271✔
486
        ch: ResponseChannel<TCodec::Response>,
271✔
487
        rs: TCodec::Response,
271✔
488
    ) -> Result<(), TCodec::Response> {
271✔
489
        ch.sender.send(rs)
271✔
490
    }
271✔
491

492
    /// Adds a known address for a peer that can be used for
493
    /// dialing attempts by the `Swarm`, i.e. is returned
494
    /// by [`NetworkBehaviour::handle_pending_outbound_connection`].
495
    ///
496
    /// Addresses added in this way are only removed by `remove_address`.
497
    ///
498
    /// Returns true if the address was added, false otherwise (i.e. if the
499
    /// address is already in the list).
500
    #[deprecated(note = "Use `Swarm::add_peer_address` instead.")]
501
    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> bool {
105✔
502
        self.addresses.add(*peer, address)
105✔
503
    }
105✔
504

505
    /// Removes an address of a peer previously added via [`Behaviour::add_address`].
506
    #[deprecated(note = "Will be removed with the next breaking release and won't be replaced.")]
507
    pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) {
×
508
        self.addresses.remove(peer, address);
×
509
    }
×
510

511
    /// Checks whether a peer is currently connected.
512
    pub fn is_connected(&self, peer: &PeerId) -> bool {
×
513
        if let Some(connections) = self.connected.get(peer) {
×
514
            !connections.is_empty()
×
515
        } else {
516
            false
×
517
        }
518
    }
×
519

520
    /// Checks whether an outbound request to the peer with the provided
521
    /// [`PeerId`] initiated by [`Behaviour::send_request`] is still
522
    /// pending, i.e. waiting for a response.
523
    pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &OutboundRequestId) -> bool {
76✔
524
        // Check if request is already sent on established connection.
525
        let est_conn = self
76✔
526
            .connected
76✔
527
            .get(peer)
76✔
528
            .map(|cs| {
76✔
529
                cs.iter()
1✔
530
                    .any(|c| c.pending_outbound_responses.contains(request_id))
1✔
531
            })
1✔
532
            .unwrap_or(false);
76✔
533
        // Check if request is still pending to be sent.
534
        let pen_conn = self
76✔
535
            .pending_outbound_requests
76✔
536
            .get(peer)
76✔
537
            .map(|rps| rps.iter().any(|rp| rp.request_id == *request_id))
2,560✔
538
            .unwrap_or(false);
76✔
539

540
        est_conn || pen_conn
76✔
541
    }
76✔
542

543
    /// Checks whether an inbound request from the peer with the provided
544
    /// [`PeerId`] is still pending, i.e. waiting for a response by the local
545
    /// node through [`Behaviour::send_response`].
546
    pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &InboundRequestId) -> bool {
×
547
        self.connected
×
548
            .get(peer)
×
549
            .map(|cs| {
×
550
                cs.iter()
×
551
                    .any(|c| c.pending_inbound_responses.contains(request_id))
×
552
            })
×
553
            .unwrap_or(false)
×
554
    }
×
555

556
    /// Returns the next outbound request ID.
557
    fn next_outbound_request_id(&mut self) -> OutboundRequestId {
330✔
558
        let request_id = self.next_outbound_request_id;
330✔
559
        self.next_outbound_request_id.0 += 1;
330✔
560
        request_id
330✔
561
    }
330✔
562

563
    /// Tries to send a request by queueing an appropriate event to be
564
    /// emitted to the `Swarm`. If the peer is not currently connected,
565
    /// the given request is return unchanged.
566
    fn try_send_request(
330✔
567
        &mut self,
330✔
568
        peer: &PeerId,
330✔
569
        request: OutboundMessage<TCodec>,
330✔
570
    ) -> Option<OutboundMessage<TCodec>> {
330✔
571
        if let Some(connections) = self.connected.get_mut(peer) {
330✔
572
            if connections.is_empty() {
128✔
573
                return Some(request);
×
574
            }
128✔
575
            let ix = (request.request_id.0 as usize) % connections.len();
128✔
576
            let conn = &mut connections[ix];
128✔
577
            conn.pending_outbound_responses.insert(request.request_id);
128✔
578
            self.pending_events.push_back(ToSwarm::NotifyHandler {
128✔
579
                peer_id: *peer,
128✔
580
                handler: NotifyHandler::One(conn.id),
128✔
581
                event: request,
128✔
582
            });
128✔
583
            None
128✔
584
        } else {
585
            Some(request)
202✔
586
        }
587
    }
330✔
588

589
    /// Remove pending outbound response for the given peer and connection.
590
    ///
591
    /// Returns `true` if the provided connection to the given peer is still
592
    /// alive and the [`OutboundRequestId`] was previously present and is now removed.
593
    /// Returns `false` otherwise.
594
    fn remove_pending_outbound_response(
231✔
595
        &mut self,
231✔
596
        peer: &PeerId,
231✔
597
        connection_id: ConnectionId,
231✔
598
        request: OutboundRequestId,
231✔
599
    ) -> bool {
231✔
600
        self.get_connection_mut(peer, connection_id)
231✔
601
            .map(|c| c.pending_outbound_responses.remove(&request))
231✔
602
            .unwrap_or(false)
231✔
603
    }
231✔
604

605
    /// Remove pending inbound response for the given peer and connection.
606
    ///
607
    /// Returns `true` if the provided connection to the given peer is still
608
    /// alive and the [`InboundRequestId`] was previously present and is now removed.
609
    /// Returns `false` otherwise.
610
    fn remove_pending_inbound_response(
232✔
611
        &mut self,
232✔
612
        peer: &PeerId,
232✔
613
        connection_id: ConnectionId,
232✔
614
        request: InboundRequestId,
232✔
615
    ) -> bool {
232✔
616
        self.get_connection_mut(peer, connection_id)
232✔
617
            .map(|c| c.pending_inbound_responses.remove(&request))
232✔
618
            .unwrap_or(false)
232✔
619
    }
232✔
620

621
    /// Returns a mutable reference to the connection in `self.connected`
622
    /// corresponding to the given [`PeerId`] and [`ConnectionId`].
623
    fn get_connection_mut(
744✔
624
        &mut self,
744✔
625
        peer: &PeerId,
744✔
626
        connection_id: ConnectionId,
744✔
627
    ) -> Option<&mut Connection> {
744✔
628
        self.connected
744✔
629
            .get_mut(peer)
744✔
630
            .and_then(|connections| connections.iter_mut().find(|c| c.id == connection_id))
744✔
631
    }
744✔
632

633
    fn on_address_change(
×
634
        &mut self,
×
635
        AddressChange {
×
636
            peer_id,
×
637
            connection_id,
×
638
            new,
×
639
            ..
×
640
        }: AddressChange,
×
641
    ) {
×
642
        let new_address = match new {
×
643
            ConnectedPoint::Dialer { address, .. } => Some(address.clone()),
×
644
            ConnectedPoint::Listener { .. } => None,
×
645
        };
646
        let connections = self
×
647
            .connected
×
648
            .get_mut(&peer_id)
×
649
            .expect("Address change can only happen on an established connection.");
×
650

651
        let connection = connections
×
652
            .iter_mut()
×
653
            .find(|c| c.id == connection_id)
×
654
            .expect("Address change can only happen on an established connection.");
×
655
        connection.remote_address = new_address;
×
656
    }
×
657

658
    fn on_connection_closed(
15✔
659
        &mut self,
15✔
660
        ConnectionClosed {
15✔
661
            peer_id,
15✔
662
            connection_id,
15✔
663
            remaining_established,
15✔
664
            ..
15✔
665
        }: ConnectionClosed,
15✔
666
    ) {
15✔
667
        let connections = self
15✔
668
            .connected
15✔
669
            .get_mut(&peer_id)
15✔
670
            .expect("Expected some established connection to peer before closing.");
15✔
671

672
        let connection = connections
15✔
673
            .iter()
15✔
674
            .position(|c| c.id == connection_id)
22✔
675
            .map(|p: usize| connections.remove(p))
15✔
676
            .expect("Expected connection to be established before closing.");
15✔
677

678
        debug_assert_eq!(connections.is_empty(), remaining_established == 0);
15✔
679
        if connections.is_empty() {
15✔
680
            self.connected.remove(&peer_id);
8✔
681
        }
8✔
682

683
        for request_id in connection.pending_inbound_responses {
16✔
684
            self.pending_events
1✔
685
                .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
1✔
686
                    peer: peer_id,
1✔
687
                    connection_id,
1✔
688
                    request_id,
1✔
689
                    error: InboundFailure::ConnectionClosed,
1✔
690
                }));
1✔
691
        }
1✔
692

693
        for request_id in connection.pending_outbound_responses {
15✔
694
            self.pending_events
×
695
                .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
×
696
                    peer: peer_id,
×
697
                    connection_id,
×
698
                    request_id,
×
699
                    error: OutboundFailure::ConnectionClosed,
×
700
                }));
×
701
        }
×
702
    }
15✔
703

704
    fn on_dial_failure(
134✔
705
        &mut self,
134✔
706
        DialFailure {
134✔
707
            peer_id,
134✔
708
            connection_id,
134✔
709
            error,
134✔
710
        }: DialFailure,
134✔
711
    ) {
134✔
712
        if let DialError::DialPeerConditionFalse(_) = error {
134✔
713
            // Dial-condition fails because there is already another ongoing dial.
714
            return;
70✔
715
        }
64✔
716
        if let Some(peer) = peer_id {
64✔
717
            // If there are pending outgoing requests when a dial failure occurs,
718
            // it is implied that we are not connected to the peer, since pending
719
            // outgoing requests are drained when a connection is established and
720
            // only created when a peer is not connected when a request is made.
721
            // Thus these requests must be considered failed, even if there is
722
            // another, concurrent dialing attempt ongoing.
723
            if let Some(pending) = self.pending_outbound_requests.remove(&peer) {
64✔
724
                for request in pending {
90✔
725
                    self.pending_events
45✔
726
                        .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
45✔
727
                            peer,
45✔
728
                            connection_id,
45✔
729
                            request_id: request.request_id,
45✔
730
                            error: OutboundFailure::DialFailure,
45✔
731
                        }));
45✔
732
                }
45✔
733
            }
19✔
734
        }
×
735
    }
134✔
736

737
    /// Preloads a new [`Handler`] with requests that are
738
    /// waiting to be sent to the newly connected peer.
739
    fn preload_new_handler(
366✔
740
        &mut self,
366✔
741
        handler: &mut Handler<TCodec>,
366✔
742
        peer: PeerId,
366✔
743
        connection_id: ConnectionId,
366✔
744
        remote_address: Option<Multiaddr>,
366✔
745
    ) {
366✔
746
        let mut connection = Connection::new(connection_id, remote_address);
366✔
747

748
        if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer) {
366✔
749
            for request in pending_requests {
242✔
750
                connection
156✔
751
                    .pending_outbound_responses
156✔
752
                    .insert(request.request_id);
156✔
753
                handler.on_behaviour_event(request);
156✔
754
            }
156✔
755
        }
280✔
756

757
        self.connected.entry(peer).or_default().push(connection);
366✔
758
    }
366✔
759
}
760

761
impl<TCodec> NetworkBehaviour for Behaviour<TCodec>
762
where
763
    TCodec: Codec + Send + Clone + 'static,
764
{
765
    type ConnectionHandler = Handler<TCodec>;
766
    type ToSwarm = Event<TCodec::Request, TCodec::Response>;
767

768
    fn handle_established_inbound_connection(
183✔
769
        &mut self,
183✔
770
        connection_id: ConnectionId,
183✔
771
        peer: PeerId,
183✔
772
        _: &Multiaddr,
183✔
773
        _: &Multiaddr,
183✔
774
    ) -> Result<THandler<Self>, ConnectionDenied> {
183✔
775
        let mut handler = Handler::new(
183✔
776
            self.inbound_protocols.clone(),
183✔
777
            self.codec.clone(),
183✔
778
            self.config.request_timeout,
183✔
779
            self.next_inbound_request_id.clone(),
183✔
780
            self.config.max_concurrent_streams,
183✔
781
        );
782

783
        self.preload_new_handler(&mut handler, peer, connection_id, None);
183✔
784

785
        Ok(handler)
183✔
786
    }
183✔
787

788
    fn handle_pending_outbound_connection(
254✔
789
        &mut self,
254✔
790
        _connection_id: ConnectionId,
254✔
791
        maybe_peer: Option<PeerId>,
254✔
792
        _addresses: &[Multiaddr],
254✔
793
        _effective_role: Endpoint,
254✔
794
    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
254✔
795
        let Some(peer) = maybe_peer else {
254✔
796
            return Ok(vec![]);
14✔
797
        };
798

799
        let mut addresses = Vec::new();
240✔
800
        if let Some(connections) = self.connected.get(&peer) {
240✔
801
            addresses.extend(connections.iter().filter_map(|c| c.remote_address.clone()))
122✔
802
        }
142✔
803

804
        let cached_addrs = self.addresses.get(&peer);
240✔
805
        addresses.extend(cached_addrs);
240✔
806

807
        Ok(addresses)
240✔
808
    }
254✔
809

810
    fn handle_established_outbound_connection(
183✔
811
        &mut self,
183✔
812
        connection_id: ConnectionId,
183✔
813
        peer: PeerId,
183✔
814
        remote_address: &Multiaddr,
183✔
815
        _: Endpoint,
183✔
816
        _: PortUse,
183✔
817
    ) -> Result<THandler<Self>, ConnectionDenied> {
183✔
818
        let mut handler = Handler::new(
183✔
819
            self.inbound_protocols.clone(),
183✔
820
            self.codec.clone(),
183✔
821
            self.config.request_timeout,
183✔
822
            self.next_inbound_request_id.clone(),
183✔
823
            self.config.max_concurrent_streams,
183✔
824
        );
825

826
        self.preload_new_handler(
183✔
827
            &mut handler,
183✔
828
            peer,
183✔
829
            connection_id,
183✔
830
            Some(remote_address.clone()),
183✔
831
        );
832

833
        Ok(handler)
183✔
834
    }
183✔
835

836
    fn on_swarm_event(&mut self, event: FromSwarm) {
1,391✔
837
        self.addresses.on_swarm_event(&event);
1,391✔
838
        match event {
1,391✔
839
            FromSwarm::ConnectionEstablished(_) => {}
366✔
840
            FromSwarm::ConnectionClosed(connection_closed) => {
15✔
841
                self.on_connection_closed(connection_closed)
15✔
842
            }
843
            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
×
844
            FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
134✔
845
            _ => {}
876✔
846
        }
847
    }
1,391✔
848

849
    fn on_connection_handler_event(
744✔
850
        &mut self,
744✔
851
        peer: PeerId,
744✔
852
        connection_id: ConnectionId,
744✔
853
        event: THandlerOutEvent<Self>,
744✔
854
    ) {
744✔
855
        match event {
744✔
856
            handler::Event::Response {
857
                request_id,
225✔
858
                response,
225✔
859
            } => {
860
                let removed =
225✔
861
                    self.remove_pending_outbound_response(&peer, connection_id, request_id);
225✔
862
                debug_assert!(
225✔
863
                    removed,
225✔
864
                    "Expect request_id to be pending before receiving response.",
×
865
                );
866

867
                let message = Message::Response {
225✔
868
                    request_id,
225✔
869
                    response,
225✔
870
                };
225✔
871
                self.pending_events
225✔
872
                    .push_back(ToSwarm::GenerateEvent(Event::Message {
225✔
873
                        peer,
225✔
874
                        connection_id,
225✔
875
                        message,
225✔
876
                    }));
225✔
877
            }
878
            handler::Event::Request {
879
                request_id,
281✔
880
                request,
281✔
881
                sender,
281✔
882
            } => match self.get_connection_mut(&peer, connection_id) {
281✔
883
                Some(connection) => {
281✔
884
                    let inserted = connection.pending_inbound_responses.insert(request_id);
281✔
885
                    debug_assert!(inserted, "Expect id of new request to be unknown.");
281✔
886

887
                    let channel = ResponseChannel { sender };
281✔
888
                    let message = Message::Request {
281✔
889
                        request_id,
281✔
890
                        request,
281✔
891
                        channel,
281✔
892
                    };
281✔
893
                    self.pending_events
281✔
894
                        .push_back(ToSwarm::GenerateEvent(Event::Message {
281✔
895
                            peer,
281✔
896
                            connection_id,
281✔
897
                            message,
281✔
898
                        }));
281✔
899
                }
900
                None => {
901
                    tracing::debug!("Connection ({connection_id}) closed after `Event::Request` ({request_id}) has been emitted.");
×
902
                }
903
            },
904
            handler::Event::ResponseSent(request_id) => {
227✔
905
                let removed =
227✔
906
                    self.remove_pending_inbound_response(&peer, connection_id, request_id);
227✔
907
                debug_assert!(
227✔
908
                    removed,
227✔
909
                    "Expect request_id to be pending before response is sent."
×
910
                );
911

912
                self.pending_events
227✔
913
                    .push_back(ToSwarm::GenerateEvent(Event::ResponseSent {
227✔
914
                        peer,
227✔
915
                        connection_id,
227✔
916
                        request_id,
227✔
917
                    }));
227✔
918
            }
919
            handler::Event::ResponseOmission(request_id) => {
×
920
                let removed =
×
921
                    self.remove_pending_inbound_response(&peer, connection_id, request_id);
×
922
                debug_assert!(
×
923
                    removed,
×
924
                    "Expect request_id to be pending before response is omitted.",
×
925
                );
926

927
                self.pending_events
×
928
                    .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
×
929
                        peer,
×
930
                        connection_id,
×
931
                        request_id,
×
932
                        error: InboundFailure::ResponseOmission,
×
933
                    }));
×
934
            }
935
            handler::Event::OutboundTimeout(request_id) => {
1✔
936
                let removed =
1✔
937
                    self.remove_pending_outbound_response(&peer, connection_id, request_id);
1✔
938
                debug_assert!(
1✔
939
                    removed,
1✔
940
                    "Expect request_id to be pending before request times out."
×
941
                );
942

943
                self.pending_events
1✔
944
                    .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
1✔
945
                        peer,
1✔
946
                        connection_id,
1✔
947
                        request_id,
1✔
948
                        error: OutboundFailure::Timeout,
1✔
949
                    }));
1✔
950
            }
951
            handler::Event::OutboundUnsupportedProtocols(request_id) => {
×
952
                let removed =
×
953
                    self.remove_pending_outbound_response(&peer, connection_id, request_id);
×
954
                debug_assert!(
×
955
                    removed,
×
956
                    "Expect request_id to be pending before failing to connect.",
×
957
                );
958

959
                self.pending_events
×
960
                    .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
×
961
                        peer,
×
962
                        connection_id,
×
963
                        request_id,
×
964
                        error: OutboundFailure::UnsupportedProtocols,
×
965
                    }));
×
966
            }
967
            handler::Event::OutboundStreamFailed { request_id, error } => {
5✔
968
                let removed =
5✔
969
                    self.remove_pending_outbound_response(&peer, connection_id, request_id);
5✔
970
                debug_assert!(removed, "Expect request_id to be pending upon failure");
5✔
971

972
                self.pending_events
5✔
973
                    .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
5✔
974
                        peer,
5✔
975
                        connection_id,
5✔
976
                        request_id,
5✔
977
                        error: OutboundFailure::Io(error),
5✔
978
                    }))
5✔
979
            }
980
            handler::Event::InboundTimeout(request_id) => {
1✔
981
                let removed =
1✔
982
                    self.remove_pending_inbound_response(&peer, connection_id, request_id);
1✔
983

984
                if removed {
1✔
985
                    self.pending_events
1✔
986
                        .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
1✔
987
                            peer,
1✔
988
                            connection_id,
1✔
989
                            request_id,
1✔
990
                            error: InboundFailure::Timeout,
1✔
991
                        }));
1✔
992
                } else {
1✔
993
                    // This happens when timeout is emitted before `read_request` finishes.
994
                    tracing::debug!(
×
995
                        "Inbound request timeout for an unknown request_id ({request_id})"
×
996
                    );
997
                }
998
            }
999
            handler::Event::InboundStreamFailed { request_id, error } => {
4✔
1000
                let removed =
4✔
1001
                    self.remove_pending_inbound_response(&peer, connection_id, request_id);
4✔
1002

1003
                if removed {
4✔
1004
                    self.pending_events
1✔
1005
                        .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
1✔
1006
                            peer,
1✔
1007
                            connection_id,
1✔
1008
                            request_id,
1✔
1009
                            error: InboundFailure::Io(error),
1✔
1010
                        }));
1✔
1011
                } else {
1✔
1012
                    // This happens when `read_request` fails.
1013
                    tracing::debug!("Inbound failure is reported for an unknown request_id ({request_id}): {error}");
3✔
1014
                }
1015
            }
1016
        }
1017
    }
744✔
1018

1019
    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
1020
    fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
5,287✔
1021
        if let Some(ev) = self.pending_events.pop_front() {
5,287✔
1022
            return Poll::Ready(ev);
1,116✔
1023
        } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
4,171✔
1024
            self.pending_events.shrink_to_fit();
1✔
1025
        }
4,170✔
1026

1027
        Poll::Pending
4,171✔
1028
    }
5,287✔
1029
}
1030

1031
/// Internal threshold for when to shrink the capacity
1032
/// of empty queues. If the capacity of an empty queue
1033
/// exceeds this threshold, the associated memory is
1034
/// released.
1035
const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100;
1036

1037
/// Internal information tracked for an established connection.
1038
struct Connection {
1039
    id: ConnectionId,
1040
    remote_address: Option<Multiaddr>,
1041
    /// Pending outbound responses where corresponding inbound requests have
1042
    /// been received on this connection and emitted via `poll` but have not yet
1043
    /// been answered.
1044
    pending_outbound_responses: HashSet<OutboundRequestId>,
1045
    /// Pending inbound responses for previously sent requests on this
1046
    /// connection.
1047
    pending_inbound_responses: HashSet<InboundRequestId>,
1048
}
1049

1050
impl Connection {
1051
    fn new(id: ConnectionId, remote_address: Option<Multiaddr>) -> Self {
514✔
1052
        Self {
514✔
1053
            id,
514✔
1054
            remote_address,
514✔
1055
            pending_outbound_responses: Default::default(),
514✔
1056
            pending_inbound_responses: Default::default(),
514✔
1057
        }
514✔
1058
    }
514✔
1059
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc