• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18610913338

18 Oct 2025 04:41AM UTC coverage: 78.379% (+2.5%) from 75.842%
18610913338

push

github

butlergroup
	modified:   .github/workflows/ci.yml

36944 of 47135 relevant lines covered (78.38%)

37728.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.11
/protocols/gossipsub/src/handler.rs
1
// Copyright 2020 Sigma Prime Pty Ltd.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
use std::{
22
    pin::Pin,
23
    task::{Context, Poll},
24
};
25

26
use asynchronous_codec::Framed;
27
use futures::{future::Either, prelude::*, StreamExt};
28
use libp2p_core::upgrade::DeniedUpgrade;
29
use libp2p_swarm::{
30
    handler::{
31
        ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
32
        FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
33
    },
34
    Stream,
35
};
36
use web_time::Instant;
37

38
use crate::{
39
    protocol::{GossipsubCodec, ProtocolConfig},
40
    queue::Queue,
41
    rpc_proto::proto,
42
    types::{PeerKind, RawMessage, RpcIn, RpcOut},
43
    ValidationError,
44
};
45

46
/// The event emitted by the Handler. This informs the behaviour of various events created
47
/// by the handler.
48
#[derive(Debug)]
49
pub enum HandlerEvent {
50
    /// A GossipsubRPC message has been received. This also contains a list of invalid messages (if
51
    /// any) that were received.
52
    Message {
53
        /// The GossipsubRPC message excluding any invalid messages.
54
        rpc: RpcIn,
55
        /// Any invalid messages that were received in the RPC, along with the associated
56
        /// validation error.
57
        invalid_messages: Vec<(RawMessage, ValidationError)>,
58
    },
59
    /// An inbound or outbound substream has been established with the peer and this informs over
60
    /// which protocol. This message only occurs once per connection.
61
    PeerKind(PeerKind),
62
    /// A message to be published was dropped because it could not be sent in time.
63
    MessageDropped(RpcOut),
64
}
65

66
/// A command sent from the behaviour to the handler.
///
/// Both variants only toggle whether the peer behind this connection is considered part of
/// the mesh, which the handler uses for its keep-alive decision.
#[derive(Debug)]
pub enum HandlerIn {
    /// The peer has joined the mesh.
    JoinedMesh,
    /// The peer has left the mesh.
    LeftMesh,
}
75

76
/// Upper bound on how many inbound, and separately how many outbound, substream attempts
/// are tolerated on one connection.
///
/// Gossipsub expects exactly one long-lived substream per direction and re-creates it after
/// a failure. Capping the number of attempts lets us treat a connection that keeps failing
/// as faulty and disable the handler, and it also guards against substream creation loops.
const MAX_SUBSTREAM_ATTEMPTS: usize = 5;
83

84
#[allow(clippy::large_enum_variant)]
85
pub enum Handler {
86
    Enabled(EnabledHandler),
87
    Disabled(DisabledHandler),
88
}
89

90
/// Protocol Handler that manages a single long-lived substream with a peer.
91
pub struct EnabledHandler {
92
    /// Upgrade configuration for the gossipsub protocol.
93
    listen_protocol: ProtocolConfig,
94

95
    /// The single long-lived outbound substream.
96
    outbound_substream: Option<OutboundSubstreamState>,
97

98
    /// The single long-lived inbound substream.
99
    inbound_substream: Option<InboundSubstreamState>,
100

101
    /// Queue of dispatched Rpc messages to send.
102
    message_queue: Queue,
103

104
    /// Flag indicating that an outbound substream is being established to prevent duplicate
105
    /// requests.
106
    outbound_substream_establishing: bool,
107

108
    /// The number of outbound substreams we have requested.
109
    outbound_substream_attempts: usize,
110

111
    /// The number of inbound substreams that have been created by the peer.
112
    inbound_substream_attempts: usize,
113

114
    /// The type of peer this handler is associated to.
115
    peer_kind: Option<PeerKind>,
116

117
    /// Keeps track on whether we have sent the peer kind to the behaviour.
118
    // NOTE: Use this flag rather than checking the substream count each poll.
119
    peer_kind_sent: bool,
120

121
    last_io_activity: Instant,
122

123
    /// Keeps track of whether this connection is for a peer in the mesh. This is used to make
124
    /// decisions about the keep alive state for this connection.
125
    in_mesh: bool,
126
}
127

128
/// The reason a [`Handler`] has been disabled.
pub enum DisabledHandler {
    /// The peer does not speak gossipsub. Instead of disconnecting right away, the handler
    /// is disabled and no further inbound or outbound substreams are established.
    ProtocolUnsupported {
        /// Whether the peer kind has already been reported to the behaviour.
        peer_kind_sent: bool,
    },
    /// The limit on inbound or outbound substream attempts was reached, so the handler was
    /// disabled.
    MaxSubstreamAttempts,
}
140

141
/// State of the inbound substream, opened either by us or by the remote.
142
enum InboundSubstreamState {
143
    /// Waiting for a message from the remote. The idle state for an inbound substream.
144
    WaitingInput(Framed<Stream, GossipsubCodec>),
145
    /// The substream is being closed.
146
    Closing(Framed<Stream, GossipsubCodec>),
147
    /// An error occurred during processing.
148
    Poisoned,
149
}
150

151
/// State of the outbound substream, opened either by us or by the remote.
152
enum OutboundSubstreamState {
153
    /// Waiting for the user to send a message. The idle state for an outbound substream.
154
    WaitingOutput(Framed<Stream, GossipsubCodec>),
155
    /// Waiting to send a message to the remote.
156
    PendingSend(Framed<Stream, GossipsubCodec>, proto::RPC),
157
    /// Waiting to flush the substream so that the data arrives to the remote.
158
    PendingFlush(Framed<Stream, GossipsubCodec>),
159
    /// An error occurred during processing.
160
    Poisoned,
161
}
162

163
impl Handler {
164
    /// Builds a new [`Handler`].
165
    pub(crate) fn new(protocol_config: ProtocolConfig, message_queue: Queue) -> Self {
9✔
166
        Handler::Enabled(EnabledHandler {
9✔
167
            listen_protocol: protocol_config,
9✔
168
            inbound_substream: None,
9✔
169
            outbound_substream: None,
9✔
170
            outbound_substream_establishing: false,
9✔
171
            outbound_substream_attempts: 0,
9✔
172
            inbound_substream_attempts: 0,
9✔
173
            message_queue,
9✔
174
            peer_kind: None,
9✔
175
            peer_kind_sent: false,
9✔
176
            last_io_activity: Instant::now(),
9✔
177
            in_mesh: false,
9✔
178
        })
9✔
179
    }
9✔
180
}
181

182
impl EnabledHandler {
183
    fn on_fully_negotiated_inbound(
6✔
184
        &mut self,
6✔
185
        (substream, peer_kind): (Framed<Stream, GossipsubCodec>, PeerKind),
6✔
186
    ) {
6✔
187
        // update the known kind of peer
188
        if self.peer_kind.is_none() {
6✔
189
            self.peer_kind = Some(peer_kind);
6✔
190
        }
6✔
191

192
        // new inbound substream. Replace the current one, if it exists.
193
        tracing::trace!("New inbound substream request");
6✔
194
        self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream));
6✔
195
    }
6✔
196

197
    fn on_fully_negotiated_outbound(
6✔
198
        &mut self,
6✔
199
        FullyNegotiatedOutbound { protocol, .. }: FullyNegotiatedOutbound<
6✔
200
            <Handler as ConnectionHandler>::OutboundProtocol,
6✔
201
        >,
6✔
202
    ) {
6✔
203
        let (substream, peer_kind) = protocol;
6✔
204

205
        // update the known kind of peer
206
        if self.peer_kind.is_none() {
6✔
207
            self.peer_kind = Some(peer_kind);
×
208
        }
6✔
209

210
        assert!(
6✔
211
            self.outbound_substream.is_none(),
6✔
212
            "Established an outbound substream with one already available"
×
213
        );
214
        self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream));
6✔
215
    }
6✔
216

217
    fn poll(
113✔
218
        &mut self,
113✔
219
        cx: &mut Context<'_>,
113✔
220
    ) -> Poll<
113✔
221
        ConnectionHandlerEvent<
113✔
222
            <Handler as ConnectionHandler>::OutboundProtocol,
113✔
223
            (),
113✔
224
            <Handler as ConnectionHandler>::ToBehaviour,
113✔
225
        >,
113✔
226
    > {
113✔
227
        if !self.peer_kind_sent {
113✔
228
            if let Some(peer_kind) = self.peer_kind.as_ref() {
45✔
229
                self.peer_kind_sent = true;
6✔
230
                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
6✔
231
                    HandlerEvent::PeerKind(*peer_kind),
6✔
232
                ));
6✔
233
            }
39✔
234
        }
68✔
235

236
        // determine if we need to create the outbound stream
237
        if !self.message_queue.is_empty()
107✔
238
            && self.outbound_substream.is_none()
55✔
239
            && !self.outbound_substream_establishing
40✔
240
        {
241
            self.outbound_substream_establishing = true;
6✔
242
            return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
6✔
243
                protocol: SubstreamProtocol::new(self.listen_protocol.clone(), ()),
6✔
244
            });
6✔
245
        }
101✔
246

247
        // process outbound stream
248
        loop {
249
            match self
146✔
250
                .outbound_substream
146✔
251
                .replace(OutboundSubstreamState::Poisoned)
146✔
252
            {
253
                // outbound idle state
254
                Some(OutboundSubstreamState::WaitingOutput(substream)) => {
71✔
255
                    if let Poll::Ready(mut message) = Pin::new(&mut self.message_queue).poll_pop(cx)
71✔
256
                    {
257
                        match message {
15✔
258
                            RpcOut::Publish {
259
                                message: _,
260
                                ref mut timeout,
1✔
261
                                ..
262
                            }
263
                            | RpcOut::Forward {
264
                                message: _,
265
                                ref mut timeout,
2✔
266
                                ..
267
                            } => {
268
                                if Pin::new(timeout).poll(cx).is_ready() {
3✔
269
                                    // Inform the behaviour and end the poll.
270
                                    self.outbound_substream =
×
271
                                        Some(OutboundSubstreamState::WaitingOutput(substream));
×
272
                                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
×
273
                                        HandlerEvent::MessageDropped(message),
×
274
                                    ));
×
275
                                }
3✔
276
                            }
277
                            _ => {} // All other messages are not time-bound.
12✔
278
                        }
279
                        self.outbound_substream = Some(OutboundSubstreamState::PendingSend(
15✔
280
                            substream,
15✔
281
                            message.into_protobuf(),
15✔
282
                        ));
15✔
283
                        continue;
15✔
284
                    }
56✔
285

286
                    self.outbound_substream =
56✔
287
                        Some(OutboundSubstreamState::WaitingOutput(substream));
56✔
288
                    break;
56✔
289
                }
290
                Some(OutboundSubstreamState::PendingSend(mut substream, message)) => {
15✔
291
                    match Sink::poll_ready(Pin::new(&mut substream), cx) {
15✔
292
                        Poll::Ready(Ok(())) => {
293
                            match Sink::start_send(Pin::new(&mut substream), message) {
15✔
294
                                Ok(()) => {
295
                                    self.outbound_substream =
15✔
296
                                        Some(OutboundSubstreamState::PendingFlush(substream))
15✔
297
                                }
298
                                Err(e) => {
×
299
                                    tracing::debug!(
×
300
                                        "Failed to send message on outbound stream: {e}"
×
301
                                    );
302
                                    self.outbound_substream = None;
×
303
                                    break;
×
304
                                }
305
                            }
306
                        }
307
                        Poll::Ready(Err(e)) => {
×
308
                            tracing::debug!("Failed to send message on outbound stream: {e}");
×
309
                            self.outbound_substream = None;
×
310
                            break;
×
311
                        }
312
                        Poll::Pending => {
313
                            self.outbound_substream =
×
314
                                Some(OutboundSubstreamState::PendingSend(substream, message));
×
315
                            break;
×
316
                        }
317
                    }
318
                }
319
                Some(OutboundSubstreamState::PendingFlush(mut substream)) => {
15✔
320
                    match Sink::poll_flush(Pin::new(&mut substream), cx) {
15✔
321
                        Poll::Ready(Ok(())) => {
322
                            self.last_io_activity = Instant::now();
15✔
323
                            self.outbound_substream =
15✔
324
                                Some(OutboundSubstreamState::WaitingOutput(substream))
15✔
325
                        }
326
                        Poll::Ready(Err(e)) => {
×
327
                            tracing::debug!("Failed to flush outbound stream: {e}");
×
328
                            self.outbound_substream = None;
×
329
                            break;
×
330
                        }
331
                        Poll::Pending => {
332
                            self.outbound_substream =
×
333
                                Some(OutboundSubstreamState::PendingFlush(substream));
×
334
                            break;
×
335
                        }
336
                    }
337
                }
338
                None => {
339
                    self.outbound_substream = None;
45✔
340
                    break;
45✔
341
                }
342
                Some(OutboundSubstreamState::Poisoned) => {
343
                    unreachable!("Error occurred during outbound stream processing")
×
344
                }
345
            }
346
        }
347

348
        // Handle inbound messages.
349
        loop {
350
            match self
101✔
351
                .inbound_substream
101✔
352
                .replace(InboundSubstreamState::Poisoned)
101✔
353
            {
354
                // inbound idle state
355
                Some(InboundSubstreamState::WaitingInput(mut substream)) => {
68✔
356
                    match substream.poll_next_unpin(cx) {
68✔
357
                        Poll::Ready(Some(Ok(message))) => {
15✔
358
                            self.last_io_activity = Instant::now();
15✔
359
                            self.inbound_substream =
15✔
360
                                Some(InboundSubstreamState::WaitingInput(substream));
15✔
361
                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(message));
15✔
362
                        }
363
                        Poll::Ready(Some(Err(error))) => {
×
364
                            tracing::debug!("Failed to read from inbound stream: {error}");
×
365
                            // Close this side of the stream. If the
366
                            // peer is still around, they will re-establish their
367
                            // outbound stream i.e. our inbound stream.
368
                            self.inbound_substream =
×
369
                                Some(InboundSubstreamState::Closing(substream));
×
370
                        }
371
                        // peer closed the stream
372
                        Poll::Ready(None) => {
373
                            tracing::debug!("Inbound stream closed by remote");
×
374
                            self.inbound_substream =
×
375
                                Some(InboundSubstreamState::Closing(substream));
×
376
                        }
377
                        Poll::Pending => {
378
                            self.inbound_substream =
53✔
379
                                Some(InboundSubstreamState::WaitingInput(substream));
53✔
380
                            break;
53✔
381
                        }
382
                    }
383
                }
384
                Some(InboundSubstreamState::Closing(mut substream)) => {
×
385
                    match Sink::poll_close(Pin::new(&mut substream), cx) {
×
386
                        Poll::Ready(res) => {
×
387
                            if let Err(e) = res {
×
388
                                // Don't close the connection but just drop the inbound substream.
389
                                // In case the remote has more to send, they will open up a new
390
                                // substream.
391
                                tracing::debug!("Inbound substream error while closing: {e}");
×
392
                            }
×
393
                            self.inbound_substream = None;
×
394
                            break;
×
395
                        }
396
                        Poll::Pending => {
397
                            self.inbound_substream =
×
398
                                Some(InboundSubstreamState::Closing(substream));
×
399
                            break;
×
400
                        }
401
                    }
402
                }
403
                None => {
404
                    self.inbound_substream = None;
33✔
405
                    break;
33✔
406
                }
407
                Some(InboundSubstreamState::Poisoned) => {
408
                    unreachable!("Error occurred during inbound stream processing")
×
409
                }
410
            }
411
        }
412

413
        Poll::Pending
86✔
414
    }
113✔
415
}
416

417
impl ConnectionHandler for Handler {
418
    type FromBehaviour = HandlerIn;
419
    type ToBehaviour = HandlerEvent;
420
    type InboundOpenInfo = ();
421
    type InboundProtocol = either::Either<ProtocolConfig, DeniedUpgrade>;
422
    type OutboundOpenInfo = ();
423
    type OutboundProtocol = ProtocolConfig;
424

425
    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
80✔
426
        match self {
80✔
427
            Handler::Enabled(handler) => {
80✔
428
                SubstreamProtocol::new(either::Either::Left(handler.listen_protocol.clone()), ())
80✔
429
            }
430
            Handler::Disabled(_) => {
431
                SubstreamProtocol::new(either::Either::Right(DeniedUpgrade), ())
×
432
            }
433
        }
434
    }
80✔
435

436
    fn on_behaviour_event(&mut self, message: HandlerIn) {
6✔
437
        match self {
6✔
438
            Handler::Enabled(handler) => match message {
6✔
439
                HandlerIn::JoinedMesh => {
6✔
440
                    handler.in_mesh = true;
6✔
441
                }
6✔
442
                HandlerIn::LeftMesh => {
×
443
                    handler.in_mesh = false;
×
444
                }
×
445
            },
446
            Handler::Disabled(_) => {
447
                tracing::debug!(?message, "Handler is disabled. Dropping message");
×
448
            }
449
        }
450
    }
6✔
451

452
    fn connection_keep_alive(&self) -> bool {
11✔
453
        matches!(self, Handler::Enabled(h) if h.in_mesh)
11✔
454
    }
11✔
455

456
    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
457
    fn poll(
113✔
458
        &mut self,
113✔
459
        cx: &mut Context<'_>,
113✔
460
    ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
113✔
461
        match self {
×
462
            Handler::Enabled(handler) => handler.poll(cx),
113✔
463
            Handler::Disabled(DisabledHandler::ProtocolUnsupported { peer_kind_sent }) => {
×
464
                if !*peer_kind_sent {
×
465
                    *peer_kind_sent = true;
×
466
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
×
467
                        HandlerEvent::PeerKind(PeerKind::NotSupported),
×
468
                    ));
×
469
                }
×
470

471
                Poll::Pending
×
472
            }
473
            Handler::Disabled(DisabledHandler::MaxSubstreamAttempts) => Poll::Pending,
×
474
        }
475
    }
113✔
476

477
    fn on_connection_event(
18✔
478
        &mut self,
18✔
479
        event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
18✔
480
    ) {
18✔
481
        match self {
18✔
482
            Handler::Enabled(handler) => {
18✔
483
                if event.is_inbound() {
18✔
484
                    handler.inbound_substream_attempts += 1;
6✔
485

486
                    if handler.inbound_substream_attempts == MAX_SUBSTREAM_ATTEMPTS {
6✔
487
                        tracing::warn!(
×
488
                            "The maximum number of inbound substreams attempts has been exceeded"
×
489
                        );
490
                        *self = Handler::Disabled(DisabledHandler::MaxSubstreamAttempts);
×
491
                        return;
×
492
                    }
6✔
493
                }
12✔
494

495
                if event.is_outbound() {
18✔
496
                    handler.outbound_substream_establishing = false;
6✔
497

498
                    handler.outbound_substream_attempts += 1;
6✔
499

500
                    if handler.outbound_substream_attempts == MAX_SUBSTREAM_ATTEMPTS {
6✔
501
                        tracing::warn!(
×
502
                            "The maximum number of outbound substream attempts has been exceeded"
×
503
                        );
504
                        *self = Handler::Disabled(DisabledHandler::MaxSubstreamAttempts);
×
505
                        return;
×
506
                    }
6✔
507
                }
12✔
508

509
                match event {
×
510
                    ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
511
                        protocol,
6✔
512
                        ..
513
                    }) => match protocol {
6✔
514
                        Either::Left(protocol) => handler.on_fully_negotiated_inbound(protocol),
6✔
515
                        Either::Right(v) => libp2p_core::util::unreachable(v),
516
                    },
517
                    ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
6✔
518
                        handler.on_fully_negotiated_outbound(fully_negotiated_outbound)
6✔
519
                    }
520
                    ConnectionEvent::DialUpgradeError(DialUpgradeError {
521
                        error: StreamUpgradeError::Timeout,
522
                        ..
523
                    }) => {
524
                        tracing::debug!("Dial upgrade error: Protocol negotiation timeout");
×
525
                    }
526
                    ConnectionEvent::DialUpgradeError(DialUpgradeError {
527
                        error: StreamUpgradeError::Apply(e),
×
528
                        ..
529
                    }) => libp2p_core::util::unreachable(e),
×
530
                    ConnectionEvent::DialUpgradeError(DialUpgradeError {
531
                        error: StreamUpgradeError::NegotiationFailed,
532
                        ..
533
                    }) => {
534
                        // The protocol is not supported
535
                        tracing::debug!(
×
536
                            "The remote peer does not support gossipsub on this connection"
×
537
                        );
538
                        *self = Handler::Disabled(DisabledHandler::ProtocolUnsupported {
×
539
                            peer_kind_sent: false,
×
540
                        });
×
541
                    }
542
                    ConnectionEvent::DialUpgradeError(DialUpgradeError {
543
                        error: StreamUpgradeError::Io(e),
×
544
                        ..
545
                    }) => {
546
                        tracing::debug!("Protocol negotiation failed: {e}")
×
547
                    }
548
                    _ => {}
6✔
549
                }
550
            }
551
            Handler::Disabled(_) => {}
×
552
        }
553
    }
18✔
554
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc