• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18606728921

17 Oct 2025 10:56PM UTC coverage: 74.664% (+4.3%) from 70.356%
18606728921

push

github

butlergroup
	modified:   .github/workflows/ci.yml

36070 of 48310 relevant lines covered (74.66%)

30290.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.01
/protocols/gossipsub/src/queue.rs
1
// Copyright 2020 Sigma Prime Pty Ltd.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
use std::{
22
    collections::{HashMap, VecDeque},
23
    pin::Pin,
24
    sync::{atomic::AtomicUsize, Arc, Mutex},
25
    task::{Context, Poll, Waker},
26
};
27

28
use crate::{types::RpcOut, MessageId};
29

30
const CONTROL_MSGS_LIMIT: usize = 20_000;
31

32
/// An async priority queue used to dispatch messages from the `NetworkBehaviour`.
///
33
/// Provides a clean abstraction over high-priority (unbounded), control (bounded),
34
/// and non priority (bounded) message queues.
///
/// Messages are routed to one of the three lanes by `try_push` according to their
/// `RpcOut` variant, and are popped in the order priority, control, non priority.
35
#[derive(Debug)]
36
pub(crate) struct Queue {
37
    /// High-priority unbounded queue (Subscribe, Unsubscribe)
38
    pub(crate) priority: Shared,
39
    /// Control messages bounded queue (Graft, Prune, IDontWant)
    /// Its bound is `CONTROL_MSGS_LIMIT`.
40
    pub(crate) control: Shared,
41
    /// Low-priority bounded queue (Publish, Forward, IHave, IWant)
    /// Its bound is the `capacity` passed to `Queue::new`.
42
    pub(crate) non_priority: Shared,
43
    /// The id associated with this handle of the queue (`1` for a queue built by `new`).
44
    pub(crate) id: usize,
45
    /// Counter shared by every clone of the queue; starts at `1` and is incremented
    /// by one each time the queue is cloned.
46
    pub(crate) count: Arc<AtomicUsize>,
47
}
48

49
impl Queue {
50
    /// Create a new `Queue` with `capacity`.
51
    pub(crate) fn new(capacity: usize) -> Self {
1,206✔
52
        Self {
1,206✔
53
            priority: Shared::new(),
1,206✔
54
            control: Shared::with_capacity(CONTROL_MSGS_LIMIT),
1,206✔
55
            non_priority: Shared::with_capacity(capacity),
1,206✔
56
            id: 1,
1,206✔
57
            count: Arc::new(AtomicUsize::new(1)),
1,206✔
58
        }
1,206✔
59
    }
1,206✔
60

61
    /// Try to push a message to the Queue, return Err if the queue is full,
62
    /// which will only happen for control and non priority messages.
63
    pub(crate) fn try_push(&mut self, message: RpcOut) -> Result<(), Box<RpcOut>> {
5,922✔
64
        match message {
5,922✔
65
            RpcOut::Subscribe(_) | RpcOut::Unsubscribe(_) => {
66
                self.priority
1,546✔
67
                    .try_push(message)
1,546✔
68
                    .expect("Shared is unbounded");
1,546✔
69
                Ok(())
1,546✔
70
            }
71
            RpcOut::Graft(_) | RpcOut::Prune(_) | RpcOut::IDontWant(_) => {
72
                self.control.try_push(message)
582✔
73
            }
74
            RpcOut::Publish { .. }
75
            | RpcOut::Forward { .. }
76
            | RpcOut::IHave(_)
77
            | RpcOut::IWant(_) => self.non_priority.try_push(message),
3,794✔
78
        }
79
    }
5,922✔
80

81
    /// Remove pending low priority Publish and Forward messages.
82
    /// Returns the number of messages removed.
83
    pub(crate) fn remove_data_messages(&mut self, message_ids: &[MessageId]) -> usize {
2✔
84
        let mut count = 0;
2✔
85
        self.non_priority.retain(|message| match message {
2✔
86
            RpcOut::Publish { message_id, .. } | RpcOut::Forward { message_id, .. } => {
×
87
                if message_ids.contains(message_id) {
×
88
                    count += 1;
×
89
                    false
×
90
                } else {
91
                    true
×
92
                }
93
            }
94
            _ => true,
×
95
        });
×
96
        count
2✔
97
    }
2✔
98

99
    /// Pop an element from the queue.
100
    pub(crate) fn poll_pop(&mut self, cx: &mut Context) -> Poll<RpcOut> {
×
101
        // First we try the priority messages.
102
        if let Poll::Ready(rpc) = Pin::new(&mut self.priority).poll_pop(cx) {
×
103
            return Poll::Ready(rpc);
×
104
        }
×
105

106
        // Then we try the control messages.
107
        if let Poll::Ready(rpc) = Pin::new(&mut self.control).poll_pop(cx) {
×
108
            return Poll::Ready(rpc);
×
109
        }
×
110

111
        // Finally we try the non priority messages
112
        if let Poll::Ready(rpc) = Pin::new(&mut self.non_priority).poll_pop(cx) {
×
113
            return Poll::Ready(rpc);
×
114
        }
×
115

116
        Poll::Pending
×
117
    }
×
118

119
    /// Check if the queue is empty.
120
    pub(crate) fn is_empty(&self) -> bool {
3,989✔
121
        if !self.priority.is_empty() {
3,989✔
122
            return false;
583✔
123
        }
3,406✔
124

125
        if !self.control.is_empty() {
3,406✔
126
            return false;
168✔
127
        }
3,238✔
128

129
        if !self.non_priority.is_empty() {
3,238✔
130
            return false;
2,552✔
131
        }
686✔
132

133
        true
686✔
134
    }
3,989✔
135

136
    /// Returns the length of the priority queue.
137
    #[cfg(feature = "metrics")]
138
    pub(crate) fn priority_len(&self) -> usize {
139
        self.priority.len() + self.control.len()
140
    }
141

142
    /// Returns the length of the non priority queue.
143
    #[cfg(feature = "metrics")]
144
    pub(crate) fn non_priority_len(&self) -> usize {
145
        self.non_priority.len()
146
    }
147

148
    /// Attempts to pop a message from the queue.
149
    /// returns None if the queue is empty.
150
    #[cfg(test)]
151
    pub(crate) fn try_pop(&mut self) -> Option<RpcOut> {
3,303✔
152
        // Try priority first
153
        self.priority
3,303✔
154
            .try_pop()
3,303✔
155
            // Then control messages
156
            .or_else(|| self.control.try_pop())
3,303✔
157
            // Finally non priority
158
            .or_else(|| self.non_priority.try_pop())
3,303✔
159
    }
3,303✔
160
}
161

162
impl Clone for Queue {
163
    fn clone(&self) -> Self {
1,175✔
164
        let new_id = self.count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1,175✔
165
        Self {
1,175✔
166
            priority: Shared {
1,175✔
167
                inner: self.priority.inner.clone(),
1,175✔
168
                capacity: self.priority.capacity,
1,175✔
169
                id: new_id,
1,175✔
170
            },
1,175✔
171
            control: Shared {
1,175✔
172
                inner: self.control.inner.clone(),
1,175✔
173
                capacity: self.control.capacity,
1,175✔
174
                id: new_id,
1,175✔
175
            },
1,175✔
176
            non_priority: Shared {
1,175✔
177
                inner: self.non_priority.inner.clone(),
1,175✔
178
                capacity: self.non_priority.capacity,
1,175✔
179
                id: new_id,
1,175✔
180
            },
1,175✔
181
            id: self.id,
1,175✔
182
            count: self.count.clone(),
1,175✔
183
        }
1,175✔
184
    }
1,175✔
185
}
186

187
/// The internal shared part of the queue,
188
/// that allows for shallow copies of the queue among each connection of the remote.
///
/// Cloning a `Queue` produces `Shared` values that point at the same `inner` state
/// but carry a different `id`.
189
#[derive(Debug)]
190
pub(crate) struct Shared {
    /// Message buffer and registered wakers, shared between all handles.
191
    inner: Arc<Mutex<SharedInner>>,
    /// Maximum number of buffered messages; `None` means pushes are never rejected.
192
    capacity: Option<usize>,
    /// Key under which this handle stores its waker in `pending_pops`;
    /// the entry is removed again when the handle is dropped.
193
    id: usize,
194
}
195

196
impl Shared {
197
    pub(crate) fn with_capacity(capacity: usize) -> Self {
2,412✔
198
        Self {
2,412✔
199
            inner: Arc::new(Mutex::new(SharedInner {
2,412✔
200
                queue: VecDeque::new(),
2,412✔
201
                pending_pops: Default::default(),
2,412✔
202
            })),
2,412✔
203
            capacity: Some(capacity),
2,412✔
204
            id: 1,
2,412✔
205
        }
2,412✔
206
    }
2,412✔
207

208
    pub(crate) fn new() -> Self {
1,206✔
209
        Self {
1,206✔
210
            inner: Arc::new(Mutex::new(SharedInner {
1,206✔
211
                queue: VecDeque::new(),
1,206✔
212
                pending_pops: Default::default(),
1,206✔
213
            })),
1,206✔
214
            capacity: None,
1,206✔
215
            id: 1,
1,206✔
216
        }
1,206✔
217
    }
1,206✔
218

219
    /// Pop an element from the queue.
220
    pub(crate) fn poll_pop(self: std::pin::Pin<&mut Self>, cx: &mut Context) -> Poll<RpcOut> {
×
221
        let mut guard = self.inner.lock().expect("lock to not be poisoned");
×
222
        match guard.queue.pop_front() {
×
223
            Some(t) => Poll::Ready(t),
×
224
            None => {
225
                guard
×
226
                    .pending_pops
×
227
                    .entry(self.id)
×
228
                    .or_insert(cx.waker().clone());
×
229
                Poll::Pending
×
230
            }
231
        }
232
    }
×
233

234
    pub(crate) fn try_push(&mut self, message: RpcOut) -> Result<(), Box<RpcOut>> {
5,922✔
235
        let mut guard = self.inner.lock().expect("lock to not be poisoned");
5,922✔
236
        if self
5,922✔
237
            .capacity
5,922✔
238
            .is_some_and(|capacity| guard.queue.len() >= capacity)
5,922✔
239
        {
240
            return Err(Box::new(message));
7✔
241
        }
5,915✔
242

243
        guard.queue.push_back(message);
5,915✔
244
        // Wake pending registered pops.
245
        for (_, s) in guard.pending_pops.drain() {
5,915✔
246
            s.wake();
×
247
        }
×
248

249
        Ok(())
5,915✔
250
    }
5,922✔
251

252
    /// Retain only the elements specified by the predicate.
253
    /// In other words, remove all elements e for which f(&e) returns false. The elements are
254
    /// visited in unsorted (and unspecified) order. Returns the cleared messages.
255
    pub(crate) fn retain<F: FnMut(&RpcOut) -> bool>(&mut self, f: F) {
2✔
256
        let mut shared = self.inner.lock().expect("lock to not be poisoned");
2✔
257
        shared.queue.retain(f);
2✔
258
    }
2✔
259

260
    /// Check if the queue is empty.
261
    pub(crate) fn is_empty(&self) -> bool {
10,633✔
262
        let guard = self.inner.lock().expect("lock to not be poisoned");
10,633✔
263
        guard.queue.len() == 0
10,633✔
264
    }
10,633✔
265

266
    /// Returns the length of the queue.
267
    #[cfg(feature = "metrics")]
268
    pub(crate) fn len(&self) -> usize {
269
        let guard = self.inner.lock().expect("lock to not be poisoned");
270
        guard.queue.len()
271
    }
272

273
    /// Attempts to pop an message from the queue.
274
    /// returns None if the queue is empty.
275
    #[cfg(test)]
276
    pub(crate) fn try_pop(&mut self) -> Option<RpcOut> {
8,575✔
277
        let mut guard = self.inner.lock().expect("lock to not be poisoned");
8,575✔
278
        guard.queue.pop_front()
8,575✔
279
    }
8,575✔
280
}
281

282
impl Drop for Shared {
283
    fn drop(&mut self) {
7,143✔
284
        let mut guard = self.inner.lock().expect("lock to not be poisoned");
7,143✔
285
        guard.pending_pops.remove(&self.id);
7,143✔
286
    }
7,143✔
287
}
288

289
/// The state shared between the `NetworkBehaviour`s and the `ConnectionHandler`s.
///
/// It is only accessed through the `Mutex` held by `Shared`.
290
#[derive(Debug)]
291
struct SharedInner {
    /// Buffered messages; pushed at the back and popped from the front.
292
    queue: VecDeque<RpcOut>,
    /// Wakers of handles whose last `poll_pop` found the queue empty, keyed by handle id.
    /// All entries are drained and woken on the next successful push.
293
    pending_pops: HashMap<usize, Waker>,
294
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc