• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18610913338

18 Oct 2025 04:41AM UTC coverage: 78.379% (+2.5%) from 75.842%
18610913338

push

github

butlergroup
	modified:   .github/workflows/ci.yml

36944 of 47135 relevant lines covered (78.38%)

37728.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.19
/misc/multistream-select/src/protocol.rs
1
// Copyright 2017 Parity Technologies (UK) Ltd.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
//! Multistream-select protocol messages an I/O operations for
22
//! constructing protocol negotiation flows.
23
//!
24
//! A protocol negotiation flow is constructed by using the
25
//! `Stream` and `Sink` implementations of `MessageIO` and
26
//! `MessageReader`.
27

28
use std::{
29
    error::Error,
30
    fmt, io,
31
    pin::Pin,
32
    task::{Context, Poll},
33
};
34

35
use bytes::{BufMut, Bytes, BytesMut};
36
use futures::{io::IoSlice, prelude::*, ready};
37
use unsigned_varint as uvi;
38

39
use crate::{
40
    length_delimited::{LengthDelimited, LengthDelimitedReader},
41
    Version,
42
};
43

44
/// The maximum number of supported protocols that can be processed.
45
const MAX_PROTOCOLS: usize = 1000;
46

47
/// The encoded form of a multistream-select 1.0.0 header message.
48
const MSG_MULTISTREAM_1_0: &[u8] = b"/multistream/1.0.0\n";
49
/// The encoded form of a multistream-select 'na' message.
50
const MSG_PROTOCOL_NA: &[u8] = b"na\n";
51
/// The encoded form of a multistream-select 'ls' message.
52
const MSG_LS: &[u8] = b"ls\n";
53

54
/// The multistream-select header lines preceding negotiation.
55
///
56
/// Every [`Version`] has a corresponding header line.
57
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
58
pub(crate) enum HeaderLine {
59
    /// The `/multistream/1.0.0` header line.
60
    V1,
61
}
62

63
impl From<Version> for HeaderLine {
64
    fn from(v: Version) -> HeaderLine {
188,645✔
65
        match v {
188,645✔
66
            Version::V1 | Version::V1Lazy => HeaderLine::V1,
188,645✔
67
        }
68
    }
188,645✔
69
}
70

71
/// A protocol (name) exchanged during protocol negotiation.
72
#[derive(Clone, Debug, PartialEq, Eq)]
73
pub(crate) struct Protocol(String);
74
impl AsRef<str> for Protocol {
75
    fn as_ref(&self) -> &str {
284,168✔
76
        self.0.as_ref()
284,168✔
77
    }
284,168✔
78
}
79

80
impl TryFrom<Bytes> for Protocol {
81
    type Error = ProtocolError;
82

83
    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
187,923✔
84
        if !value.as_ref().starts_with(b"/") {
187,923✔
85
            return Err(ProtocolError::InvalidProtocol);
×
86
        }
187,923✔
87
        let protocol_as_string =
187,923✔
88
            String::from_utf8(value.to_vec()).map_err(|_| ProtocolError::InvalidProtocol)?;
187,923✔
89

90
        Ok(Protocol(protocol_as_string))
187,923✔
91
    }
187,923✔
92
}
93

94
impl TryFrom<&[u8]> for Protocol {
95
    type Error = ProtocolError;
96

97
    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
×
98
        Self::try_from(Bytes::copy_from_slice(value))
×
99
    }
×
100
}
101

102
impl TryFrom<&str> for Protocol {
103
    type Error = ProtocolError;
104

105
    fn try_from(value: &str) -> Result<Self, Self::Error> {
192,270✔
106
        if !value.starts_with('/') {
192,270✔
107
            return Err(ProtocolError::InvalidProtocol);
×
108
        }
192,270✔
109

110
        Ok(Protocol(value.to_owned()))
192,270✔
111
    }
192,270✔
112
}
113

114
impl fmt::Display for Protocol {
115
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2,012✔
116
        write!(f, "{}", self.0)
2,012✔
117
    }
2,012✔
118
}
119

120
/// A multistream-select protocol message.
121
///
122
/// Multistream-select protocol messages are exchanged with the goal
123
/// of agreeing on an application-layer protocol to use on an I/O stream.
124
#[derive(Debug, Clone, PartialEq, Eq)]
125
pub(crate) enum Message {
126
    /// A header message identifies the multistream-select protocol
127
    /// that the sender wishes to speak.
128
    Header(HeaderLine),
129
    /// A protocol message identifies a protocol request or acknowledgement.
130
    Protocol(Protocol),
131
    /// A message through which a peer requests the complete list of
132
    /// supported protocols from the remote.
133
    ListProtocols,
134
    /// A message listing all supported protocols of a peer.
135
    Protocols(Vec<Protocol>),
136
    /// A message signaling that a requested protocol is not available.
137
    NotAvailable,
138
}
139

140
impl Message {
141
    /// Encodes a `Message` into its byte representation.
142
    fn encode(&self, dest: &mut BytesMut) {
382,214✔
143
        match self {
382,214✔
144
            Message::Header(HeaderLine::V1) => {
190,675✔
145
                dest.reserve(MSG_MULTISTREAM_1_0.len());
190,675✔
146
                dest.put(MSG_MULTISTREAM_1_0);
190,675✔
147
            }
190,675✔
148
            Message::Protocol(p) => {
189,963✔
149
                let len = p.as_ref().len() + 1; // + 1 for \n
189,963✔
150
                dest.reserve(len);
189,963✔
151
                dest.put(p.0.as_ref());
189,963✔
152
                dest.put_u8(b'\n');
189,963✔
153
            }
189,963✔
154
            Message::ListProtocols => {
20✔
155
                dest.reserve(MSG_LS.len());
20✔
156
                dest.put(MSG_LS);
20✔
157
            }
20✔
158
            Message::Protocols(ps) => {
23✔
159
                let mut buf = uvi::encode::usize_buffer();
23✔
160
                let mut encoded = Vec::with_capacity(ps.len());
23✔
161
                for p in ps {
1,040✔
162
                    encoded.extend(uvi::encode::usize(p.as_ref().len() + 1, &mut buf)); // +1 for '\n'
1,017✔
163
                    encoded.extend_from_slice(p.0.as_ref());
1,017✔
164
                    encoded.push(b'\n')
1,017✔
165
                }
166
                encoded.push(b'\n');
23✔
167
                dest.reserve(encoded.len());
23✔
168
                dest.put(encoded.as_ref());
23✔
169
            }
170
            Message::NotAvailable => {
1,533✔
171
                dest.reserve(MSG_PROTOCOL_NA.len());
1,533✔
172
                dest.put(MSG_PROTOCOL_NA);
1,533✔
173
            }
1,533✔
174
        }
175
    }
382,214✔
176

177
    /// Decodes a `Message` from its byte representation.
178
    fn decode(mut msg: Bytes) -> Result<Message, ProtocolError> {
376,100✔
179
        if msg == MSG_MULTISTREAM_1_0 {
376,100✔
180
            return Ok(Message::Header(HeaderLine::V1));
187,618✔
181
        }
188,482✔
182

183
        if msg == MSG_PROTOCOL_NA {
188,482✔
184
            return Ok(Message::NotAvailable);
1,533✔
185
        }
186,949✔
186

187
        if msg == MSG_LS {
186,949✔
188
            return Ok(Message::ListProtocols);
20✔
189
        }
186,929✔
190

191
        // If it starts with a `/`, ends with a line feed without any
192
        // other line feeds in-between, it must be a protocol name.
193
        if msg.first() == Some(&b'/')
186,929✔
194
            && msg.last() == Some(&b'\n')
186,906✔
195
            && !msg[..msg.len() - 1].contains(&b'\n')
186,906✔
196
        {
197
            let p = Protocol::try_from(msg.split_to(msg.len() - 1))?;
186,906✔
198
            return Ok(Message::Protocol(p));
186,906✔
199
        }
23✔
200

201
        // At this point, it must be an `ls` response, i.e. one or more
202
        // length-prefixed, newline-delimited protocol names.
203
        let mut protocols = Vec::new();
23✔
204
        let mut remaining: &[u8] = &msg;
23✔
205
        loop {
206
            // A well-formed message must be terminated with a newline.
207
            if remaining == [b'\n'] {
1,040✔
208
                break;
23✔
209
            } else if protocols.len() == MAX_PROTOCOLS {
1,017✔
210
                return Err(ProtocolError::TooManyProtocols);
×
211
            }
1,017✔
212

213
            // Decode the length of the next protocol name and check that
214
            // it ends with a line feed.
215
            let (len, tail) = uvi::decode::usize(remaining)?;
1,017✔
216
            if len == 0 || len > tail.len() || tail[len - 1] != b'\n' {
1,017✔
217
                return Err(ProtocolError::InvalidMessage);
×
218
            }
1,017✔
219

220
            // Parse the protocol name.
221
            let p = Protocol::try_from(Bytes::copy_from_slice(&tail[..len - 1]))?;
1,017✔
222
            protocols.push(p);
1,017✔
223

224
            // Skip ahead to the next protocol.
225
            remaining = &tail[len..];
1,017✔
226
        }
227

228
        Ok(Message::Protocols(protocols))
23✔
229
    }
376,100✔
230
}
231

232
/// A `MessageIO` implements a [`Stream`] and [`Sink`] of [`Message`]s.
233
#[pin_project::pin_project]
234
pub(crate) struct MessageIO<R> {
235
    #[pin]
236
    inner: LengthDelimited<R>,
237
}
238

239
impl<R> MessageIO<R> {
240
    /// Constructs a new `MessageIO` resource wrapping the given I/O stream.
241
    pub(crate) fn new(inner: R) -> MessageIO<R>
113,708✔
242
    where
113,708✔
243
        R: AsyncRead + AsyncWrite,
113,708✔
244
    {
245
        Self {
113,708✔
246
            inner: LengthDelimited::new(inner),
113,708✔
247
        }
113,708✔
248
    }
113,708✔
249

250
    /// Converts the [`MessageIO`] into a [`MessageReader`], dropping the
251
    /// [`Message`]-oriented `Sink` in favour of direct `AsyncWrite` access
252
    /// to the underlying I/O stream.
253
    ///
254
    /// This is typically done if further negotiation messages are expected to be
255
    /// received but no more messages are written, allowing the writing of
256
    /// follow-up protocol data to commence.
257
    pub(crate) fn into_reader(self) -> MessageReader<R> {
467✔
258
        MessageReader {
467✔
259
            inner: self.inner.into_reader(),
467✔
260
        }
467✔
261
    }
467✔
262

263
    /// Drops the [`MessageIO`] resource, yielding the underlying I/O stream.
264
    ///
265
    /// # Panics
266
    ///
267
    /// Panics if the read buffer or write buffer is not empty, meaning that an incoming
268
    /// protocol negotiation frame has been partially read or an outgoing frame
269
    /// has not yet been flushed. The read buffer is guaranteed to be empty whenever
270
    /// `MessageIO::poll` returned a message. The write buffer is guaranteed to be empty
271
    /// when the sink has been flushed.
272
    pub(crate) fn into_inner(self) -> R {
98,725✔
273
        self.inner.into_inner()
98,725✔
274
    }
98,725✔
275
}
276

277
impl<R> Sink<Message> for MessageIO<R>
278
where
279
    R: AsyncWrite,
280
{
281
    type Error = ProtocolError;
282

283
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
205,492✔
284
        self.project().inner.poll_ready(cx).map_err(From::from)
205,492✔
285
    }
205,492✔
286

287
    fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
205,492✔
288
        let mut buf = BytesMut::new();
205,492✔
289
        item.encode(&mut buf);
205,492✔
290
        self.project()
205,492✔
291
            .inner
205,492✔
292
            .start_send(buf.freeze())
205,492✔
293
            .map_err(From::from)
205,492✔
294
    }
205,492✔
295

296
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
153,711✔
297
        self.project().inner.poll_flush(cx).map_err(From::from)
153,711✔
298
    }
153,711✔
299

300
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
×
301
        self.project().inner.poll_close(cx).map_err(From::from)
×
302
    }
×
303
}
304

305
impl<R> Stream for MessageIO<R>
306
where
307
    R: AsyncRead,
308
{
309
    type Item = Result<Message, ProtocolError>;
310

311
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
268,971✔
312
        match poll_stream(self.project().inner, cx) {
268,971✔
313
            Poll::Pending => Poll::Pending,
66,051✔
314
            Poll::Ready(None) => Poll::Ready(None),
796✔
315
            Poll::Ready(Some(Ok(m))) => Poll::Ready(Some(Ok(m))),
201,875✔
316
            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
249✔
317
        }
318
    }
268,971✔
319
}
320

321
/// A `MessageReader` implements a `Stream` of `Message`s on an underlying
322
/// I/O resource combined with direct `AsyncWrite` access.
323
#[pin_project::pin_project]
324
#[derive(Debug)]
325
pub(crate) struct MessageReader<R> {
326
    #[pin]
327
    inner: LengthDelimitedReader<R>,
328
}
329

330
impl<R> MessageReader<R> {
331
    /// Drops the `MessageReader` resource, yielding the underlying I/O stream
332
    /// together with the remaining write buffer containing the protocol
333
    /// negotiation frame data that has not yet been written to the I/O stream.
334
    ///
335
    /// # Panics
336
    ///
337
    /// Panics if the read buffer or write buffer is not empty, meaning that either
338
    /// an incoming protocol negotiation frame has been partially read, or an
339
    /// outgoing frame has not yet been flushed. The read buffer is guaranteed to
340
    /// be empty whenever `MessageReader::poll` returned a message. The write
341
    /// buffer is guaranteed to be empty whenever the sink has been flushed.
342
    pub(crate) fn into_inner(self) -> R {
1✔
343
        self.inner.into_inner()
1✔
344
    }
1✔
345
}
346

347
impl<R> Stream for MessageReader<R>
348
where
349
    R: AsyncRead,
350
{
351
    type Item = Result<Message, ProtocolError>;
352

353
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1,266✔
354
        poll_stream(self.project().inner, cx)
1,266✔
355
    }
1,266✔
356
}
357

358
impl<TInner> AsyncWrite for MessageReader<TInner>
359
where
360
    TInner: AsyncWrite,
361
{
362
    fn poll_write(
250✔
363
        self: Pin<&mut Self>,
250✔
364
        cx: &mut Context<'_>,
250✔
365
        buf: &[u8],
250✔
366
    ) -> Poll<Result<usize, io::Error>> {
250✔
367
        self.project().inner.poll_write(cx, buf)
250✔
368
    }
250✔
369

370
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
1,027✔
371
        self.project().inner.poll_flush(cx)
1,027✔
372
    }
1,027✔
373

374
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
1✔
375
        self.project().inner.poll_close(cx)
1✔
376
    }
1✔
377

378
    fn poll_write_vectored(
×
379
        self: Pin<&mut Self>,
×
380
        cx: &mut Context<'_>,
×
381
        bufs: &[IoSlice<'_>],
×
382
    ) -> Poll<Result<usize, io::Error>> {
×
383
        self.project().inner.poll_write_vectored(cx, bufs)
×
384
    }
×
385
}
386

387
fn poll_stream<S>(
270,237✔
388
    stream: Pin<&mut S>,
270,237✔
389
    cx: &mut Context<'_>,
270,237✔
390
) -> Poll<Option<Result<Message, ProtocolError>>>
270,237✔
391
where
270,237✔
392
    S: Stream<Item = Result<Bytes, io::Error>>,
270,237✔
393
{
394
    let msg = if let Some(msg) = ready!(stream.poll_next(cx)?) {
270,237✔
395
        match Message::decode(msg) {
202,582✔
396
            Ok(m) => m,
202,582✔
397
            Err(err) => return Poll::Ready(Some(Err(err))),
×
398
        }
399
    } else {
400
        return Poll::Ready(None);
796✔
401
    };
402

403
    tracing::trace!(message=?msg, "Received message");
202,582✔
404

405
    Poll::Ready(Some(Ok(msg)))
202,582✔
406
}
270,237✔
407

408
/// A protocol error.
409
#[derive(Debug)]
410
pub enum ProtocolError {
411
    /// I/O error.
412
    IoError(io::Error),
413

414
    /// Received an invalid message from the remote.
415
    InvalidMessage,
416

417
    /// A protocol (name) is invalid.
418
    InvalidProtocol,
419

420
    /// Too many protocols have been returned by the remote.
421
    TooManyProtocols,
422
}
423

424
impl From<io::Error> for ProtocolError {
425
    fn from(err: io::Error) -> ProtocolError {
789✔
426
        ProtocolError::IoError(err)
789✔
427
    }
789✔
428
}
429

430
impl From<ProtocolError> for io::Error {
431
    fn from(err: ProtocolError) -> Self {
×
432
        if let ProtocolError::IoError(e) = err {
×
433
            return e;
×
434
        }
×
435
        io::ErrorKind::InvalidData.into()
×
436
    }
×
437
}
438

439
impl From<uvi::decode::Error> for ProtocolError {
440
    fn from(err: uvi::decode::Error) -> ProtocolError {
×
441
        Self::from(io::Error::new(io::ErrorKind::InvalidData, err.to_string()))
×
442
    }
×
443
}
444

445
impl Error for ProtocolError {
446
    fn source(&self) -> Option<&(dyn Error + 'static)> {
×
447
        match *self {
×
448
            ProtocolError::IoError(ref err) => Some(err),
×
449
            _ => None,
×
450
        }
451
    }
×
452
}
453

454
impl fmt::Display for ProtocolError {
455
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
×
456
        match self {
×
457
            ProtocolError::IoError(e) => write!(fmt, "I/O error: {e}"),
×
458
            ProtocolError::InvalidMessage => write!(fmt, "Received an invalid message."),
×
459
            ProtocolError::InvalidProtocol => write!(fmt, "A protocol (name) is invalid."),
×
460
            ProtocolError::TooManyProtocols => write!(fmt, "Too many protocols received."),
×
461
        }
462
    }
×
463
}
464

465
#[cfg(test)]
466
mod tests {
467
    use std::iter;
468

469
    use quickcheck::*;
470

471
    use super::*;
472

473
    impl Arbitrary for Protocol {
474
        fn arbitrary(g: &mut Gen) -> Protocol {
1,030✔
475
            let n = g.gen_range(1..g.size());
1,030✔
476
            let p: String = iter::repeat(())
1,030✔
477
                .map(|()| char::arbitrary(g))
204,996✔
478
                .filter(|&c| c.is_ascii_alphanumeric())
204,996✔
479
                .take(n)
1,030✔
480
                .collect();
1,030✔
481
            Protocol(format!("/{p}"))
1,030✔
482
        }
1,030✔
483
    }
484

485
    impl Arbitrary for Message {
486
        fn arbitrary(g: &mut Gen) -> Message {
100✔
487
            match g.gen_range(0..5u8) {
100✔
488
                0 => Message::Header(HeaderLine::V1),
24✔
489
                1 => Message::NotAvailable,
20✔
490
                2 => Message::ListProtocols,
20✔
491
                3 => Message::Protocol(Protocol::arbitrary(g)),
13✔
492
                4 => Message::Protocols(Vec::arbitrary(g)),
23✔
493
                _ => panic!(),
494
            }
495
        }
100✔
496
    }
497

498
    #[test]
499
    fn encode_decode_message() {
1✔
500
        fn prop(msg: Message) {
100✔
501
            let mut buf = BytesMut::new();
100✔
502
            msg.encode(&mut buf);
100✔
503
            match Message::decode(buf.freeze()) {
100✔
504
                Ok(m) => assert_eq!(m, msg),
100✔
505
                Err(e) => panic!("Decoding failed: {e:?}"),
506
            }
507
        }
100✔
508
        quickcheck(prop as fn(_))
1✔
509
    }
1✔
510
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc