• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18610913338

18 Oct 2025 04:41AM UTC coverage: 78.379% (+2.5%) from 75.842%
18610913338

push

github

butlergroup
	modified:   .github/workflows/ci.yml

36944 of 47135 relevant lines covered (78.38%)

37728.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.13
/protocols/ping/src/handler.rs
1
// Copyright 2019 Parity Technologies (UK) Ltd.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
use std::{
22
    collections::VecDeque,
23
    convert::Infallible,
24
    error::Error,
25
    fmt, io,
26
    task::{Context, Poll},
27
    time::Duration,
28
};
29

30
use futures::{
31
    future::{BoxFuture, Either},
32
    prelude::*,
33
};
34
use futures_timer::Delay;
35
use libp2p_core::upgrade::ReadyUpgrade;
36
use libp2p_swarm::{
37
    handler::{ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound},
38
    ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
39
    SubstreamProtocol,
40
};
41

42
use crate::{protocol, PROTOCOL_NAME};
43

44
/// Settings that govern outbound pings: how often they are sent and how
/// long a single ping may take.
#[derive(Debug, Clone)]
pub struct Config {
    /// Maximum time an outbound ping may take before it is considered failed.
    timeout: Duration,
    /// Pause between two consecutive outbound pings.
    interval: Duration,
}

impl Config {
    /// Builds a [`Config`] populated with the defaults:
    ///
    ///   * [`Config::with_interval`] 15s
    ///   * [`Config::with_timeout`] 20s
    ///
    /// In practice this means:
    ///
    ///   * On a healthy connection a ping goes out every 15 seconds.
    ///   * A ping only counts as successful if its response arrives within 20 seconds.
    pub fn new() -> Self {
        let timeout = Duration::from_secs(20);
        let interval = Duration::from_secs(15);

        Self { timeout, interval }
    }

    /// Returns the configuration with the ping timeout replaced by `d`.
    pub fn with_timeout(self, d: Duration) -> Self {
        Self { timeout: d, ..self }
    }

    /// Returns the configuration with the ping interval replaced by `d`.
    pub fn with_interval(self, d: Duration) -> Self {
        Self {
            interval: d,
            ..self
        }
    }
}

impl Default for Config {
    /// Equivalent to [`Config::new`].
    fn default() -> Self {
        Self::new()
    }
}
88

89
/// The ways in which an outbound ping can fail.
#[derive(Debug)]
pub enum Failure {
    /// No response arrived within the configured ping timeout.
    Timeout,
    /// The remote peer does not speak the ping protocol.
    Unsupported,
    /// Any failure that is neither a timeout nor missing protocol support.
    Other {
        error: Box<dyn std::error::Error + Send + Sync + 'static>,
    },
}

impl Failure {
    /// Wraps an arbitrary error into [`Failure::Other`].
    fn other(e: impl std::error::Error + Send + Sync + 'static) -> Self {
        let error: Box<dyn std::error::Error + Send + Sync + 'static> = Box::new(e);
        Failure::Other { error }
    }
}

impl fmt::Display for Failure {
    /// Writes a short human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Other { error } => write!(f, "Ping error: {error}"),
            Self::Unsupported => f.write_str("Ping protocol not supported"),
            Self::Timeout => f.write_str("Ping timeout"),
        }
    }
}

impl Error for Failure {
    /// Only [`Failure::Other`] carries an underlying cause.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            Self::Other { error } => Some(&**error),
            Self::Timeout | Self::Unsupported => None,
        }
    }
}
128

129
/// Protocol handler that handles pinging the remote at a regular period
130
/// and answering ping queries.
131
pub struct Handler {
132
    /// Configuration options.
133
    config: Config,
134
    /// The timer used for the delay to the next ping.
135
    interval: Delay,
136
    /// Outbound ping failures that are pending to be processed by `poll()`.
137
    pending_errors: VecDeque<Failure>,
138
    /// The number of consecutive ping failures that occurred.
139
    ///
140
    /// Each successful ping resets this counter to 0.
141
    failures: u32,
142
    /// The outbound ping state.
143
    outbound: Option<OutboundState>,
144
    /// The inbound pong handler, i.e. if there is an inbound
145
    /// substream, this is always a future that waits for the
146
    /// next inbound ping to be answered.
147
    inbound: Option<PongFuture>,
148
    /// Tracks the state of our handler.
149
    state: State,
150
}
151

152
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Pinging has stopped because the remote does not support the protocol.
    Inactive {
        /// `true` once the lack of support has been reported.
        ///
        /// Keeps the handler from emitting the same event over and over
        /// for one connection.
        reported: bool,
    },
    /// The remote is being pinged.
    Active,
}
164

165
impl Handler {
166
    /// Builds a new [`Handler`] with the given configuration.
167
    pub fn new(config: Config) -> Self {
17✔
168
        Handler {
17✔
169
            config,
17✔
170
            interval: Delay::new(Duration::new(0, 0)),
17✔
171
            pending_errors: VecDeque::with_capacity(2),
17✔
172
            failures: 0,
17✔
173
            outbound: None,
17✔
174
            inbound: None,
17✔
175
            state: State::Active,
17✔
176
        }
17✔
177
    }
17✔
178

179
    fn on_dial_upgrade_error(
1✔
180
        &mut self,
1✔
181
        DialUpgradeError { error, .. }: DialUpgradeError<
1✔
182
            (),
1✔
183
            <Self as ConnectionHandler>::OutboundProtocol,
1✔
184
        >,
1✔
185
    ) {
1✔
186
        self.outbound = None; // Request a new substream on the next `poll`.
1✔
187

188
        // Timer is already polled and expired before substream request is initiated
189
        // and will be polled again later on in our `poll` because we reset `self.outbound`.
190
        //
191
        // `futures-timer` allows an expired timer to be polled again and returns
192
        // immediately `Poll::Ready`. However in its WASM implementation there is
193
        // a bug that causes the expired timer to panic.
194
        // This is a workaround until a proper fix is merged and released.
195
        // See libp2p/rust-libp2p#5447 for more info.
196
        //
197
        // TODO: remove when async-rs/futures-timer#74 gets merged.
198
        self.interval.reset(Duration::new(0, 0));
1✔
199

200
        let error = match error {
1✔
201
            StreamUpgradeError::NegotiationFailed => {
202
                debug_assert_eq!(self.state, State::Active);
1✔
203

204
                self.state = State::Inactive { reported: false };
1✔
205
                return;
1✔
206
            }
207
            // Note: This timeout only covers protocol negotiation.
208
            StreamUpgradeError::Timeout => Failure::Other {
×
209
                error: Box::new(std::io::Error::new(
×
210
                    std::io::ErrorKind::TimedOut,
×
211
                    "ping protocol negotiation timed out",
×
212
                )),
×
213
            },
×
214
            StreamUpgradeError::Apply(e) => libp2p_core::util::unreachable(e),
×
215
            StreamUpgradeError::Io(e) => Failure::Other { error: Box::new(e) },
×
216
        };
217

218
        self.pending_errors.push_front(error);
×
219
    }
1✔
220
}
221

222
impl ConnectionHandler for Handler {
223
    type FromBehaviour = Infallible;
224
    type ToBehaviour = Result<Duration, Failure>;
225
    type InboundProtocol = ReadyUpgrade<StreamProtocol>;
226
    type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
227
    type OutboundOpenInfo = ();
228
    type InboundOpenInfo = ();
229

230
    fn listen_protocol(&self) -> SubstreamProtocol<ReadyUpgrade<StreamProtocol>> {
268✔
231
        SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ())
268✔
232
    }
268✔
233

234
    fn on_behaviour_event(&mut self, _: Infallible) {}
×
235

236
    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
237
    fn poll(
328✔
238
        &mut self,
328✔
239
        cx: &mut Context<'_>,
328✔
240
    ) -> Poll<ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, (), Result<Duration, Failure>>>
328✔
241
    {
242
        match self.state {
328✔
243
            State::Inactive { reported: true } => {
244
                return Poll::Pending; // nothing to do on this connection
2✔
245
            }
246
            State::Inactive { reported: false } => {
247
                self.state = State::Inactive { reported: true };
1✔
248
                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(
1✔
249
                    Failure::Unsupported,
1✔
250
                )));
1✔
251
            }
252
            State::Active => {}
325✔
253
        }
254

255
        // Respond to inbound pings.
256
        if let Some(fut) = self.inbound.as_mut() {
325✔
257
            match fut.poll_unpin(cx) {
192✔
258
                Poll::Pending => {}
180✔
259
                Poll::Ready(Err(e)) => {
×
260
                    tracing::debug!("Inbound ping error: {:?}", e);
×
261
                    self.inbound = None;
×
262
                }
263
                Poll::Ready(Ok(stream)) => {
12✔
264
                    tracing::trace!("answered inbound ping from peer");
12✔
265

266
                    // A ping from a remote peer has been answered, wait for the next.
267
                    self.inbound = Some(protocol::recv_ping(stream).boxed());
12✔
268
                }
269
            }
270
        }
133✔
271

272
        loop {
273
            // Check for outbound ping failures.
274
            if let Some(error) = self.pending_errors.pop_back() {
325✔
275
                tracing::debug!("Ping failure: {:?}", error);
×
276

277
                self.failures += 1;
×
278

279
                // Note: For backward-compatibility the first failure is always "free"
280
                // and silent. This allows peers who use a new substream
281
                // for each ping to have successful ping exchanges with peers
282
                // that use a single substream, since every successful ping
283
                // resets `failures` to `0`.
284
                if self.failures > 1 {
×
285
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(error)));
×
286
                }
×
287
            }
325✔
288

289
            // Continue outbound pings.
290
            match self.outbound.take() {
325✔
291
                Some(OutboundState::Ping(mut ping)) => match ping.poll_unpin(cx) {
50✔
292
                    Poll::Pending => {
293
                        self.outbound = Some(OutboundState::Ping(ping));
38✔
294
                        break;
38✔
295
                    }
296
                    Poll::Ready(Ok((stream, rtt))) => {
12✔
297
                        tracing::debug!(?rtt, "ping succeeded");
12✔
298
                        self.failures = 0;
12✔
299
                        self.interval.reset(self.config.interval);
12✔
300
                        self.outbound = Some(OutboundState::Idle(stream));
12✔
301
                        return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(rtt)));
12✔
302
                    }
303
                    Poll::Ready(Err(e)) => {
×
304
                        self.interval.reset(self.config.interval);
×
305
                        self.pending_errors.push_front(e);
×
306
                    }
×
307
                },
308
                Some(OutboundState::Idle(stream)) => match self.interval.poll_unpin(cx) {
108✔
309
                    Poll::Pending => {
310
                        self.outbound = Some(OutboundState::Idle(stream));
108✔
311
                        break;
108✔
312
                    }
313
                    Poll::Ready(()) => {
×
314
                        self.outbound = Some(OutboundState::Ping(
×
315
                            send_ping(stream, self.config.timeout).boxed(),
×
316
                        ));
×
317
                    }
×
318
                },
319
                Some(OutboundState::OpenStream) => {
320
                    self.outbound = Some(OutboundState::OpenStream);
86✔
321
                    break;
86✔
322
                }
323
                None => match self.interval.poll_unpin(cx) {
81✔
324
                    Poll::Pending => break,
68✔
325
                    Poll::Ready(()) => {
326
                        self.outbound = Some(OutboundState::OpenStream);
13✔
327
                        let protocol = SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ());
13✔
328
                        return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
13✔
329
                            protocol,
13✔
330
                        });
13✔
331
                    }
332
                },
333
            }
334
        }
335

336
        Poll::Pending
300✔
337
    }
328✔
338

339
    fn on_connection_event(
42✔
340
        &mut self,
42✔
341
        event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
42✔
342
    ) {
42✔
343
        match event {
42✔
344
            ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
345
                protocol: mut stream,
12✔
346
                ..
347
            }) => {
12✔
348
                stream.ignore_for_keep_alive();
12✔
349
                self.inbound = Some(protocol::recv_ping(stream).boxed());
12✔
350
            }
12✔
351
            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
352
                protocol: mut stream,
12✔
353
                ..
354
            }) => {
12✔
355
                stream.ignore_for_keep_alive();
12✔
356
                self.outbound = Some(OutboundState::Ping(
12✔
357
                    send_ping(stream, self.config.timeout).boxed(),
12✔
358
                ));
12✔
359
            }
12✔
360
            ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
1✔
361
                self.on_dial_upgrade_error(dial_upgrade_error)
1✔
362
            }
363
            _ => {}
17✔
364
        }
365
    }
42✔
366
}
367

368
type PingFuture = BoxFuture<'static, Result<(Stream, Duration), Failure>>;
369
type PongFuture = BoxFuture<'static, Result<Stream, io::Error>>;
370

371
/// The current state w.r.t. outbound pings.
372
enum OutboundState {
373
    /// A new substream is being negotiated for the ping protocol.
374
    OpenStream,
375
    /// The substream is idle, waiting to send the next ping.
376
    Idle(Stream),
377
    /// A ping is being sent and the response awaited.
378
    Ping(PingFuture),
379
}
380

381
/// A wrapper around [`protocol::send_ping`] that enforces a time out.
382
async fn send_ping(stream: Stream, timeout: Duration) -> Result<(Stream, Duration), Failure> {
12✔
383
    let ping = protocol::send_ping(stream);
12✔
384
    futures::pin_mut!(ping);
12✔
385

386
    match future::select(ping, Delay::new(timeout)).await {
12✔
387
        Either::Left((Ok((stream, rtt)), _)) => Ok((stream, rtt)),
12✔
388
        Either::Left((Err(e), _)) => Err(Failure::other(e)),
×
389
        Either::Right(((), _)) => Err(Failure::Timeout),
×
390
    }
391
}
12✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc