• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18610913338

18 Oct 2025 04:41AM UTC coverage: 78.379% (+2.5%) from 75.842%
18610913338

push

github

butlergroup
	modified:   .github/workflows/ci.yml

36944 of 47135 relevant lines covered (78.38%)

37728.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.37
/muxers/yamux/src/lib.rs
1
// Copyright 2018 Parity Technologies (UK) Ltd.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
//! Implementation of the [Yamux](https://github.com/hashicorp/yamux/blob/master/spec.md) multiplexing protocol for libp2p.
22

23
#![cfg_attr(docsrs, feature(doc_cfg))]
24

25
use std::{
26
    collections::VecDeque,
27
    io,
28
    io::{IoSlice, IoSliceMut},
29
    iter,
30
    pin::Pin,
31
    task::{Context, Poll, Waker},
32
};
33

34
use either::Either;
35
use futures::{prelude::*, ready};
36
use libp2p_core::{
37
    muxing::{StreamMuxer, StreamMuxerEvent},
38
    upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade, UpgradeInfo},
39
};
40
use thiserror::Error;
41

42
/// A Yamux connection.
43
#[derive(Debug)]
44
pub struct Muxer<C> {
45
    connection: Either<yamux012::Connection<C>, yamux013::Connection<C>>,
46
    /// Temporarily buffers inbound streams in case our node is
47
    /// performing backpressure on the remote.
48
    ///
49
    /// The only way yamux can make progress is by calling
50
    /// [`yamux013::Connection::poll_next_inbound`]. However, the [`StreamMuxer`] interface is
51
    /// designed to allow a caller to selectively make progress via
52
    /// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the more general
53
    /// [`StreamMuxer::poll`] is designed to make progress on existing streams etc.
54
    ///
55
    /// This buffer stores inbound streams that are created whilst [`StreamMuxer::poll`] is called.
56
    /// Once the buffer is full, new inbound streams are dropped.
57
    inbound_stream_buffer: VecDeque<Stream>,
58
    /// Waker to be called when new inbound streams are available.
59
    inbound_stream_waker: Option<Waker>,
60
}
61

62
/// How many streams to buffer before we start resetting them.
63
///
64
/// This is equal to the ACK BACKLOG in `rust-yamux`.
65
/// Thus, for peers running on a recent version of `rust-libp2p`, we should never need to reset
66
/// streams because they'll voluntarily stop opening them once they hit the ACK backlog.
67
const MAX_BUFFERED_INBOUND_STREAMS: usize = 256;
68

69
impl<C> Muxer<C>
70
where
71
    C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
72
{
73
    /// Create a new Yamux connection.
74
    fn new(connection: Either<yamux012::Connection<C>, yamux013::Connection<C>>) -> Self {
29,526✔
75
        Muxer {
29,526✔
76
            connection,
29,526✔
77
            inbound_stream_buffer: VecDeque::default(),
29,526✔
78
            inbound_stream_waker: None,
29,526✔
79
        }
29,526✔
80
    }
29,526✔
81
}
82

83
impl<C> StreamMuxer for Muxer<C>
84
where
85
    C: AsyncRead + AsyncWrite + Unpin + 'static,
86
{
87
    type Substream = Stream;
88
    type Error = Error;
89

90
    #[tracing::instrument(level = "trace", name = "StreamMuxer::poll_inbound", skip(self, cx))]
91
    fn poll_inbound(
250,340✔
92
        mut self: Pin<&mut Self>,
250,340✔
93
        cx: &mut Context<'_>,
250,340✔
94
    ) -> Poll<Result<Self::Substream, Self::Error>> {
250,340✔
95
        if let Some(stream) = self.inbound_stream_buffer.pop_front() {
250,340✔
96
            return Poll::Ready(Ok(stream));
19,338✔
97
        }
231,002✔
98

99
        if let Poll::Ready(res) = self.poll_inner(cx) {
231,002✔
100
            return Poll::Ready(res);
11✔
101
        }
230,991✔
102

103
        self.inbound_stream_waker = Some(cx.waker().clone());
230,991✔
104
        Poll::Pending
230,991✔
105
    }
250,340✔
106

107
    #[tracing::instrument(level = "trace", name = "StreamMuxer::poll_outbound", skip(self, cx))]
108
    fn poll_outbound(
19,353✔
109
        mut self: Pin<&mut Self>,
19,353✔
110
        cx: &mut Context<'_>,
19,353✔
111
    ) -> Poll<Result<Self::Substream, Self::Error>> {
19,353✔
112
        let stream = match self.connection.as_mut() {
19,353✔
113
            Either::Left(c) => ready!(c.poll_new_outbound(cx))
×
114
                .map_err(|e| Error(Either::Left(e)))
×
115
                .map(|s| Stream(Either::Left(s))),
×
116
            Either::Right(c) => ready!(c.poll_new_outbound(cx))
19,353✔
117
                .map_err(|e| Error(Either::Right(e)))
19,353✔
118
                .map(|s| Stream(Either::Right(s))),
19,353✔
119
        }?;
×
120
        Poll::Ready(Ok(stream))
19,353✔
121
    }
19,353✔
122

123
    #[tracing::instrument(level = "trace", name = "StreamMuxer::poll_close", skip(self, cx))]
124
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
33✔
125
        match self.connection.as_mut() {
33✔
126
            Either::Left(c) => c.poll_close(cx).map_err(|e| Error(Either::Left(e))),
×
127
            Either::Right(c) => c.poll_close(cx).map_err(|e| Error(Either::Right(e))),
33✔
128
        }
129
    }
33✔
130

131
    #[tracing::instrument(level = "trace", name = "StreamMuxer::poll", skip(self, cx))]
132
    fn poll(
271,739✔
133
        self: Pin<&mut Self>,
271,739✔
134
        cx: &mut Context<'_>,
271,739✔
135
    ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
271,739✔
136
        let this = self.get_mut();
271,739✔
137

138
        let inbound_stream = ready!(this.poll_inner(cx))?;
271,739✔
139

140
        if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS {
19,338✔
141
            tracing::warn!(
×
142
                stream=%inbound_stream.0,
143
                "dropping stream because buffer is full"
×
144
            );
145
            drop(inbound_stream);
×
146
        } else {
147
            this.inbound_stream_buffer.push_back(inbound_stream);
19,338✔
148

149
            if let Some(waker) = this.inbound_stream_waker.take() {
19,338✔
150
                waker.wake()
15,130✔
151
            }
4,208✔
152
        }
153

154
        // Schedule an immediate wake-up, allowing other code to run.
155
        cx.waker().wake_by_ref();
19,338✔
156
        Poll::Pending
19,338✔
157
    }
271,739✔
158
}
159

160
/// A stream produced by the yamux multiplexer.
161
#[derive(Debug)]
162
pub struct Stream(Either<yamux012::Stream, yamux013::Stream>);
163

164
impl AsyncRead for Stream {
165
    fn poll_read(
526,371✔
166
        mut self: Pin<&mut Self>,
526,371✔
167
        cx: &mut Context<'_>,
526,371✔
168
        buf: &mut [u8],
526,371✔
169
    ) -> Poll<io::Result<usize>> {
526,371✔
170
        either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_read(cx, buf))
526,371✔
171
    }
526,371✔
172

173
    fn poll_read_vectored(
×
174
        mut self: Pin<&mut Self>,
×
175
        cx: &mut Context<'_>,
×
176
        bufs: &mut [IoSliceMut<'_>],
×
177
    ) -> Poll<io::Result<usize>> {
×
178
        either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_read_vectored(cx, bufs))
×
179
    }
×
180
}
181

182
impl AsyncWrite for Stream {
183
    fn poll_write(
188,663✔
184
        mut self: Pin<&mut Self>,
188,663✔
185
        cx: &mut Context<'_>,
188,663✔
186
        buf: &[u8],
188,663✔
187
    ) -> Poll<io::Result<usize>> {
188,663✔
188
        either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_write(cx, buf))
188,663✔
189
    }
188,663✔
190

191
    fn poll_write_vectored(
×
192
        mut self: Pin<&mut Self>,
×
193
        cx: &mut Context<'_>,
×
194
        bufs: &[IoSlice<'_>],
×
195
    ) -> Poll<io::Result<usize>> {
×
196
        either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_write_vectored(cx, bufs))
×
197
    }
×
198

199
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
263,396✔
200
        either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_flush(cx))
263,396✔
201
    }
263,396✔
202

203
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
38,381✔
204
        either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_close(cx))
38,381✔
205
    }
38,381✔
206
}
207

208
impl<C> Muxer<C>
209
where
210
    C: AsyncRead + AsyncWrite + Unpin + 'static,
211
{
212
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream, Error>> {
502,741✔
213
        let stream = match self.connection.as_mut() {
502,741✔
214
            Either::Left(c) => ready!(c.poll_next_inbound(cx))
×
215
                .ok_or(Error(Either::Left(yamux012::ConnectionError::Closed)))?
×
216
                .map_err(|e| Error(Either::Left(e)))
×
217
                .map(|s| Stream(Either::Left(s)))?,
×
218
            Either::Right(c) => ready!(c.poll_next_inbound(cx))
502,741✔
219
                .ok_or(Error(Either::Right(yamux013::ConnectionError::Closed)))?
21,381✔
220
                .map_err(|e| Error(Either::Right(e)))
19,540✔
221
                .map(|s| Stream(Either::Right(s)))?,
19,540✔
222
        };
223

224
        Poll::Ready(Ok(stream))
19,344✔
225
    }
502,741✔
226
}
227

228
/// The yamux configuration.
229
#[derive(Debug, Clone)]
230
pub struct Config(Either<Config012, Config013>);
231

232
impl Default for Config {
233
    fn default() -> Self {
15,732✔
234
        Self(Either::Right(Config013::default()))
15,732✔
235
    }
15,732✔
236
}
237

238
#[derive(Debug, Clone)]
239
struct Config012 {
240
    inner: yamux012::Config,
241
    mode: Option<yamux012::Mode>,
242
}
243

244
impl Default for Config012 {
245
    fn default() -> Self {
1✔
246
        let mut inner = yamux012::Config::default();
1✔
247
        // For conformity with mplex, read-after-close on a multiplexed
248
        // connection is never permitted and not configurable.
249
        inner.set_read_after_close(false);
1✔
250
        Self { inner, mode: None }
1✔
251
    }
1✔
252
}
253

254
/// The window update mode determines when window updates are
255
/// sent to the remote, giving it new credit to send more data.
256
pub struct WindowUpdateMode(yamux012::WindowUpdateMode);
257

258
impl WindowUpdateMode {
259
    /// The window update mode whereby the remote is given
260
    /// new credit via a window update whenever the current
261
    /// receive window is exhausted when data is received,
262
    /// i.e. this mode cannot exert back-pressure from application
263
    /// code that is slow to read from a substream.
264
    ///
265
    /// > **Note**: The receive buffer may overflow with this
266
    /// > strategy if the receiver is too slow in reading the
267
    /// > data from the buffer. The maximum receive buffer
268
    /// > size must be tuned appropriately for the desired
269
    /// > throughput and level of tolerance for (temporarily)
270
    /// > slow receivers.
271
    #[deprecated(note = "Use `WindowUpdateMode::on_read` instead.")]
272
    pub fn on_receive() -> Self {
×
273
        #[allow(deprecated)]
274
        WindowUpdateMode(yamux012::WindowUpdateMode::OnReceive)
×
275
    }
×
276

277
    /// The window update mode whereby the remote is given new
278
    /// credit only when the current receive window is exhausted
279
    /// when data is read from the substream's receive buffer,
280
    /// i.e. application code that is slow to read from a substream
281
    /// exerts back-pressure on the remote.
282
    ///
283
    /// > **Note**: If the receive window of a substream on
284
    /// > both peers is exhausted and both peers are blocked on
285
    /// > sending data before reading from the stream, a deadlock
286
    /// > occurs. To avoid this situation, reading from a substream
287
    /// > should never be blocked on writing to the same substream.
288
    ///
289
    /// > **Note**: With this strategy, there is usually no point in the
290
    /// > receive buffer being larger than the window size.
291
    pub fn on_read() -> Self {
×
292
        WindowUpdateMode(yamux012::WindowUpdateMode::OnRead)
×
293
    }
×
294
}
295

296
impl Config {
297
    /// Creates a new `Config` in client mode, regardless of whether
298
    /// it will be used for an inbound or outbound upgrade.
299
    #[deprecated(note = "Will be removed with the next breaking release.")]
300
    pub fn client() -> Self {
×
301
        Self(Either::Left(Config012 {
×
302
            mode: Some(yamux012::Mode::Client),
×
303
            ..Default::default()
×
304
        }))
×
305
    }
×
306

307
    /// Creates a new `Config` in server mode, regardless of whether
308
    /// it will be used for an inbound or outbound upgrade.
309
    #[deprecated(note = "Will be removed with the next breaking release.")]
310
    pub fn server() -> Self {
×
311
        Self(Either::Left(Config012 {
×
312
            mode: Some(yamux012::Mode::Server),
×
313
            ..Default::default()
×
314
        }))
×
315
    }
×
316

317
    /// Sets the size (in bytes) of the receive window per substream.
318
    #[deprecated(
319
        note = "Will be replaced in the next breaking release with a connection receive window size limit."
320
    )]
321
    pub fn set_receive_window_size(&mut self, num_bytes: u32) -> &mut Self {
×
322
        self.set(|cfg| cfg.set_receive_window(num_bytes))
×
323
    }
×
324

325
    /// Sets the maximum size (in bytes) of the receive buffer per substream.
326
    #[deprecated(note = "Will be removed with the next breaking release.")]
327
    pub fn set_max_buffer_size(&mut self, num_bytes: usize) -> &mut Self {
×
328
        self.set(|cfg| cfg.set_max_buffer_size(num_bytes))
×
329
    }
×
330

331
    /// Sets the maximum number of concurrent substreams.
332
    pub fn set_max_num_streams(&mut self, num_streams: usize) -> &mut Self {
1✔
333
        self.set(|cfg| cfg.set_max_num_streams(num_streams))
1✔
334
    }
1✔
335

336
    /// Sets the window update mode that determines when the remote
337
    /// is given new credit for sending more data.
338
    #[deprecated(
339
        note = "`WindowUpdate::OnRead` is the default. `WindowUpdate::OnReceive` breaks backpressure, is thus not recommended, and will be removed in the next breaking release. Thus this method becomes obsolete and will be removed with the next breaking release."
340
    )]
341
    pub fn set_window_update_mode(&mut self, mode: WindowUpdateMode) -> &mut Self {
×
342
        self.set(|cfg| cfg.set_window_update_mode(mode.0))
×
343
    }
×
344

345
    fn set(&mut self, f: impl FnOnce(&mut yamux012::Config) -> &mut yamux012::Config) -> &mut Self {
1✔
346
        let cfg012 = match self.0.as_mut() {
1✔
347
            Either::Left(c) => &mut c.inner,
×
348
            Either::Right(_) => {
349
                self.0 = Either::Left(Config012::default());
1✔
350
                &mut self.0.as_mut().unwrap_left().inner
1✔
351
            }
352
        };
353

354
        f(cfg012);
1✔
355

356
        self
1✔
357
    }
1✔
358
}
359

360
impl UpgradeInfo for Config {
361
    type Info = &'static str;
362
    type InfoIter = iter::Once<Self::Info>;
363

364
    fn protocol_info(&self) -> Self::InfoIter {
53,464✔
365
        iter::once("/yamux/1.0.0")
53,464✔
366
    }
53,464✔
367
}
368

369
impl<C> InboundConnectionUpgrade<C> for Config
370
where
371
    C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
372
{
373
    type Output = Muxer<C>;
374
    type Error = io::Error;
375
    type Future = future::Ready<Result<Self::Output, Self::Error>>;
376

377
    fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future {
14,897✔
378
        let connection = match self.0 {
14,897✔
379
            Either::Left(Config012 { inner, mode }) => Either::Left(yamux012::Connection::new(
×
380
                io,
×
381
                inner,
×
382
                mode.unwrap_or(yamux012::Mode::Server),
×
383
            )),
×
384
            Either::Right(Config013(cfg)) => {
14,897✔
385
                Either::Right(yamux013::Connection::new(io, cfg, yamux013::Mode::Server))
14,897✔
386
            }
387
        };
388

389
        future::ready(Ok(Muxer::new(connection)))
14,897✔
390
    }
14,897✔
391
}
392

393
impl<C> OutboundConnectionUpgrade<C> for Config
394
where
395
    C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
396
{
397
    type Output = Muxer<C>;
398
    type Error = io::Error;
399
    type Future = future::Ready<Result<Self::Output, Self::Error>>;
400

401
    fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future {
14,629✔
402
        let connection = match self.0 {
14,629✔
403
            Either::Left(Config012 { inner, mode }) => Either::Left(yamux012::Connection::new(
×
404
                io,
×
405
                inner,
×
406
                mode.unwrap_or(yamux012::Mode::Client),
×
407
            )),
×
408
            Either::Right(Config013(cfg)) => {
14,629✔
409
                Either::Right(yamux013::Connection::new(io, cfg, yamux013::Mode::Client))
14,629✔
410
            }
411
        };
412

413
        future::ready(Ok(Muxer::new(connection)))
14,629✔
414
    }
14,629✔
415
}
416

417
#[derive(Debug, Clone)]
418
struct Config013(yamux013::Config);
419

420
impl Default for Config013 {
421
    fn default() -> Self {
15,732✔
422
        let mut cfg = yamux013::Config::default();
15,732✔
423
        // For conformity with mplex, read-after-close on a multiplexed
424
        // connection is never permitted and not configurable.
425
        cfg.set_read_after_close(false);
15,732✔
426
        Self(cfg)
15,732✔
427
    }
15,732✔
428
}
429

430
/// The Yamux [`StreamMuxer`] error type.
431
#[derive(Debug, Error)]
432
#[error(transparent)]
433
pub struct Error(Either<yamux012::ConnectionError, yamux013::ConnectionError>);
434

435
impl From<Error> for io::Error {
436
    fn from(err: Error) -> Self {
×
437
        match err.0 {
×
438
            Either::Left(err) => match err {
×
439
                yamux012::ConnectionError::Io(e) => e,
×
440
                e => io::Error::other(e),
×
441
            },
442
            Either::Right(err) => match err {
×
443
                yamux013::ConnectionError::Io(e) => e,
×
444
                e => io::Error::other(e),
×
445
            },
446
        }
447
    }
×
448
}
449

450
#[cfg(test)]
451
mod test {
452
    use super::*;
453
    #[test]
454
    fn config_set_switches_to_v012() {
1✔
455
        // By default we use yamux v0.13. Thus we provide the benefits of yamux v0.13 to all users
456
        // that do not depend on any of the behaviors (i.e. configuration options) of v0.12.
457
        let mut cfg = Config::default();
1✔
458
        assert!(matches!(
1✔
459
            cfg,
1✔
460
            Config(Either::Right(Config013(yamux013::Config { .. })))
461
        ));
462

463
        // In case a user makes any configurations, use yamux v0.12 instead.
464
        cfg.set_max_num_streams(42);
1✔
465
        assert!(matches!(cfg, Config(Either::Left(Config012 { .. }))));
1✔
466
    }
1✔
467
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc