• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18610913338

18 Oct 2025 04:41AM UTC coverage: 78.379% (+2.5%) from 75.842%
18610913338

push

github

butlergroup
	modified:   .github/workflows/ci.yml

36944 of 47135 relevant lines covered (78.38%)

37728.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.06
/protocols/identify/src/handler.rs
1
// Copyright 2018 Parity Technologies (UK) Ltd.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
use std::{
22
    collections::HashSet,
23
    sync::Arc,
24
    task::{Context, Poll},
25
    time::Duration,
26
};
27

28
use either::Either;
29
use futures::prelude::*;
30
use futures_bounded::Timeout;
31
use futures_timer::Delay;
32
use libp2p_core::{
33
    upgrade::{ReadyUpgrade, SelectUpgrade},
34
    Multiaddr,
35
};
36
use libp2p_identity::PeerId;
37
use libp2p_swarm::{
38
    handler::{
39
        ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
40
        ProtocolSupport,
41
    },
42
    ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError,
43
    SubstreamProtocol, SupportedProtocols,
44
};
45
use smallvec::SmallVec;
46
use tracing::Level;
47

48
use crate::{
49
    behaviour::KeyType,
50
    protocol::{self, Info, PushInfo, UpgradeError},
51
    PROTOCOL_NAME, PUSH_PROTOCOL_NAME,
52
};
53

54
const STREAM_TIMEOUT: Duration = Duration::from_secs(60);
55
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;
56

57
/// Protocol handler for sending and receiving identification requests.
58
///
59
/// Outbound requests are sent periodically. The handler performs expects
60
/// at least one identification request to be answered by the remote before
61
/// permitting the underlying connection to be closed.
62
pub struct Handler {
63
    remote_peer_id: PeerId,
64
    /// Pending events to yield.
65
    events: SmallVec<
66
        [ConnectionHandlerEvent<
67
            Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>,
68
            (),
69
            Event,
70
        >; 4],
71
    >,
72

73
    active_streams: futures_bounded::FuturesSet<Result<Success, UpgradeError>>,
74

75
    /// Future that fires when we need to identify the node again.
76
    trigger_next_identify: Delay,
77

78
    /// Whether we have exchanged at least one periodic identify.
79
    exchanged_one_periodic_identify: bool,
80

81
    /// The interval of `trigger_next_identify`, i.e. the recurrent delay.
82
    interval: Duration,
83

84
    /// The key of the local peer.
85
    local_key: Arc<KeyType>,
86

87
    /// Application-specific version of the protocol family used by the peer,
88
    /// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`.
89
    protocol_version: String,
90

91
    /// Name and version of the peer, similar to the `User-Agent` header in
92
    /// the HTTP protocol.
93
    agent_version: String,
94

95
    /// Address observed by or for the remote.
96
    observed_addr: Multiaddr,
97

98
    /// Identify information about the remote peer.
99
    remote_info: Option<Info>,
100

101
    local_supported_protocols: SupportedProtocols,
102
    remote_supported_protocols: HashSet<StreamProtocol>,
103
    external_addresses: HashSet<Multiaddr>,
104
}
105

106
/// An event from `Behaviour` with the information requested by the `Handler`.
107
#[derive(Debug)]
108
pub enum InEvent {
109
    AddressesChanged(HashSet<Multiaddr>),
110
    Push,
111
}
112

113
/// Event produced by the `Handler`.
114
#[derive(Debug)]
115
#[allow(clippy::large_enum_variant)]
116
pub enum Event {
117
    /// We obtained identification information from the remote.
118
    Identified(Info),
119
    /// We replied to an identification request from the remote.
120
    Identification,
121
    /// We actively pushed our identification information to the remote.
122
    IdentificationPushed(Info),
123
    /// Failed to identify the remote, or to reply to an identification request.
124
    IdentificationError(StreamUpgradeError<UpgradeError>),
125
}
126

127
impl Handler {
128
    /// Creates a new `Handler`.
129
    pub(crate) fn new(
87✔
130
        interval: Duration,
87✔
131
        remote_peer_id: PeerId,
87✔
132
        local_key: Arc<KeyType>,
87✔
133
        protocol_version: String,
87✔
134
        agent_version: String,
87✔
135
        observed_addr: Multiaddr,
87✔
136
        external_addresses: HashSet<Multiaddr>,
87✔
137
    ) -> Self {
87✔
138
        Self {
87✔
139
            remote_peer_id,
87✔
140
            events: SmallVec::new(),
87✔
141
            active_streams: futures_bounded::FuturesSet::new(
87✔
142
                STREAM_TIMEOUT,
87✔
143
                MAX_CONCURRENT_STREAMS_PER_CONNECTION,
87✔
144
            ),
87✔
145
            trigger_next_identify: Delay::new(Duration::ZERO),
87✔
146
            exchanged_one_periodic_identify: false,
87✔
147
            interval,
87✔
148
            local_key,
87✔
149
            protocol_version,
87✔
150
            agent_version,
87✔
151
            observed_addr,
87✔
152
            local_supported_protocols: SupportedProtocols::default(),
87✔
153
            remote_supported_protocols: HashSet::default(),
87✔
154
            remote_info: Default::default(),
87✔
155
            external_addresses,
87✔
156
        }
87✔
157
    }
87✔
158

159
    fn on_fully_negotiated_inbound(
99✔
160
        &mut self,
99✔
161
        FullyNegotiatedInbound {
99✔
162
            protocol: output, ..
99✔
163
        }: FullyNegotiatedInbound<<Self as ConnectionHandler>::InboundProtocol>,
99✔
164
    ) {
99✔
165
        match output {
99✔
166
            future::Either::Left(stream) => {
95✔
167
                let info = self.build_info();
95✔
168

169
                if self
95✔
170
                    .active_streams
95✔
171
                    .try_push(
95✔
172
                        protocol::send_identify(stream, info).map_ok(|_| Success::SentIdentify),
95✔
173
                    )
174
                    .is_err()
95✔
175
                {
176
                    tracing::warn!("Dropping inbound stream because we are at capacity");
×
177
                } else {
95✔
178
                    self.exchanged_one_periodic_identify = true;
95✔
179
                }
95✔
180
            }
181
            future::Either::Right(stream) => {
4✔
182
                if self
4✔
183
                    .active_streams
4✔
184
                    .try_push(protocol::recv_push(stream).map_ok(Success::ReceivedIdentifyPush))
4✔
185
                    .is_err()
4✔
186
                {
187
                    tracing::warn!(
×
188
                        "Dropping inbound identify push stream because we are at capacity"
×
189
                    );
190
                }
4✔
191
            }
192
        }
193
    }
99✔
194

195
    fn on_fully_negotiated_outbound(
99✔
196
        &mut self,
99✔
197
        FullyNegotiatedOutbound {
99✔
198
            protocol: output, ..
99✔
199
        }: FullyNegotiatedOutbound<<Self as ConnectionHandler>::OutboundProtocol>,
99✔
200
    ) {
99✔
201
        match output {
99✔
202
            future::Either::Left(stream) => {
95✔
203
                if self
95✔
204
                    .active_streams
95✔
205
                    .try_push(protocol::recv_identify(stream).map_ok(Success::ReceivedIdentify))
95✔
206
                    .is_err()
95✔
207
                {
208
                    tracing::warn!("Dropping outbound identify stream because we are at capacity");
×
209
                }
95✔
210
            }
211
            future::Either::Right(stream) => {
4✔
212
                let info = self.build_info();
4✔
213

214
                if self
4✔
215
                    .active_streams
4✔
216
                    .try_push(
4✔
217
                        protocol::send_identify(stream, info).map_ok(Success::SentIdentifyPush),
4✔
218
                    )
4✔
219
                    .is_err()
4✔
220
                {
221
                    tracing::warn!(
×
222
                        "Dropping outbound identify push stream because we are at capacity"
×
223
                    );
224
                }
4✔
225
            }
226
        }
227
    }
99✔
228

229
    fn build_info(&mut self) -> Info {
99✔
230
        let signed_envelope = match self.local_key.as_ref() {
99✔
231
            KeyType::PublicKey(_) => None,
99✔
232
            KeyType::Keypair { keypair, .. } => libp2p_core::PeerRecord::new(
×
233
                keypair,
×
234
                Vec::from_iter(self.external_addresses.iter().cloned()),
×
235
            )
236
            .ok()
×
237
            .map(|r| r.into_signed_envelope()),
×
238
        };
239
        Info {
99✔
240
            public_key: self.local_key.public_key().clone(),
99✔
241
            protocol_version: self.protocol_version.clone(),
99✔
242
            agent_version: self.agent_version.clone(),
99✔
243
            listen_addrs: Vec::from_iter(self.external_addresses.iter().cloned()),
99✔
244
            protocols: Vec::from_iter(self.local_supported_protocols.iter().cloned()),
99✔
245
            observed_addr: self.observed_addr.clone(),
99✔
246
            signed_peer_record: signed_envelope,
99✔
247
        }
99✔
248
    }
99✔
249

250
    /// If the public key matches the remote peer, handles the given `info` and returns `true`.
251
    fn handle_incoming_info(&mut self, info: &Info) -> bool {
99✔
252
        let derived_peer_id = info.public_key.to_peer_id();
99✔
253
        if self.remote_peer_id != derived_peer_id {
99✔
254
            return false;
4✔
255
        }
95✔
256

257
        self.remote_info.replace(info.clone());
95✔
258

259
        self.update_supported_protocols_for_remote(info);
95✔
260
        true
95✔
261
    }
99✔
262

263
    fn update_supported_protocols_for_remote(&mut self, remote_info: &Info) {
95✔
264
        let new_remote_protocols = HashSet::from_iter(remote_info.protocols.clone());
95✔
265

266
        let remote_added_protocols = new_remote_protocols
95✔
267
            .difference(&self.remote_supported_protocols)
95✔
268
            .cloned()
95✔
269
            .collect::<HashSet<_>>();
95✔
270
        let remote_removed_protocols = self
95✔
271
            .remote_supported_protocols
95✔
272
            .difference(&new_remote_protocols)
95✔
273
            .cloned()
95✔
274
            .collect::<HashSet<_>>();
95✔
275

276
        if !remote_added_protocols.is_empty() {
95✔
277
            self.events
81✔
278
                .push(ConnectionHandlerEvent::ReportRemoteProtocols(
81✔
279
                    ProtocolSupport::Added(remote_added_protocols),
81✔
280
                ));
81✔
281
        }
81✔
282

283
        if !remote_removed_protocols.is_empty() {
95✔
284
            self.events
×
285
                .push(ConnectionHandlerEvent::ReportRemoteProtocols(
×
286
                    ProtocolSupport::Removed(remote_removed_protocols),
×
287
                ));
×
288
        }
95✔
289

290
        self.remote_supported_protocols = new_remote_protocols;
95✔
291
    }
95✔
292

293
    fn local_protocols_to_string(&mut self) -> String {
×
294
        self.local_supported_protocols
×
295
            .iter()
×
296
            .map(|p| p.to_string())
×
297
            .collect::<Vec<_>>()
×
298
            .join(", ")
×
299
    }
×
300
}
301

302
impl ConnectionHandler for Handler {
303
    type FromBehaviour = InEvent;
304
    type ToBehaviour = Event;
305
    type InboundProtocol =
306
        SelectUpgrade<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
307
    type OutboundProtocol = Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
308
    type OutboundOpenInfo = ();
309
    type InboundOpenInfo = ();
310

311
    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
1,553✔
312
        SubstreamProtocol::new(
1,553✔
313
            SelectUpgrade::new(
1,553✔
314
                ReadyUpgrade::new(PROTOCOL_NAME),
1,553✔
315
                ReadyUpgrade::new(PUSH_PROTOCOL_NAME),
1,553✔
316
            ),
317
            (),
1,553✔
318
        )
319
    }
1,553✔
320

321
    fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
25✔
322
        match event {
25✔
323
            InEvent::AddressesChanged(addresses) => {
24✔
324
                self.external_addresses = addresses;
24✔
325
            }
24✔
326
            InEvent::Push => {
1✔
327
                self.events
1✔
328
                    .push(ConnectionHandlerEvent::OutboundSubstreamRequest {
1✔
329
                        protocol: SubstreamProtocol::new(
1✔
330
                            Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)),
1✔
331
                            (),
1✔
332
                        ),
1✔
333
                    });
1✔
334
            }
1✔
335
        }
336
    }
25✔
337

338
    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
339
    fn poll(
2,183✔
340
        &mut self,
2,183✔
341
        cx: &mut Context<'_>,
2,183✔
342
    ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Event>> {
2,183✔
343
        if let Some(event) = self.events.pop() {
2,183✔
344
            return Poll::Ready(event);
85✔
345
        }
2,098✔
346

347
        // Poll the future that fires when we need to identify the node again.
348
        if let Poll::Ready(()) = self.trigger_next_identify.poll_unpin(cx) {
2,098✔
349
            self.trigger_next_identify.reset(self.interval);
98✔
350
            let event = ConnectionHandlerEvent::OutboundSubstreamRequest {
98✔
351
                protocol: SubstreamProtocol::new(
98✔
352
                    Either::Left(ReadyUpgrade::new(PROTOCOL_NAME)),
98✔
353
                    (),
98✔
354
                ),
98✔
355
            };
98✔
356
            return Poll::Ready(event);
98✔
357
        }
2,000✔
358

359
        while let Poll::Ready(ready) = self.active_streams.poll_unpin(cx) {
2,004✔
360
            match ready {
198✔
361
                Ok(Ok(Success::ReceivedIdentify(remote_info))) => {
95✔
362
                    if self.handle_incoming_info(&remote_info) {
95✔
363
                        return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
91✔
364
                            Event::Identified(remote_info),
91✔
365
                        ));
91✔
366
                    } else {
367
                        tracing::warn!(
4✔
368
                            %self.remote_peer_id,
369
                            ?remote_info.public_key,
370
                            derived_peer_id=%remote_info.public_key.to_peer_id(),
×
371
                            "Discarding received identify message as public key does not match remote peer ID",
×
372
                        );
373
                    }
374
                }
375
                Ok(Ok(Success::SentIdentifyPush(info))) => {
4✔
376
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
4✔
377
                        Event::IdentificationPushed(info),
4✔
378
                    ));
4✔
379
                }
380
                Ok(Ok(Success::SentIdentify)) => {
381
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
95✔
382
                        Event::Identification,
95✔
383
                    ));
95✔
384
                }
385
                Ok(Ok(Success::ReceivedIdentifyPush(remote_push_info))) => {
4✔
386
                    if let Some(mut info) = self.remote_info.clone() {
4✔
387
                        info.merge(remote_push_info);
4✔
388

389
                        if self.handle_incoming_info(&info) {
4✔
390
                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
4✔
391
                                Event::Identified(info),
4✔
392
                            ));
4✔
393
                        } else {
394
                            tracing::warn!(
×
395
                                %self.remote_peer_id,
396
                                ?info.public_key,
397
                                derived_peer_id=%info.public_key.to_peer_id(),
×
398
                                "Discarding received identify message as public key does not match remote peer ID",
×
399
                            );
400
                        }
401
                    }
×
402
                }
403
                Ok(Err(e)) => {
×
404
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
×
405
                        Event::IdentificationError(StreamUpgradeError::Apply(e)),
×
406
                    ));
×
407
                }
408
                Err(Timeout { .. }) => {
409
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
×
410
                        Event::IdentificationError(StreamUpgradeError::Timeout),
×
411
                    ));
×
412
                }
413
            }
414
        }
415

416
        Poll::Pending
1,806✔
417
    }
2,183✔
418

419
    fn on_connection_event(
369✔
420
        &mut self,
369✔
421
        event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
369✔
422
    ) {
369✔
423
        match event {
369✔
424
            ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
99✔
425
                self.on_fully_negotiated_inbound(fully_negotiated_inbound)
99✔
426
            }
427
            ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
99✔
428
                self.on_fully_negotiated_outbound(fully_negotiated_outbound)
99✔
429
            }
430
            ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => {
×
431
                self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
×
432
                    Event::IdentificationError(
433
                        error.map_upgrade_err(|e| libp2p_core::util::unreachable(e.into_inner())),
×
434
                    ),
435
                ));
436
                self.trigger_next_identify.reset(self.interval);
×
437
            }
438
            ConnectionEvent::LocalProtocolsChange(change) => {
90✔
439
                let before = tracing::enabled!(Level::DEBUG)
90✔
440
                    .then(|| self.local_protocols_to_string())
90✔
441
                    .unwrap_or_default();
90✔
442
                let protocols_changed = self.local_supported_protocols.on_protocols_change(change);
90✔
443
                let after = tracing::enabled!(Level::DEBUG)
90✔
444
                    .then(|| self.local_protocols_to_string())
90✔
445
                    .unwrap_or_default();
90✔
446

447
                if protocols_changed && self.exchanged_one_periodic_identify {
90✔
448
                    tracing::debug!(
3✔
449
                        peer=%self.remote_peer_id,
450
                        %before,
451
                        %after,
452
                        "Supported listen protocols changed, pushing to peer"
×
453
                    );
454

455
                    self.events
3✔
456
                        .push(ConnectionHandlerEvent::OutboundSubstreamRequest {
3✔
457
                            protocol: SubstreamProtocol::new(
3✔
458
                                Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)),
3✔
459
                                (),
3✔
460
                            ),
3✔
461
                        });
3✔
462
                }
87✔
463
            }
464
            _ => {}
81✔
465
        }
466
    }
369✔
467
}
468

469
enum Success {
470
    SentIdentify,
471
    ReceivedIdentify(Info),
472
    SentIdentifyPush(Info),
473
    ReceivedIdentifyPush(PushInfo),
474
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc