• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18610913338

18 Oct 2025 04:41AM UTC coverage: 78.379% (+2.5%) from 75.842%
18610913338

push

github

butlergroup
	modified:   .github/workflows/ci.yml

36944 of 47135 relevant lines covered (78.38%)

37728.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.21
/swarm/src/handler/one_shot.rs
1
// Copyright 2019 Parity Technologies (UK) Ltd.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
use std::{
22
    error,
23
    fmt::Debug,
24
    task::{Context, Poll},
25
    time::Duration,
26
};
27

28
use smallvec::SmallVec;
29

30
use crate::{
31
    handler::{
32
        ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
33
        FullyNegotiatedInbound, FullyNegotiatedOutbound, SubstreamProtocol,
34
    },
35
    upgrade::{InboundUpgradeSend, OutboundUpgradeSend},
36
    StreamUpgradeError,
37
};
38

39
/// A [`ConnectionHandler`] that opens a new substream for each request.
40
// TODO: Debug
41
pub struct OneShotHandler<TInbound, TOutbound, TEvent>
42
where
43
    TOutbound: OutboundUpgradeSend,
44
{
45
    /// The upgrade for inbound substreams.
46
    listen_protocol: SubstreamProtocol<TInbound, ()>,
47
    /// Queue of events to produce in `poll()`.
48
    events_out: SmallVec<[Result<TEvent, StreamUpgradeError<TOutbound::Error>>; 4]>,
49
    /// Queue of outbound substreams to open.
50
    dial_queue: SmallVec<[TOutbound; 4]>,
51
    /// Current number of concurrent outbound substreams being opened.
52
    dial_negotiated: u32,
53
    /// The configuration container for the handler
54
    config: OneShotHandlerConfig,
55
}
56

57
impl<TInbound, TOutbound, TEvent> OneShotHandler<TInbound, TOutbound, TEvent>
58
where
59
    TOutbound: OutboundUpgradeSend,
60
{
61
    /// Creates a `OneShotHandler`.
62
    pub fn new(
1✔
63
        listen_protocol: SubstreamProtocol<TInbound, ()>,
1✔
64
        config: OneShotHandlerConfig,
1✔
65
    ) -> Self {
1✔
66
        OneShotHandler {
1✔
67
            listen_protocol,
1✔
68
            events_out: SmallVec::new(),
1✔
69
            dial_queue: SmallVec::new(),
1✔
70
            dial_negotiated: 0,
1✔
71
            config,
1✔
72
        }
1✔
73
    }
1✔
74

75
    /// Returns the number of pending requests.
76
    pub fn pending_requests(&self) -> u32 {
×
77
        self.dial_negotiated + self.dial_queue.len() as u32
×
78
    }
×
79

80
    /// Returns a reference to the listen protocol configuration.
81
    ///
82
    /// > **Note**: If you modify the protocol, modifications will only applies to future inbound
83
    /// > substreams, not the ones already being negotiated.
84
    pub fn listen_protocol_ref(&self) -> &SubstreamProtocol<TInbound, ()> {
×
85
        &self.listen_protocol
×
86
    }
×
87

88
    /// Returns a mutable reference to the listen protocol configuration.
89
    ///
90
    /// > **Note**: If you modify the protocol, modifications will only applies to future inbound
91
    /// > substreams, not the ones already being negotiated.
92
    pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol<TInbound, ()> {
×
93
        &mut self.listen_protocol
×
94
    }
×
95

96
    /// Opens an outbound substream with `upgrade`.
97
    pub fn send_request(&mut self, upgrade: TOutbound) {
×
98
        self.dial_queue.push(upgrade);
×
99
    }
×
100
}
101

102
impl<TInbound, TOutbound, TEvent> Default for OneShotHandler<TInbound, TOutbound, TEvent>
103
where
104
    TOutbound: OutboundUpgradeSend,
105
    TInbound: InboundUpgradeSend + Default,
106
{
107
    fn default() -> Self {
×
108
        OneShotHandler::new(
×
109
            SubstreamProtocol::new(Default::default(), ()),
×
110
            OneShotHandlerConfig::default(),
×
111
        )
112
    }
×
113
}
114

115
impl<TInbound, TOutbound, TEvent> ConnectionHandler for OneShotHandler<TInbound, TOutbound, TEvent>
116
where
117
    TInbound: InboundUpgradeSend + Send + 'static,
118
    TOutbound: Debug + OutboundUpgradeSend,
119
    TInbound::Output: Into<TEvent>,
120
    TOutbound::Output: Into<TEvent>,
121
    TOutbound::Error: error::Error + Send + 'static,
122
    SubstreamProtocol<TInbound, ()>: Clone,
123
    TEvent: Debug + Send + 'static,
124
{
125
    type FromBehaviour = TOutbound;
126
    type ToBehaviour = Result<TEvent, StreamUpgradeError<TOutbound::Error>>;
127
    type InboundProtocol = TInbound;
128
    type OutboundProtocol = TOutbound;
129
    type OutboundOpenInfo = ();
130
    type InboundOpenInfo = ();
131

132
    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
×
133
        self.listen_protocol.clone()
×
134
    }
×
135

136
    fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
×
137
        self.send_request(event);
×
138
    }
×
139

140
    fn poll(
1✔
141
        &mut self,
1✔
142
        _: &mut Context<'_>,
1✔
143
    ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
1✔
144
        if !self.events_out.is_empty() {
1✔
145
            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
×
146
                self.events_out.remove(0),
×
147
            ));
×
148
        } else {
1✔
149
            self.events_out.shrink_to_fit();
1✔
150
        }
1✔
151

152
        if !self.dial_queue.is_empty() {
1✔
153
            if self.dial_negotiated < self.config.max_dial_negotiated {
×
154
                self.dial_negotiated += 1;
×
155
                let upgrade = self.dial_queue.remove(0);
×
156
                return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
×
157
                    protocol: SubstreamProtocol::new(upgrade, ())
×
158
                        .with_timeout(self.config.outbound_substream_timeout),
×
159
                });
×
160
            }
×
161
        } else {
1✔
162
            self.dial_queue.shrink_to_fit();
1✔
163
        }
1✔
164

165
        Poll::Pending
1✔
166
    }
1✔
167

168
    fn on_connection_event(
×
169
        &mut self,
×
170
        event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
×
171
    ) {
×
172
        match event {
×
173
            ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
174
                protocol: out,
×
175
                ..
176
            }) => {
×
177
                self.events_out.push(Ok(out.into()));
×
178
            }
×
179
            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
180
                protocol: out,
×
181
                ..
182
            }) => {
×
183
                self.dial_negotiated -= 1;
×
184
                self.events_out.push(Ok(out.into()));
×
185
            }
×
186
            ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => {
×
187
                self.events_out.push(Err(error));
×
188
            }
×
189
            ConnectionEvent::AddressChange(_)
190
            | ConnectionEvent::ListenUpgradeError(_)
191
            | ConnectionEvent::LocalProtocolsChange(_)
192
            | ConnectionEvent::RemoteProtocolsChange(_) => {}
×
193
        }
194
    }
×
195
}
196

197
/// Configuration parameters for the `OneShotHandler`.
#[derive(Debug)]
pub struct OneShotHandlerConfig {
    /// How long an outbound substream upgrade may take before it times out.
    pub outbound_substream_timeout: Duration,
    /// Upper bound on the number of outbound substreams being opened at the same time.
    pub max_dial_negotiated: u32,
}

impl Default for OneShotHandlerConfig {
    /// Defaults to a 10 second outbound timeout and at most 8 concurrent outbound substreams.
    fn default() -> Self {
        Self {
            max_dial_negotiated: 8,
            outbound_substream_timeout: Duration::from_secs(10),
        }
    }
}
214

215
#[cfg(test)]
mod tests {
    use std::convert::Infallible;

    use futures::{executor::block_on, future::poll_fn};
    use libp2p_core::upgrade::DeniedUpgrade;

    use super::*;

    /// A handler with nothing queued must not ask for the connection to be kept alive.
    #[test]
    fn do_not_keep_idle_connection_alive() {
        let listen = SubstreamProtocol::new(DeniedUpgrade {}, ());
        let mut handler: OneShotHandler<_, DeniedUpgrade, Infallible> =
            OneShotHandler::new(listen, Default::default());

        // Drain the handler until it has nothing left to report.
        block_on(poll_fn(|cx| {
            while handler.poll(cx).is_ready() {}
            Poll::Ready(())
        }));

        assert!(!handler.connection_keep_alive());
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc