• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18608335757

18 Oct 2025 12:54AM UTC coverage: 75.842% (+9.9%) from 65.932%
18608335757

push

github

butlergroup
	modified:   .github/workflows/ci.yml

40950 of 53994 relevant lines covered (75.84%)

64298.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.77
/protocols/identify/src/protocol.rs
1
// Copyright 2018 Parity Technologies (UK) Ltd.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
use std::io;
22

23
use asynchronous_codec::{FramedRead, FramedWrite};
24
use futures::prelude::*;
25
use libp2p_core::{multiaddr, Multiaddr, PeerRecord, SignedEnvelope};
26
use libp2p_identity as identity;
27
use libp2p_identity::PublicKey;
28
use libp2p_swarm::StreamProtocol;
29
use thiserror::Error;
30

31
use crate::proto;
32

33
const MAX_MESSAGE_SIZE_BYTES: usize = 4096;
34

35
pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/1.0.0");
36

37
pub const PUSH_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/push/1.0.0");
38

39
/// Identify information of a peer sent in protocol messages.
40
#[derive(Debug, Clone)]
41
pub struct Info {
42
    /// The public key of the peer.
43
    pub public_key: PublicKey,
44
    /// Application-specific version of the protocol family used by the peer,
45
    /// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`.
46
    pub protocol_version: String,
47
    /// Name and version of the peer, similar to the `User-Agent` header in
48
    /// the HTTP protocol.
49
    pub agent_version: String,
50
    /// The addresses that the peer is listening on.
51
    pub listen_addrs: Vec<Multiaddr>,
52
    /// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`.
53
    pub protocols: Vec<StreamProtocol>,
54
    /// Address observed by or for the remote.
55
    pub observed_addr: Multiaddr,
56
    /// Verifiable addresses of the peer.
57
    pub signed_peer_record: Option<SignedEnvelope>,
58
}
59

60
impl Info {
61
    pub fn merge(&mut self, info: PushInfo) {
12✔
62
        if let Some(public_key) = info.public_key {
12✔
63
            self.public_key = public_key;
12✔
64
        }
12✔
65
        if let Some(protocol_version) = info.protocol_version {
12✔
66
            self.protocol_version = protocol_version;
12✔
67
        }
12✔
68
        if let Some(agent_version) = info.agent_version {
12✔
69
            self.agent_version = agent_version;
12✔
70
        }
12✔
71
        if !info.listen_addrs.is_empty() {
12✔
72
            self.listen_addrs = info.listen_addrs;
6✔
73
        }
9✔
74
        if !info.protocols.is_empty() {
12✔
75
            self.protocols = info.protocols;
12✔
76
        }
12✔
77
        if let Some(observed_addr) = info.observed_addr {
12✔
78
            self.observed_addr = observed_addr;
12✔
79
        }
12✔
80
    }
12✔
81
}
82

83
/// Identify push information of a peer sent in protocol messages.
84
/// Note that missing fields should be ignored, as peers may choose to send partial updates
85
/// containing only the fields whose values have changed.
86
#[derive(Debug, Clone)]
87
pub struct PushInfo {
88
    pub public_key: Option<PublicKey>,
89
    pub protocol_version: Option<String>,
90
    pub agent_version: Option<String>,
91
    pub listen_addrs: Vec<Multiaddr>,
92
    pub protocols: Vec<StreamProtocol>,
93
    pub observed_addr: Option<Multiaddr>,
94
}
95

96
pub(crate) async fn send_identify<T>(io: T, info: Info) -> Result<Info, UpgradeError>
192✔
97
where
192✔
98
    T: AsyncWrite + Unpin,
192✔
99
{
192✔
100
    tracing::trace!("Sending: {:?}", info);
192✔
101

102
    let listen_addrs = info.listen_addrs.iter().map(|addr| addr.to_vec()).collect();
274✔
103

104
    let pubkey_bytes = info.public_key.encode_protobuf();
192✔
105

106
    let message = proto::Identify {
192✔
107
        agentVersion: Some(info.agent_version.clone()),
192✔
108
        protocolVersion: Some(info.protocol_version.clone()),
192✔
109
        publicKey: Some(pubkey_bytes),
192✔
110
        listenAddrs: listen_addrs,
192✔
111
        observedAddr: Some(info.observed_addr.to_vec()),
192✔
112
        protocols: info.protocols.iter().map(|p| p.to_string()).collect(),
437✔
113
        signedPeerRecord: info
192✔
114
            .signed_peer_record
192✔
115
            .clone()
192✔
116
            .map(|r| r.into_protobuf_encoding()),
192✔
117
    };
118

119
    let mut framed_io = FramedWrite::new(
192✔
120
        io,
192✔
121
        quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
192✔
122
    );
123

124
    framed_io.send(message).await?;
192✔
125
    framed_io.close().await?;
192✔
126

127
    Ok(info)
192✔
128
}
192✔
129

130
pub(crate) async fn recv_push<T>(socket: T) -> Result<PushInfo, UpgradeError>
12✔
131
where
12✔
132
    T: AsyncRead + AsyncWrite + Unpin,
12✔
133
{
12✔
134
    let info = recv(socket).await?.try_into()?;
12✔
135

136
    tracing::trace!(?info, "Received");
12✔
137

138
    Ok(info)
12✔
139
}
12✔
140

141
pub(crate) async fn recv_identify<T>(socket: T) -> Result<Info, UpgradeError>
180✔
142
where
180✔
143
    T: AsyncRead + AsyncWrite + Unpin,
180✔
144
{
180✔
145
    let info = recv(socket).await?.try_into()?;
180✔
146

147
    tracing::trace!(?info, "Received");
180✔
148

149
    Ok(info)
180✔
150
}
180✔
151

152
async fn recv<T>(socket: T) -> Result<proto::Identify, UpgradeError>
192✔
153
where
192✔
154
    T: AsyncRead + AsyncWrite + Unpin,
192✔
155
{
192✔
156
    // Even though we won't write to the stream anymore we don't close it here.
157
    // The reason for this is that the `close` call on some transport's require the
158
    // remote's ACK, but it could be that the remote already dropped the stream
159
    // after finishing their write.
160

161
    let info = FramedRead::new(
192✔
162
        socket,
192✔
163
        quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
192✔
164
    )
192✔
165
    .next()
192✔
166
    .await
192✔
167
    .ok_or(UpgradeError::StreamClosed)??;
192✔
168

169
    Ok(info)
192✔
170
}
192✔
171

172
fn parse_listen_addrs(listen_addrs: Vec<Vec<u8>>) -> Vec<Multiaddr> {
195✔
173
    listen_addrs
195✔
174
        .into_iter()
195✔
175
        .filter_map(|bytes| match Multiaddr::try_from(bytes) {
280✔
176
            Ok(a) => Some(a),
277✔
177
            Err(e) => {
3✔
178
                tracing::debug!("Unable to parse multiaddr: {e:?}");
3✔
179
                None
3✔
180
            }
181
        })
280✔
182
        .collect()
195✔
183
}
195✔
184

185
fn parse_protocols(protocols: Vec<String>) -> Vec<StreamProtocol> {
195✔
186
    protocols
195✔
187
        .into_iter()
195✔
188
        .filter_map(|p| match StreamProtocol::try_from_owned(p) {
440✔
189
            Ok(p) => Some(p),
437✔
190
            Err(e) => {
×
191
                tracing::debug!("Received invalid protocol from peer: {e}");
×
192
                None
×
193
            }
194
        })
437✔
195
        .collect()
195✔
196
}
195✔
197

198
fn parse_public_key(public_key: Option<Vec<u8>>) -> Option<PublicKey> {
195✔
199
    public_key.and_then(|key| match PublicKey::try_decode_protobuf(&key) {
195✔
200
        Ok(k) => Some(k),
195✔
201
        Err(e) => {
×
202
            tracing::debug!("Unable to decode public key: {e:?}");
×
203
            None
×
204
        }
205
    })
195✔
206
}
195✔
207

208
fn parse_observed_addr(observed_addr: Option<Vec<u8>>) -> Option<Multiaddr> {
195✔
209
    observed_addr.and_then(|bytes| match Multiaddr::try_from(bytes) {
195✔
210
        Ok(a) => Some(a),
192✔
211
        Err(e) => {
×
212
            tracing::debug!("Unable to parse observed multiaddr: {e:?}");
×
213
            None
×
214
        }
215
    })
192✔
216
}
195✔
217

218
impl TryFrom<proto::Identify> for Info {
219
    type Error = UpgradeError;
220

221
    fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
180✔
222
        let identify_public_key = {
180✔
223
            match parse_public_key(msg.publicKey) {
180✔
224
                Some(key) => key,
180✔
225
                // This will always produce a DecodingError if the public key is missing.
226
                None => PublicKey::try_decode_protobuf(Default::default())?,
×
227
            }
228
        };
229

230
        // When signedPeerRecord contains valid addresses, ignore addresses in listenAddrs.
231
        // When signedPeerRecord is invalid or signed by others, ignore the signedPeerRecord(set to
232
        // `None`).
233
        let (listen_addrs, signed_envelope) = msg
180✔
234
            .signedPeerRecord
180✔
235
            .and_then(|b| {
180✔
236
                let envelope = SignedEnvelope::from_protobuf_encoding(b.as_ref()).ok()?;
×
237
                let peer_record = PeerRecord::from_signed_envelope(envelope).ok()?;
×
238
                (peer_record.peer_id() == identify_public_key.to_peer_id()).then_some((
×
239
                    peer_record.addresses().to_vec(),
×
240
                    Some(peer_record.into_signed_envelope()),
×
241
                ))
×
242
            })
×
243
            .unwrap_or_else(|| (parse_listen_addrs(msg.listenAddrs), None));
180✔
244

245
        let info = Info {
180✔
246
            public_key: identify_public_key,
180✔
247
            protocol_version: msg.protocolVersion.unwrap_or_default(),
180✔
248
            agent_version: msg.agentVersion.unwrap_or_default(),
180✔
249
            listen_addrs,
180✔
250
            protocols: parse_protocols(msg.protocols),
180✔
251
            observed_addr: parse_observed_addr(msg.observedAddr).unwrap_or(Multiaddr::empty()),
180✔
252
            signed_peer_record: signed_envelope,
180✔
253
        };
180✔
254

255
        Ok(info)
180✔
256
    }
180✔
257
}
258

259
impl TryFrom<proto::Identify> for PushInfo {
260
    type Error = UpgradeError;
261

262
    fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
15✔
263
        let info = PushInfo {
15✔
264
            public_key: parse_public_key(msg.publicKey),
15✔
265
            protocol_version: msg.protocolVersion,
15✔
266
            agent_version: msg.agentVersion,
15✔
267
            listen_addrs: parse_listen_addrs(msg.listenAddrs),
15✔
268
            protocols: parse_protocols(msg.protocols),
15✔
269
            observed_addr: parse_observed_addr(msg.observedAddr),
15✔
270
        };
15✔
271

272
        Ok(info)
15✔
273
    }
15✔
274
}
275

276
#[derive(Debug, Error)]
277
pub enum UpgradeError {
278
    #[error(transparent)]
279
    Codec(#[from] quick_protobuf_codec::Error),
280
    #[error("I/O interaction failed")]
281
    Io(#[from] io::Error),
282
    #[error("Stream closed")]
283
    StreamClosed,
284
    #[error("Failed decoding multiaddr")]
285
    Multiaddr(#[from] multiaddr::Error),
286
    #[error("Failed decoding public key")]
287
    PublicKey(#[from] identity::DecodingError),
288
}
289

290
#[cfg(test)]
291
mod tests {
292
    use std::str::FromStr;
293

294
    use libp2p_core::PeerRecord;
295
    use libp2p_identity as identity;
296
    use quick_protobuf::{BytesReader, MessageRead, MessageWrite, Writer};
297

298
    use super::*;
299

300
    #[test]
301
    fn skip_invalid_multiaddr() {
3✔
302
        let valid_multiaddr: Multiaddr = "/ip6/2001:db8::/tcp/1234".parse().unwrap();
3✔
303
        let valid_multiaddr_bytes = valid_multiaddr.to_vec();
3✔
304

305
        let invalid_multiaddr = {
3✔
306
            let a = vec![255; 8];
3✔
307
            assert!(Multiaddr::try_from(a.clone()).is_err());
3✔
308
            a
3✔
309
        };
310

311
        let payload = proto::Identify {
3✔
312
            agentVersion: None,
3✔
313
            listenAddrs: vec![valid_multiaddr_bytes, invalid_multiaddr],
3✔
314
            observedAddr: None,
3✔
315
            protocolVersion: None,
3✔
316
            protocols: vec![],
3✔
317
            publicKey: Some(
3✔
318
                identity::Keypair::generate_ed25519()
3✔
319
                    .public()
3✔
320
                    .encode_protobuf(),
3✔
321
            ),
3✔
322
            signedPeerRecord: None,
3✔
323
        };
3✔
324

325
        let info = PushInfo::try_from(payload).expect("not to fail");
3✔
326

327
        assert_eq!(info.listen_addrs, vec![valid_multiaddr])
3✔
328
    }
3✔
329

330
    #[test]
331
    fn protobuf_roundtrip() {
3✔
332
        // from go implementation of identify,
333
        // see https://github.com/libp2p/go-libp2p/blob/2209ae05976df6a1cc2631c961f57549d109008c/p2p/protocol/identify/pb/identify.pb.go#L133
334
        // signedPeerRecord field is a dummy one that can't be properly parsed into SignedEnvelope,
335
        // but the wire format doesn't care.
336
        let go_protobuf: [u8; 375] = [
3✔
337
            0x0a, 0x27, 0x70, 0x32, 0x70, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c,
3✔
338
            0x2f, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x79, 0x2f, 0x70, 0x62, 0x2f, 0x69,
3✔
339
            0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
3✔
340
            0x0b, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x79, 0x2e, 0x70, 0x62, 0x22, 0x86,
3✔
341
            0x02, 0x0a, 0x08, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x79, 0x12, 0x28, 0x0a,
3✔
342
            0x0f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69,
3✔
343
            0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x70, 0x72, 0x6f, 0x74,
3✔
344
            0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x22, 0x0a,
3✔
345
            0x0c, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18,
3✔
346
            0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x56, 0x65,
3✔
347
            0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x75, 0x62, 0x6c, 0x69,
3✔
348
            0x63, 0x4b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x70, 0x75,
3✔
349
            0x62, 0x6c, 0x69, 0x63, 0x4b, 0x65, 0x79, 0x12, 0x20, 0x0a, 0x0b, 0x6c, 0x69, 0x73,
3✔
350
            0x74, 0x65, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0c,
3✔
351
            0x52, 0x0b, 0x6c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x73, 0x12,
3✔
352
            0x22, 0x0a, 0x0c, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x65, 0x64, 0x41, 0x64, 0x64,
3✔
353
            0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x6f, 0x62, 0x73, 0x65, 0x72,
3✔
354
            0x76, 0x65, 0x64, 0x41, 0x64, 0x64, 0x72, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x72, 0x6f,
3✔
355
            0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09,
3✔
356
            0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x73, 0x12, 0x2a, 0x0a, 0x10, 0x73,
3✔
357
            0x69, 0x67, 0x6e, 0x65, 0x64, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x63, 0x6f, 0x72,
3✔
358
            0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x10, 0x73, 0x69, 0x67, 0x6e, 0x65,
3✔
359
            0x64, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x42, 0x36, 0x5a,
3✔
360
            0x34, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69,
3✔
361
            0x62, 0x70, 0x32, 0x70, 0x2f, 0x67, 0x6f, 0x2d, 0x6c, 0x69, 0x62, 0x70, 0x32, 0x70,
3✔
362
            0x2f, 0x70, 0x32, 0x70, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2f,
3✔
363
            0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x79, 0x2f, 0x70, 0x62,
3✔
364
        ];
3✔
365
        let mut buf = [0u8; 375];
3✔
366
        let mut message =
3✔
367
            proto::Identify::from_reader(&mut BytesReader::from_bytes(&go_protobuf), &go_protobuf)
3✔
368
                .expect("read to succeed");
3✔
369

370
        // The actual bytes they put in is "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb".
371
        // Starting with Z4 means it is zig-zag-encoded 4-byte varint of string, appended by
372
        // protobuf.
373
        assert_eq!(
3✔
374
            String::from_utf8(
3✔
375
                message
3✔
376
                    .signedPeerRecord
3✔
377
                    .clone()
3✔
378
                    .expect("field to be present")
3✔
379
            )
380
            .expect("parse to succeed"),
3✔
381
            "Z4github.com/libp2p/go-libp2p/p2p/protocol/identify/pb".to_string()
3✔
382
        );
383
        message
3✔
384
            .write_message(&mut Writer::new(&mut buf[..]))
3✔
385
            .expect("same length after roundtrip");
3✔
386
        assert_eq!(go_protobuf, buf);
3✔
387

388
        let identity = identity::Keypair::generate_ed25519();
3✔
389
        let record = PeerRecord::new(
3✔
390
            &identity,
3✔
391
            vec![Multiaddr::from_str("/ip4/0.0.0.0").expect("parse to succeed")],
3✔
392
        )
393
        .expect("infallible siging using ed25519");
3✔
394
        message
3✔
395
            .signedPeerRecord
3✔
396
            .replace(record.into_signed_envelope().into_protobuf_encoding());
3✔
397
        let mut buf = Vec::new();
3✔
398
        message
3✔
399
            .write_message(&mut Writer::new(&mut buf))
3✔
400
            .expect("write to succeed");
3✔
401
        let parsed_message = proto::Identify::from_reader(&mut BytesReader::from_bytes(&buf), &buf)
3✔
402
            .expect("read to succeed");
3✔
403
        assert_eq!(message, parsed_message)
3✔
404
    }
3✔
405
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc